From 18479129a2386e7c11e23a51a73dc7b8c1930653 Mon Sep 17 00:00:00 2001 From: fufesou Date: Wed, 29 Apr 2026 22:12:26 +0800 Subject: [PATCH] fix: cap terminal reconnect replay output - split reconnect replay backlog into capped chunks - mark terminal data replay chunks for client-side suppression - avoid using open-message text to suppress xterm replies - reuse default terminal padding value - remove misleading Enter-key normalization PR link Signed-off-by: fufesou --- flutter/lib/desktop/pages/terminal_page.dart | 5 +- flutter/lib/models/terminal_model.dart | 5 +- src/flutter.rs | 1 + src/server/terminal_service.rs | 118 ++++++++++++------- 4 files changed, 84 insertions(+), 45 deletions(-) diff --git a/flutter/lib/desktop/pages/terminal_page.dart b/flutter/lib/desktop/pages/terminal_page.dart index e63c3b8e0..d38dc4a8b 100644 --- a/flutter/lib/desktop/pages/terminal_page.dart +++ b/flutter/lib/desktop/pages/terminal_page.dart @@ -176,7 +176,10 @@ class _TerminalPageState extends State return _defaultTerminalPadding; } final topBottom = extraSpace / 2.0; - return EdgeInsets.symmetric(horizontal: 5.0, vertical: topBottom); + return EdgeInsets.symmetric( + horizontal: _defaultTerminalPadding.horizontal / 2, + vertical: topBottom, + ); } @override diff --git a/flutter/lib/models/terminal_model.dart b/flutter/lib/models/terminal_model.dart index d8d6d9a87..20b5b8be5 100644 --- a/flutter/lib/models/terminal_model.dart +++ b/flutter/lib/models/terminal_model.dart @@ -41,7 +41,7 @@ class TerminalModel with ChangeNotifier { Future _handleInput(String data) async { // Soft keyboards (notably iOS) emit '\n' when Enter is pressed, while a // real keyboard's Enter sends '\r'. Some Android keyboards also emit '\n'. - // - Peer Windows: '\r' works, '\n' is just a newline (https://github.com/rustdesk/rustdesk/pull/14736). + // - Peer Windows: '\r' works, '\n' is just a newline. // - Peer Linux: canonical-mode shells accept both, but raw-mode apps // (readline, prompt_toolkit, vim, TUI frameworks) expect '\r'. // - Peer macOS: same as Linux, raw-mode apps expect '\r' @@ -347,7 +347,8 @@ class TerminalModel with ChangeNotifier { final data = evt['data']; if (data != null) { - final suppressTerminalOutput = _suppressNextTerminalDataOutput; + final suppressTerminalOutput = + evt['replay'] == true || _suppressNextTerminalDataOutput; _suppressNextTerminalDataOutput = false; try { String text = ''; diff --git a/src/flutter.rs b/src/flutter.rs index 3d416469b..09115a634 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -1155,6 +1155,7 @@ impl InvokeUiSession for FlutterHandler { ("type", json!("data")), ("terminal_id", json!(data.terminal_id)), ("data", json!(&encoded)), + ("replay", json!(data.replay)), ]; self.push_event_("terminal_response", &event_data, &[], &[]); } diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index 7be381f24..ff8e1742f 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -35,6 +35,7 @@ const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this // Default max bytes for reconnection buffer replay. const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024; +const MAX_REPLAY_RESPONSE_BYTES: usize = DEFAULT_RECONNECT_BUFFER_BYTES; const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up /// Two-phase SIGWINCH trigger for TUI app redraw on reconnection. @@ -722,6 +723,7 @@ pub struct TerminalSession { reader_thread: Option>, writer_thread: Option>, output_buffer: OutputBuffer, + pending_replay_chunks: VecDeque>, title: String, pid: u32, rows: u16, @@ -751,6 +753,7 @@ impl TerminalSession { reader_thread: None, writer_thread: None, output_buffer: OutputBuffer::new(), + pending_replay_chunks: VecDeque::new(), title: format!("Terminal {}", terminal_id), pid: 0, rows, @@ -1070,15 +1073,15 @@ impl TerminalServiceProxy { // Reconnect to existing terminal let mut session = session_arc.lock().unwrap(); // Directly enter Active state with pending replay for immediate streaming. - // The replay starts with output_buffer history, then drains any current channel - // backlog into the same pending response. Keeping reconnect backlog in the first - // response lets the client suppress xterm query answers for the whole replay batch. + // The replay starts with output_buffer history and the channel backlog that was + // already pending at reconnect time. Keep replay data in capped chunks so the + // client can suppress stale xterm query answers without oversized messages. // During disconnect, read_outputs() is not called; channel data can still be lost // if output_rx fills before reconnect drains it. let buffer = session .output_buffer .get_recent(DEFAULT_RECONNECT_BUFFER_BYTES); - let mut pending_buffer = buffer; + session.pending_replay_chunks.clear(); let mut reconnect_backlog = Vec::new(); if let Some(output_rx) = &session.output_rx { while let Ok(data) = output_rx.try_recv() { @@ -1087,15 +1090,16 @@ impl TerminalServiceProxy { } for data in reconnect_backlog { session.output_buffer.append(&data); - pending_buffer.extend_from_slice(&data); + Self::push_replay_chunk(&mut session.pending_replay_chunks, data); } - let has_pending = !pending_buffer.is_empty(); + let has_pending = !buffer.is_empty() || !session.pending_replay_chunks.is_empty(); + let pending_buffer = if !buffer.is_empty() { + Some(buffer) + } else { + session.pending_replay_chunks.pop_front() + }; session.state = SessionState::Active { - pending_buffer: if has_pending { - Some(pending_buffer) - } else { - None - }, + pending_buffer, // Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw, // regardless of whether there's pending buffer data. This avoids edge cases // where buffer is empty but a TUI app (top/htop) still needs a full redraw. @@ -1800,11 +1804,29 @@ impl TerminalServiceProxy { } } + fn push_replay_chunk(chunks: &mut VecDeque>, data: Vec) { + if data.is_empty() { + return; + } + if let Some(last) = chunks.back_mut() { + if last.len() + data.len() <= MAX_REPLAY_RESPONSE_BYTES { + last.extend_from_slice(&data); + return; + } + } + chunks.push_back(data); + } + /// Helper to create a TerminalResponse with optional compression. - fn create_terminal_data_response(terminal_id: i32, data: Vec) -> TerminalResponse { + fn create_terminal_data_response( + terminal_id: i32, + data: Vec, + replay: bool, + ) -> TerminalResponse { let mut response = TerminalResponse::new(); let mut terminal_data = TerminalData::new(); terminal_data.terminal_id = terminal_id; + terminal_data.replay = replay; if data.len() > COMPRESS_THRESHOLD { let compressed = compress::compress(&data); @@ -1870,16 +1892,21 @@ impl TerminalServiceProxy { // is not called, so channel data produced after disconnect may be lost. let mut has_activity = false; let mut received_data = Vec::new(); - if let Some(output_rx) = &session.output_rx { - // Try to read all available data - while let Ok(data) = output_rx.try_recv() { - has_activity = true; - received_data.push(data); + let has_pending_replay = matches!( + &session.state, + SessionState::Active { + pending_buffer: Some(_), + .. + } + ); + if !has_pending_replay { + if let Some(output_rx) = &session.output_rx { + // Try to read all available data + while let Ok(data) = output_rx.try_recv() { + has_activity = true; + received_data.push(data); + } } - } - - if has_activity { - session.update_activity(); } // Update buffer (always buffer for reconnection support) @@ -1891,7 +1918,7 @@ impl TerminalServiceProxy { // Data is already buffered above and will be sent on next reconnection. // Use a scoped block to limit the mutable borrow of session.state, // so we can immutably borrow other session fields afterwards. - let sigwinch_action = { + let (replay_buffer, sigwinch_action) = { let (pending_buffer, sigwinch) = match &mut session.state { SessionState::Active { pending_buffer, @@ -1900,28 +1927,12 @@ impl TerminalServiceProxy { _ => continue, }; - // Send pending replay first (set on reconnection in handle_open). If new - // channel data was drained in this same read_outputs() cycle, keep it in the - // replay response so the client suppresses one complete reconnect batch. - if let Some(buffer) = pending_buffer.take() { - let mut buffer = buffer; - for data in received_data.drain(..) { - // Reconnect replay can include terminal queries like DSR/DA. - // Keep this first backlog batch in one response so the client can - // suppress xterm-generated answers and avoid printing - // "^[[1;1R^[[2;2R^[[>0;0;0c" back to the remote shell. - buffer.extend_from_slice(&data); - } - if !buffer.is_empty() { - responses - .push(Self::create_terminal_data_response(terminal_id, buffer)); - } - } + let replay_buffer = pending_buffer.take(); // Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale. // Each phase is a single PTY resize, spaced ~30ms apart by the polling // interval, ensuring the TUI app sees a real size change on each signal. - match sigwinch { + let sigwinch_action = match sigwinch { SigwinchPhase::TempResize { retries } => { if *retries == 0 { log::warn!( @@ -1949,9 +1960,28 @@ impl TerminalServiceProxy { } } SigwinchPhase::Idle => None, - } + }; + (replay_buffer, sigwinch_action) }; + if let Some(buffer) = replay_buffer { + if !buffer.is_empty() { + responses.push(Self::create_terminal_data_response( + terminal_id, + buffer, + true, + )); + } + let next_replay_buffer = session.pending_replay_chunks.pop_front(); + if let SessionState::Active { pending_buffer, .. } = &mut session.state { + *pending_buffer = next_replay_buffer; + } + } + + if has_activity { + session.update_activity(); + } + // Execute SIGWINCH resize outside the mutable borrow scope of session.state. if let Some(action) = sigwinch_action { #[cfg(target_os = "windows")] @@ -1991,7 +2021,11 @@ impl TerminalServiceProxy { // Send real-time data after historical buffer for data in received_data { - responses.push(Self::create_terminal_data_response(terminal_id, data)); + responses.push(Self::create_terminal_data_response( + terminal_id, + data, + false, + )); } } }