mirror of
https://github.com/rustdesk/rustdesk.git
synced 2026-05-02 10:16:28 +02:00
fix: cap terminal reconnect replay output
- split reconnect replay backlog into capped chunks - mark terminal data replay chunks for client-side suppression - avoid using open-message text to suppress xterm replies - reuse default terminal padding value - remove misleading Enter-key normalization PR link Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
@@ -176,7 +176,10 @@ class _TerminalPageState extends State<TerminalPage>
|
|||||||
return _defaultTerminalPadding;
|
return _defaultTerminalPadding;
|
||||||
}
|
}
|
||||||
final topBottom = extraSpace / 2.0;
|
final topBottom = extraSpace / 2.0;
|
||||||
return EdgeInsets.symmetric(horizontal: 5.0, vertical: topBottom);
|
return EdgeInsets.symmetric(
|
||||||
|
horizontal: _defaultTerminalPadding.horizontal / 2,
|
||||||
|
vertical: topBottom,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@override
|
@override
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ class TerminalModel with ChangeNotifier {
|
|||||||
Future<void> _handleInput(String data) async {
|
Future<void> _handleInput(String data) async {
|
||||||
// Soft keyboards (notably iOS) emit '\n' when Enter is pressed, while a
|
// Soft keyboards (notably iOS) emit '\n' when Enter is pressed, while a
|
||||||
// real keyboard's Enter sends '\r'. Some Android keyboards also emit '\n'.
|
// real keyboard's Enter sends '\r'. Some Android keyboards also emit '\n'.
|
||||||
// - Peer Windows: '\r' works, '\n' is just a newline (https://github.com/rustdesk/rustdesk/pull/14736).
|
// - Peer Windows: '\r' works, '\n' is just a newline.
|
||||||
// - Peer Linux: canonical-mode shells accept both, but raw-mode apps
|
// - Peer Linux: canonical-mode shells accept both, but raw-mode apps
|
||||||
// (readline, prompt_toolkit, vim, TUI frameworks) expect '\r'.
|
// (readline, prompt_toolkit, vim, TUI frameworks) expect '\r'.
|
||||||
// - Peer macOS: same as Linux, raw-mode apps expect '\r'
|
// - Peer macOS: same as Linux, raw-mode apps expect '\r'
|
||||||
@@ -347,7 +347,8 @@ class TerminalModel with ChangeNotifier {
|
|||||||
final data = evt['data'];
|
final data = evt['data'];
|
||||||
|
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
final suppressTerminalOutput = _suppressNextTerminalDataOutput;
|
final suppressTerminalOutput =
|
||||||
|
evt['replay'] == true || _suppressNextTerminalDataOutput;
|
||||||
_suppressNextTerminalDataOutput = false;
|
_suppressNextTerminalDataOutput = false;
|
||||||
try {
|
try {
|
||||||
String text = '';
|
String text = '';
|
||||||
|
|||||||
@@ -1155,6 +1155,7 @@ impl InvokeUiSession for FlutterHandler {
|
|||||||
("type", json!("data")),
|
("type", json!("data")),
|
||||||
("terminal_id", json!(data.terminal_id)),
|
("terminal_id", json!(data.terminal_id)),
|
||||||
("data", json!(&encoded)),
|
("data", json!(&encoded)),
|
||||||
|
("replay", json!(data.replay)),
|
||||||
];
|
];
|
||||||
self.push_event_("terminal_response", &event_data, &[], &[]);
|
self.push_event_("terminal_response", &event_data, &[], &[]);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message
|
|||||||
const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this
|
const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this
|
||||||
// Default max bytes for reconnection buffer replay.
|
// Default max bytes for reconnection buffer replay.
|
||||||
const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024;
|
const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024;
|
||||||
|
const MAX_REPLAY_RESPONSE_BYTES: usize = DEFAULT_RECONNECT_BUFFER_BYTES;
|
||||||
const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up
|
const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up
|
||||||
|
|
||||||
/// Two-phase SIGWINCH trigger for TUI app redraw on reconnection.
|
/// Two-phase SIGWINCH trigger for TUI app redraw on reconnection.
|
||||||
@@ -722,6 +723,7 @@ pub struct TerminalSession {
|
|||||||
reader_thread: Option<thread::JoinHandle<()>>,
|
reader_thread: Option<thread::JoinHandle<()>>,
|
||||||
writer_thread: Option<thread::JoinHandle<()>>,
|
writer_thread: Option<thread::JoinHandle<()>>,
|
||||||
output_buffer: OutputBuffer,
|
output_buffer: OutputBuffer,
|
||||||
|
pending_replay_chunks: VecDeque<Vec<u8>>,
|
||||||
title: String,
|
title: String,
|
||||||
pid: u32,
|
pid: u32,
|
||||||
rows: u16,
|
rows: u16,
|
||||||
@@ -751,6 +753,7 @@ impl TerminalSession {
|
|||||||
reader_thread: None,
|
reader_thread: None,
|
||||||
writer_thread: None,
|
writer_thread: None,
|
||||||
output_buffer: OutputBuffer::new(),
|
output_buffer: OutputBuffer::new(),
|
||||||
|
pending_replay_chunks: VecDeque::new(),
|
||||||
title: format!("Terminal {}", terminal_id),
|
title: format!("Terminal {}", terminal_id),
|
||||||
pid: 0,
|
pid: 0,
|
||||||
rows,
|
rows,
|
||||||
@@ -1070,15 +1073,15 @@ impl TerminalServiceProxy {
|
|||||||
// Reconnect to existing terminal
|
// Reconnect to existing terminal
|
||||||
let mut session = session_arc.lock().unwrap();
|
let mut session = session_arc.lock().unwrap();
|
||||||
// Directly enter Active state with pending replay for immediate streaming.
|
// Directly enter Active state with pending replay for immediate streaming.
|
||||||
// The replay starts with output_buffer history, then drains any current channel
|
// The replay starts with output_buffer history and the channel backlog that was
|
||||||
// backlog into the same pending response. Keeping reconnect backlog in the first
|
// already pending at reconnect time. Keep replay data in capped chunks so the
|
||||||
// response lets the client suppress xterm query answers for the whole replay batch.
|
// client can suppress stale xterm query answers without oversized messages.
|
||||||
// During disconnect, read_outputs() is not called; channel data can still be lost
|
// During disconnect, read_outputs() is not called; channel data can still be lost
|
||||||
// if output_rx fills before reconnect drains it.
|
// if output_rx fills before reconnect drains it.
|
||||||
let buffer = session
|
let buffer = session
|
||||||
.output_buffer
|
.output_buffer
|
||||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||||
let mut pending_buffer = buffer;
|
session.pending_replay_chunks.clear();
|
||||||
let mut reconnect_backlog = Vec::new();
|
let mut reconnect_backlog = Vec::new();
|
||||||
if let Some(output_rx) = &session.output_rx {
|
if let Some(output_rx) = &session.output_rx {
|
||||||
while let Ok(data) = output_rx.try_recv() {
|
while let Ok(data) = output_rx.try_recv() {
|
||||||
@@ -1087,15 +1090,16 @@ impl TerminalServiceProxy {
|
|||||||
}
|
}
|
||||||
for data in reconnect_backlog {
|
for data in reconnect_backlog {
|
||||||
session.output_buffer.append(&data);
|
session.output_buffer.append(&data);
|
||||||
pending_buffer.extend_from_slice(&data);
|
Self::push_replay_chunk(&mut session.pending_replay_chunks, data);
|
||||||
}
|
}
|
||||||
let has_pending = !pending_buffer.is_empty();
|
let has_pending = !buffer.is_empty() || !session.pending_replay_chunks.is_empty();
|
||||||
|
let pending_buffer = if !buffer.is_empty() {
|
||||||
|
Some(buffer)
|
||||||
|
} else {
|
||||||
|
session.pending_replay_chunks.pop_front()
|
||||||
|
};
|
||||||
session.state = SessionState::Active {
|
session.state = SessionState::Active {
|
||||||
pending_buffer: if has_pending {
|
pending_buffer,
|
||||||
Some(pending_buffer)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
},
|
|
||||||
// Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw,
|
// Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw,
|
||||||
// regardless of whether there's pending buffer data. This avoids edge cases
|
// regardless of whether there's pending buffer data. This avoids edge cases
|
||||||
// where buffer is empty but a TUI app (top/htop) still needs a full redraw.
|
// where buffer is empty but a TUI app (top/htop) still needs a full redraw.
|
||||||
@@ -1800,11 +1804,29 @@ impl TerminalServiceProxy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn push_replay_chunk(chunks: &mut VecDeque<Vec<u8>>, data: Vec<u8>) {
|
||||||
|
if data.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Some(last) = chunks.back_mut() {
|
||||||
|
if last.len() + data.len() <= MAX_REPLAY_RESPONSE_BYTES {
|
||||||
|
last.extend_from_slice(&data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
chunks.push_back(data);
|
||||||
|
}
|
||||||
|
|
||||||
/// Helper to create a TerminalResponse with optional compression.
|
/// Helper to create a TerminalResponse with optional compression.
|
||||||
fn create_terminal_data_response(terminal_id: i32, data: Vec<u8>) -> TerminalResponse {
|
fn create_terminal_data_response(
|
||||||
|
terminal_id: i32,
|
||||||
|
data: Vec<u8>,
|
||||||
|
replay: bool,
|
||||||
|
) -> TerminalResponse {
|
||||||
let mut response = TerminalResponse::new();
|
let mut response = TerminalResponse::new();
|
||||||
let mut terminal_data = TerminalData::new();
|
let mut terminal_data = TerminalData::new();
|
||||||
terminal_data.terminal_id = terminal_id;
|
terminal_data.terminal_id = terminal_id;
|
||||||
|
terminal_data.replay = replay;
|
||||||
|
|
||||||
if data.len() > COMPRESS_THRESHOLD {
|
if data.len() > COMPRESS_THRESHOLD {
|
||||||
let compressed = compress::compress(&data);
|
let compressed = compress::compress(&data);
|
||||||
@@ -1870,16 +1892,21 @@ impl TerminalServiceProxy {
|
|||||||
// is not called, so channel data produced after disconnect may be lost.
|
// is not called, so channel data produced after disconnect may be lost.
|
||||||
let mut has_activity = false;
|
let mut has_activity = false;
|
||||||
let mut received_data = Vec::new();
|
let mut received_data = Vec::new();
|
||||||
if let Some(output_rx) = &session.output_rx {
|
let has_pending_replay = matches!(
|
||||||
// Try to read all available data
|
&session.state,
|
||||||
while let Ok(data) = output_rx.try_recv() {
|
SessionState::Active {
|
||||||
has_activity = true;
|
pending_buffer: Some(_),
|
||||||
received_data.push(data);
|
..
|
||||||
|
}
|
||||||
|
);
|
||||||
|
if !has_pending_replay {
|
||||||
|
if let Some(output_rx) = &session.output_rx {
|
||||||
|
// Try to read all available data
|
||||||
|
while let Ok(data) = output_rx.try_recv() {
|
||||||
|
has_activity = true;
|
||||||
|
received_data.push(data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if has_activity {
|
|
||||||
session.update_activity();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update buffer (always buffer for reconnection support)
|
// Update buffer (always buffer for reconnection support)
|
||||||
@@ -1891,7 +1918,7 @@ impl TerminalServiceProxy {
|
|||||||
// Data is already buffered above and will be sent on next reconnection.
|
// Data is already buffered above and will be sent on next reconnection.
|
||||||
// Use a scoped block to limit the mutable borrow of session.state,
|
// Use a scoped block to limit the mutable borrow of session.state,
|
||||||
// so we can immutably borrow other session fields afterwards.
|
// so we can immutably borrow other session fields afterwards.
|
||||||
let sigwinch_action = {
|
let (replay_buffer, sigwinch_action) = {
|
||||||
let (pending_buffer, sigwinch) = match &mut session.state {
|
let (pending_buffer, sigwinch) = match &mut session.state {
|
||||||
SessionState::Active {
|
SessionState::Active {
|
||||||
pending_buffer,
|
pending_buffer,
|
||||||
@@ -1900,28 +1927,12 @@ impl TerminalServiceProxy {
|
|||||||
_ => continue,
|
_ => continue,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Send pending replay first (set on reconnection in handle_open). If new
|
let replay_buffer = pending_buffer.take();
|
||||||
// channel data was drained in this same read_outputs() cycle, keep it in the
|
|
||||||
// replay response so the client suppresses one complete reconnect batch.
|
|
||||||
if let Some(buffer) = pending_buffer.take() {
|
|
||||||
let mut buffer = buffer;
|
|
||||||
for data in received_data.drain(..) {
|
|
||||||
// Reconnect replay can include terminal queries like DSR/DA.
|
|
||||||
// Keep this first backlog batch in one response so the client can
|
|
||||||
// suppress xterm-generated answers and avoid printing
|
|
||||||
// "^[[1;1R^[[2;2R^[[>0;0;0c" back to the remote shell.
|
|
||||||
buffer.extend_from_slice(&data);
|
|
||||||
}
|
|
||||||
if !buffer.is_empty() {
|
|
||||||
responses
|
|
||||||
.push(Self::create_terminal_data_response(terminal_id, buffer));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale.
|
// Two-phase SIGWINCH: see SigwinchPhase doc comments for rationale.
|
||||||
// Each phase is a single PTY resize, spaced ~30ms apart by the polling
|
// Each phase is a single PTY resize, spaced ~30ms apart by the polling
|
||||||
// interval, ensuring the TUI app sees a real size change on each signal.
|
// interval, ensuring the TUI app sees a real size change on each signal.
|
||||||
match sigwinch {
|
let sigwinch_action = match sigwinch {
|
||||||
SigwinchPhase::TempResize { retries } => {
|
SigwinchPhase::TempResize { retries } => {
|
||||||
if *retries == 0 {
|
if *retries == 0 {
|
||||||
log::warn!(
|
log::warn!(
|
||||||
@@ -1949,9 +1960,28 @@ impl TerminalServiceProxy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
SigwinchPhase::Idle => None,
|
SigwinchPhase::Idle => None,
|
||||||
}
|
};
|
||||||
|
(replay_buffer, sigwinch_action)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if let Some(buffer) = replay_buffer {
|
||||||
|
if !buffer.is_empty() {
|
||||||
|
responses.push(Self::create_terminal_data_response(
|
||||||
|
terminal_id,
|
||||||
|
buffer,
|
||||||
|
true,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let next_replay_buffer = session.pending_replay_chunks.pop_front();
|
||||||
|
if let SessionState::Active { pending_buffer, .. } = &mut session.state {
|
||||||
|
*pending_buffer = next_replay_buffer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if has_activity {
|
||||||
|
session.update_activity();
|
||||||
|
}
|
||||||
|
|
||||||
// Execute SIGWINCH resize outside the mutable borrow scope of session.state.
|
// Execute SIGWINCH resize outside the mutable borrow scope of session.state.
|
||||||
if let Some(action) = sigwinch_action {
|
if let Some(action) = sigwinch_action {
|
||||||
#[cfg(target_os = "windows")]
|
#[cfg(target_os = "windows")]
|
||||||
@@ -1991,7 +2021,11 @@ impl TerminalServiceProxy {
|
|||||||
|
|
||||||
// Send real-time data after historical buffer
|
// Send real-time data after historical buffer
|
||||||
for data in received_data {
|
for data in received_data {
|
||||||
responses.push(Self::create_terminal_data_response(terminal_id, data));
|
responses.push(Self::create_terminal_data_response(
|
||||||
|
terminal_id,
|
||||||
|
data,
|
||||||
|
false,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user