mirror of
https://github.com/rustdesk/rustdesk.git
synced 2026-05-02 10:16:28 +02:00
fix(terminal): reconnect, refactor
Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
@@ -288,7 +288,8 @@ class TerminalModel with ChangeNotifier {
|
||||
// On reconnect, the server may replay recent output. That replay can include
|
||||
// terminal queries like DSR/DA; xterm answers them through onOutput as
|
||||
// "^[[1;1R^[[2;2R^[[>0;0;0c", which must not be sent back to the peer.
|
||||
_suppressNextTerminalDataOutput = evt['replay_in_next_data'] == true;
|
||||
_suppressNextTerminalDataOutput =
|
||||
message == 'Reconnected to existing terminal with pending output';
|
||||
|
||||
// Fallback: if terminal view is not yet ready but already has valid
|
||||
// dimensions (e.g. layout completed before open response arrived),
|
||||
@@ -347,8 +348,7 @@ class TerminalModel with ChangeNotifier {
|
||||
final data = evt['data'];
|
||||
|
||||
if (data != null) {
|
||||
final suppressTerminalOutput =
|
||||
evt['replay'] == true || _suppressNextTerminalDataOutput;
|
||||
final suppressTerminalOutput = _suppressNextTerminalDataOutput;
|
||||
_suppressNextTerminalDataOutput = false;
|
||||
try {
|
||||
String text = '';
|
||||
|
||||
@@ -1135,7 +1135,6 @@ impl InvokeUiSession for FlutterHandler {
|
||||
("message", json!(&opened.message)),
|
||||
("pid", json!(opened.pid)),
|
||||
("service_id", json!(&opened.service_id)),
|
||||
("replay_in_next_data", json!(opened.replay_in_next_data)),
|
||||
];
|
||||
if !opened.persistent_sessions.is_empty() {
|
||||
event_data.push(("persistent_sessions", json!(opened.persistent_sessions)));
|
||||
@@ -1155,7 +1154,6 @@ impl InvokeUiSession for FlutterHandler {
|
||||
("type", json!("data")),
|
||||
("terminal_id", json!(data.terminal_id)),
|
||||
("data", json!(&encoded)),
|
||||
("replay", json!(data.replay)),
|
||||
];
|
||||
self.push_event_("terminal_response", &event_data, &[], &[]);
|
||||
}
|
||||
|
||||
@@ -35,7 +35,6 @@ const CHANNEL_BUFFER_SIZE: usize = 500; // Channel buffer size. Max per-message
|
||||
const COMPRESS_THRESHOLD: usize = 512; // Compress terminal data larger than this
|
||||
// Default max bytes for reconnection buffer replay.
|
||||
const DEFAULT_RECONNECT_BUFFER_BYTES: usize = 8 * 1024;
|
||||
const MAX_REPLAY_RESPONSE_BYTES: usize = DEFAULT_RECONNECT_BUFFER_BYTES;
|
||||
const MAX_SIGWINCH_PHASE_ATTEMPTS: u8 = 3; // Max attempts per SIGWINCH phase before giving up
|
||||
|
||||
/// Two-phase SIGWINCH trigger for TUI app redraw on reconnection.
|
||||
@@ -723,7 +722,6 @@ pub struct TerminalSession {
|
||||
reader_thread: Option<thread::JoinHandle<()>>,
|
||||
writer_thread: Option<thread::JoinHandle<()>>,
|
||||
output_buffer: OutputBuffer,
|
||||
pending_replay_chunks: VecDeque<Vec<u8>>,
|
||||
title: String,
|
||||
pid: u32,
|
||||
rows: u16,
|
||||
@@ -753,7 +751,6 @@ impl TerminalSession {
|
||||
reader_thread: None,
|
||||
writer_thread: None,
|
||||
output_buffer: OutputBuffer::new(),
|
||||
pending_replay_chunks: VecDeque::new(),
|
||||
title: format!("Terminal {}", terminal_id),
|
||||
pid: 0,
|
||||
rows,
|
||||
@@ -1073,33 +1070,37 @@ impl TerminalServiceProxy {
|
||||
// Reconnect to existing terminal
|
||||
let mut session = session_arc.lock().unwrap();
|
||||
// Directly enter Active state with pending replay for immediate streaming.
|
||||
// The replay starts with output_buffer history and the channel backlog that was
|
||||
// already pending at reconnect time. Keep replay data in capped chunks so the
|
||||
// client can suppress stale xterm query answers without oversized messages.
|
||||
// The replay combines output_buffer history and the channel backlog that was
|
||||
// already pending at reconnect time so the client can suppress stale xterm
|
||||
// query answers without requiring a protobuf schema change.
|
||||
// During disconnect, read_outputs() is not called; channel data can still be lost
|
||||
// if output_rx fills before reconnect drains it.
|
||||
let buffer = session
|
||||
let mut buffer = session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||
session.pending_replay_chunks.clear();
|
||||
let mut reconnect_backlog = Vec::new();
|
||||
if let Some(output_rx) = &session.output_rx {
|
||||
while let Ok(data) = output_rx.try_recv() {
|
||||
// Cap reconnect-time drain so a chatty PTY cannot keep OpenTerminal
|
||||
// inside this loop indefinitely. Remaining output is drained by read_outputs().
|
||||
for _ in 0..CHANNEL_BUFFER_SIZE {
|
||||
let Ok(data) = output_rx.try_recv() else {
|
||||
break;
|
||||
};
|
||||
reconnect_backlog.push(data);
|
||||
}
|
||||
}
|
||||
let has_reconnect_backlog = !reconnect_backlog.is_empty();
|
||||
for data in reconnect_backlog {
|
||||
session.output_buffer.append(&data);
|
||||
Self::push_replay_chunk(&mut session.pending_replay_chunks, data);
|
||||
}
|
||||
let has_pending = !buffer.is_empty() || !session.pending_replay_chunks.is_empty();
|
||||
let pending_buffer = if !buffer.is_empty() {
|
||||
Some(buffer)
|
||||
} else {
|
||||
session.pending_replay_chunks.pop_front()
|
||||
};
|
||||
if has_reconnect_backlog {
|
||||
buffer = session
|
||||
.output_buffer
|
||||
.get_recent(DEFAULT_RECONNECT_BUFFER_BYTES);
|
||||
}
|
||||
let has_pending = !buffer.is_empty();
|
||||
session.state = SessionState::Active {
|
||||
pending_buffer,
|
||||
pending_buffer: if has_pending { Some(buffer) } else { None },
|
||||
// Always trigger two-phase SIGWINCH on reconnect to force TUI app redraw,
|
||||
// regardless of whether there's pending buffer data. This avoids edge cases
|
||||
// where buffer is empty but a TUI app (top/htop) still needs a full redraw.
|
||||
@@ -1115,7 +1116,6 @@ impl TerminalServiceProxy {
|
||||
} else {
|
||||
"Reconnected to existing terminal".to_string()
|
||||
};
|
||||
opened.replay_in_next_data = has_pending;
|
||||
opened.pid = session.pid;
|
||||
opened.service_id = self.service_id.clone();
|
||||
if service.needs_session_sync {
|
||||
@@ -1804,29 +1804,11 @@ impl TerminalServiceProxy {
|
||||
}
|
||||
}
|
||||
|
||||
fn push_replay_chunk(chunks: &mut VecDeque<Vec<u8>>, data: Vec<u8>) {
|
||||
if data.is_empty() {
|
||||
return;
|
||||
}
|
||||
if let Some(last) = chunks.back_mut() {
|
||||
if last.len() + data.len() <= MAX_REPLAY_RESPONSE_BYTES {
|
||||
last.extend_from_slice(&data);
|
||||
return;
|
||||
}
|
||||
}
|
||||
chunks.push_back(data);
|
||||
}
|
||||
|
||||
/// Helper to create a TerminalResponse with optional compression.
|
||||
fn create_terminal_data_response(
|
||||
terminal_id: i32,
|
||||
data: Vec<u8>,
|
||||
replay: bool,
|
||||
) -> TerminalResponse {
|
||||
fn create_terminal_data_response(terminal_id: i32, data: Vec<u8>) -> TerminalResponse {
|
||||
let mut response = TerminalResponse::new();
|
||||
let mut terminal_data = TerminalData::new();
|
||||
terminal_data.terminal_id = terminal_id;
|
||||
terminal_data.replay = replay;
|
||||
|
||||
if data.len() > COMPRESS_THRESHOLD {
|
||||
let compressed = compress::compress(&data);
|
||||
@@ -1892,20 +1874,11 @@ impl TerminalServiceProxy {
|
||||
// is not called, so channel data produced after disconnect may be lost.
|
||||
let mut has_activity = false;
|
||||
let mut received_data = Vec::new();
|
||||
let has_pending_replay = matches!(
|
||||
&session.state,
|
||||
SessionState::Active {
|
||||
pending_buffer: Some(_),
|
||||
..
|
||||
}
|
||||
);
|
||||
if !has_pending_replay {
|
||||
if let Some(output_rx) = &session.output_rx {
|
||||
// Try to read all available data
|
||||
while let Ok(data) = output_rx.try_recv() {
|
||||
has_activity = true;
|
||||
received_data.push(data);
|
||||
}
|
||||
if let Some(output_rx) = &session.output_rx {
|
||||
// Try to read all available data
|
||||
while let Ok(data) = output_rx.try_recv() {
|
||||
has_activity = true;
|
||||
received_data.push(data);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1966,15 +1939,7 @@ impl TerminalServiceProxy {
|
||||
|
||||
if let Some(buffer) = replay_buffer {
|
||||
if !buffer.is_empty() {
|
||||
responses.push(Self::create_terminal_data_response(
|
||||
terminal_id,
|
||||
buffer,
|
||||
true,
|
||||
));
|
||||
}
|
||||
let next_replay_buffer = session.pending_replay_chunks.pop_front();
|
||||
if let SessionState::Active { pending_buffer, .. } = &mut session.state {
|
||||
*pending_buffer = next_replay_buffer;
|
||||
responses.push(Self::create_terminal_data_response(terminal_id, buffer));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2021,11 +1986,7 @@ impl TerminalServiceProxy {
|
||||
|
||||
// Send real-time data after historical buffer
|
||||
for data in received_data {
|
||||
responses.push(Self::create_terminal_data_response(
|
||||
terminal_id,
|
||||
data,
|
||||
false,
|
||||
));
|
||||
responses.push(Self::create_terminal_data_response(terminal_id, data));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user