From 26a356d0f5d58ae9cc1e77e214a0ae2b1d2ce31b Mon Sep 17 00:00:00 2001 From: fufesou Date: Wed, 29 Apr 2026 23:11:13 +0800 Subject: [PATCH] fix(terminal): reconnect, refactor Signed-off-by: fufesou --- flutter/lib/models/terminal_model.dart | 6 +- src/flutter.rs | 2 - src/server/terminal_service.rs | 91 ++++++++------------------ 3 files changed, 29 insertions(+), 70 deletions(-) diff --git a/flutter/lib/models/terminal_model.dart b/flutter/lib/models/terminal_model.dart index 20b5b8be5..cd7167805 100644 --- a/flutter/lib/models/terminal_model.dart +++ b/flutter/lib/models/terminal_model.dart @@ -288,7 +288,8 @@ class TerminalModel with ChangeNotifier { // On reconnect, the server may replay recent output. That replay can include // terminal queries like DSR/DA; xterm answers them through onOutput as // "^[[1;1R^[[2;2R^[[>0;0;0c", which must not be sent back to the peer. - _suppressNextTerminalDataOutput = evt['replay_in_next_data'] == true; + _suppressNextTerminalDataOutput = + message == 'Reconnected to existing terminal with pending output'; // Fallback: if terminal view is not yet ready but already has valid // dimensions (e.g. layout completed before open response arrived), @@ -347,8 +348,7 @@ class TerminalModel with ChangeNotifier { final data = evt['data']; if (data != null) { - final suppressTerminalOutput = - evt['replay'] == true || _suppressNextTerminalDataOutput; + final suppressTerminalOutput = _suppressNextTerminalDataOutput; _suppressNextTerminalDataOutput = false; try { String text = ''; diff --git a/src/flutter.rs b/src/flutter.rs index 09115a634..c7e07f892 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -1135,7 +1135,6 @@ impl InvokeUiSession for FlutterHandler { ("message", json!(&opened.message)), ("pid", json!(opened.pid)), ("service_id", json!(&opened.service_id)), - ("replay_in_next_data", json!(opened.replay_in_next_data)), ]; if !opened.persistent_sessions.is_empty() { event_data.push(("persistent_sessions", json!(opened.persistent_sessions))); @@ -1155,7 +1154,6 @@ impl InvokeUiSession for FlutterHandler { ("type", json!("data")), ("terminal_id", json!(data.terminal_id)), ("data", json!(&encoded)), - ("replay", json!(data.replay)), ]; self.push_event_("terminal_response", &event_data, &[], &[]); } diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index 4975dc673..6aceabba1 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -35,7 +35,6 @@ const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this // Default max bytes for reconnection buffer replay. const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024; -const MAX_REPLAY_RESPONSE_BYTES: usize = DEFAULT_RECONNECT_BUFFER_BYTES; const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up /// Two-phase SIGWINCH trigger for TUI app redraw on reconnection. @@ -723,7 +722,6 @@ pub struct TerminalSession { reader_thread: Option>, writer_thread: Option>, output_buffer: OutputBuffer, - pending_replay_chunks: VecDeque>, title: String, pid: u32, rows: u16, @@ -753,7 +751,6 @@ impl TerminalSession { reader_thread: None, writer_thread: None, output_buffer: OutputBuffer::new(), - pending_replay_chunks: VecDeque::new(), title: format!("Terminal {}", terminal_id), pid: 0, rows, @@ -1073,33 +1070,37 @@ impl TerminalServiceProxy { // Reconnect to existing terminal let mut session = session_arc.lock().unwrap(); // Directly enter Active state with pending replay for immediate streaming. - // The replay starts with output_buffer history and the channel backlog that was - // already pending at reconnect time. Keep replay data in capped chunks so the - // client can suppress stale xterm query answers without oversized messages. + // The replay combines output_buffer history and the channel backlog that was + // already pending at reconnect time so the client can suppress stale xterm + // query answers without requiring a protobuf schema change. // During disconnect, read_outputs() is not called; channel data can still be lost // if output_rx fills before reconnect drains it. - let buffer = session + let mut buffer = session .output_buffer .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); - session.pending_replay_chunks.clear(); let mut reconnect_backlog = Vec::new(); if let Some(output_rx) = &session.output_rx { - while let Ok(data) = output_rx.try_recv() { + // Cap reconnect-time drain so a chatty PTY cannot keep OpenTerminal + // inside this loop indefinitely. Remaining output is drained by read_outputs(). + for _ in 0..CHANNEL_BUFFER_SIZE { + let Ok(data) = output_rx.try_recv() else { + break; + }; reconnect_backlog.push(data); } } + let has_reconnect_backlog = !reconnect_backlog.is_empty(); for data in reconnect_backlog { session.output_buffer.append(&data); - Self::push_replay_chunk(&mut session.pending_replay_chunks, data); } - let has_pending = !buffer.is_empty() || !session.pending_replay_chunks.is_empty(); - let pending_buffer = if !buffer.is_empty() { - Some(buffer) - } else { - session.pending_replay_chunks.pop_front() - }; + if has_reconnect_backlog { + buffer = session + .output_buffer + .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); + } + let has_pending = !buffer.is_empty(); session.state = SessionState::Active { - pending_buffer, + pending_buffer: if has_pending { Some(buffer) } else { None }, // Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw, // regardless of whether there's pending buffer data. This avoids edge cases // where buffer is empty but a TUI app (top/htop) still needs a full redraw. @@ -1115,7 +1116,6 @@ impl TerminalServiceProxy { } else { "Reconnected to existing terminal".to_string() }; - opened.replay_in_next_data = has_pending; opened.pid = session.pid; opened.service_id = self.service_id.clone(); if service.needs_session_sync { @@ -1804,29 +1804,11 @@ impl TerminalServiceProxy { } } - fn push_replay_chunk(chunks: &mut VecDeque>, data: Vec) { - if data.is_empty() { - return; - } - if let Some(last) = chunks.back_mut() { - if last.len() + data.len() <= MAX_REPLAY_RESPONSE_BYTES { - last.extend_from_slice(&data); - return; - } - } - chunks.push_back(data); - } - /// Helper to create a TerminalResponse with optional compression. - fn create_terminal_data_response( - terminal_id: i32, - data: Vec, - replay: bool, - ) -> TerminalResponse { + fn create_terminal_data_response(terminal_id: i32, data: Vec) -> TerminalResponse { let mut response = TerminalResponse::new(); let mut terminal_data = TerminalData::new(); terminal_data.terminal_id = terminal_id; - terminal_data.replay = replay; if data.len() > COMPRESS_THRESHOLD { let compressed = compress::compress(&data); @@ -1892,20 +1874,11 @@ impl TerminalServiceProxy { // is not called, so channel data produced after disconnect may be lost. let mut has_activity = false; let mut received_data = Vec::new(); - let has_pending_replay = matches!( - &session.state, - SessionState::Active { - pending_buffer: Some(_), - .. - } - ); - if !has_pending_replay { - if let Some(output_rx) = &session.output_rx { - // Try to read all available data - while let Ok(data) = output_rx.try_recv() { - has_activity = true; - received_data.push(data); - } + if let Some(output_rx) = &session.output_rx { + // Try to read all available data + while let Ok(data) = output_rx.try_recv() { + has_activity = true; + received_data.push(data); } } @@ -1966,15 +1939,7 @@ impl TerminalServiceProxy { if let Some(buffer) = replay_buffer { if !buffer.is_empty() { - responses.push(Self::create_terminal_data_response( - terminal_id, - buffer, - true, - )); - } - let next_replay_buffer = session.pending_replay_chunks.pop_front(); - if let SessionState::Active { pending_buffer, .. } = &mut session.state { - *pending_buffer = next_replay_buffer; + responses.push(Self::create_terminal_data_response(terminal_id, buffer)); } } @@ -2021,11 +1986,7 @@ impl TerminalServiceProxy { // Send real-time data after historical buffer for data in received_data { - responses.push(Self::create_terminal_data_response( - terminal_id, - data, - false, - )); + responses.push(Self::create_terminal_data_response(terminal_id, data)); } } }