From e28a698e9b91fb487edfab3151e4e07205e2302d Mon Sep 17 00:00:00 2001 From: ospab Date: Sat, 16 May 2026 20:55:11 +0300 Subject: [PATCH] fix: resolve connection instability under load and refine logging --- Cargo.lock | 10 +- Cargo.toml | 1 + ostp-client/src/bridge.rs | 194 ++++++++++++++++++++------------------ ostp-client/src/runner.rs | 7 +- ostp-server/src/lib.rs | 4 +- 5 files changed, 109 insertions(+), 107 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5580664..7bbe3a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,7 +745,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "ostp" -version = "0.1.47" +version = "0.1.48" dependencies = [ "anyhow", "base64", @@ -762,7 +762,7 @@ dependencies = [ [[package]] name = "ostp-client" -version = "0.1.47" +version = "0.1.48" dependencies = [ "anyhow", "bytes", @@ -779,7 +779,7 @@ dependencies = [ [[package]] name = "ostp-core" -version = "0.1.47" +version = "0.1.48" dependencies = [ "anyhow", "async-trait", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "ostp-server" -version = "0.1.47" +version = "0.1.48" dependencies = [ "anyhow", "bytes", @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "ostp-tun-helper" -version = "0.1.47" +version = "0.1.48" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 54d953b..5a5d786 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "ostp-jni", "ostp", "ostp-tun-helper" ] +exclude = ["ostp-gui/src-tauri"] resolver = "2" [workspace.package] diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index f9f0da7..86b44c6 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -121,6 +121,7 @@ impl Bridge { loop { tokio::select! { + biased; _ = shutdown.changed() => { if *shutdown.borrow() { self.running = false; @@ -129,6 +130,89 @@ impl Bridge { break; } } + udp_msg = async { + match udp_rx_opt.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending().await, + } + }, if self.running => { + match udp_msg { + Some((session_index, inbound)) => { + self.metrics.bytes_recv.fetch_add(inbound.len() as u64, Ordering::Relaxed); + self.last_valid_recv = Instant::now(); + if let Some(sessions) = sessions_opt.as_mut() { + if session_index < sessions.len() { + let session = &mut sessions[session_index]; + let initial_action = match session.machine.on_event(OstpEvent::Inbound(inbound)) { + Ok(a) => a, + Err(e) => { + let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await; + continue; + } + }; + + let mut actions_queue = std::collections::VecDeque::new(); + actions_queue.push_back(initial_action); + + while let Some(current_action) = actions_queue.pop_front() { + match current_action { + ProtocolAction::Multiple(nested) => { + for a in nested { + actions_queue.push_back(a); + } + } + ProtocolAction::DeliverApp(stream_id, dec_payload) => { + match RelayMessage::decode(&dec_payload) { + Ok(relay_msg) => { + match relay_msg { + RelayMessage::ConnectOk => { + let _ = tx.send(UiEvent::Log(format!("Relay CONNECT OK stream_id={stream_id}"))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::ConnectOk)).await; + } + RelayMessage::Data(data) => { + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Data(Bytes::from(data)))).await; + } + RelayMessage::Close => { + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Close)).await; + } + RelayMessage::Error(msg) => { + let _ = tx.send(UiEvent::Log(format!("Relay error for stream {stream_id}: {msg}"))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error(msg))).await; + } + RelayMessage::Pong(ts) => { + let now = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64; + self.last_rtt_ms = now.saturating_sub(ts) as f64; + } + RelayMessage::KeepAlive | RelayMessage::Ping(_) | RelayMessage::Connect(_) => {} + } + } + Err(err) => { + let _ = tx.send(UiEvent::Log(format!("Relay decode error for stream {stream_id}: {err}"))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("relay decode failed".to_string()))).await; + } + } + } + ProtocolAction::SendDatagram(frame) => { + let _ = send_datagram(&session.socket, &frame, self.turn_enabled).await; + self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed); + } + _ => {} + } + } + } + } + } + None => { + let _ = tx.send(UiEvent::Log("UDP reader channel closed".to_string())).await; + self.running = false; + crate::sysproxy::disable_windows_proxy(); + sessions_opt = None; + udp_rx_opt = None; + stream_map.clear(); + let _ = tx.send(UiEvent::TunnelStopped).await; + } + } + } cmd = bridge_rx.recv() => { match cmd { Some(BridgeCommand::ToggleTunnel) => { @@ -143,12 +227,12 @@ impl Bridge { let stop_msg = if self.mode == "tun" { "TUN Tunnel stopped" } else { "Bridge stopped" }; tx.send(UiEvent::Log(stop_msg.to_string())).await.ok(); } else { - tx.send(UiEvent::Log("Handshaking started".to_string())).await.ok(); + tx.send(UiEvent::Log("Connecting to remote server...".to_string())).await.ok(); tx.send(UiEvent::Metrics { status: ConnectionStatus::Handshaking, rtt_ms: 0.0, throughput_bps: 0 }).await.ok(); self.metrics.connection_state.store(1, Ordering::Relaxed); let session_count = if self.mux_enabled { self.mux_sessions.max(1) } else { 1 }; - let (udp_tx, udp_rx) = mpsc::channel(10000); + let (udp_tx, udp_rx) = mpsc::channel(100000); // Increased for high-speed traffic stability let mut sessions = Vec::with_capacity(session_count); let mut rtt_sum = 0.0; @@ -177,10 +261,14 @@ impl Bridge { Bytes::copy_from_slice(&buf[..n]) }; if udp_tx_clone.send((idx, inbound)).await.is_err() { + eprintln!("[bridge] UDP receiver task exiting: bridge channel full or closed"); break; } } - Err(_) => break, + Err(e) => { + eprintln!("[bridge] UDP socket recv error: {e}"); + break; + } } } }); @@ -197,9 +285,9 @@ impl Bridge { if let Some(err) = handshake_error { _proxy_guard = None; - tx.send(UiEvent::Log(format!("Handshake failed: {err}"))).await.ok(); + tx.send(UiEvent::Log(format!("Connection failed: {err}"))).await.ok(); tx.send(UiEvent::TunnelStopped).await.ok(); - self.metrics.connection_state.store(0, Ordering::Relaxed); + self.metrics.connection_state.store(0, Ordering::Relaxed); continue; } @@ -210,14 +298,16 @@ impl Bridge { self.last_sample_at = Instant::now(); self.last_valid_recv = Instant::now(); - _proxy_guard = Some(crate::sysproxy::WindowsProxyGuard::enable(&self.proxy_addr)); + let sys_proxy_addr = self.proxy_addr.replace("0.0.0.0:", "127.0.0.1:"); + _proxy_guard = Some(crate::sysproxy::WindowsProxyGuard::enable(&sys_proxy_addr)); tx.send(UiEvent::Metrics { status: ConnectionStatus::Established, rtt_ms: self.last_rtt_ms, throughput_bps: 0, }).await.ok(); - self.metrics.connection_state.store(2, Ordering::Relaxed); let start_msg = if self.mode == "tun" { "TUN Tunnel established" } else { "Bridge connection established" }; + self.metrics.connection_state.store(2, Ordering::Relaxed); + let start_msg = if self.mode == "tun" { "TUN Tunnel established" } else { "Connection established" }; tx.send(UiEvent::Log(start_msg.to_string())).await.ok(); } } @@ -233,8 +323,7 @@ impl Bridge { tx.send(UiEvent::Log("Runtime config reloaded".to_string())).await.ok(); if self.running { self.running = false; - self.metrics.connection_state.store(0, Ordering::Relaxed); - self.metrics.connection_state.store(0, Ordering::Relaxed); + self.metrics.connection_state.store(0, Ordering::Relaxed); _proxy_guard = None; sessions_opt = None; stream_map.clear(); @@ -262,8 +351,8 @@ impl Bridge { _ = keepalive_tick.tick() => { if self.running { // 1. Connection Liveness Check - if self.last_valid_recv.elapsed().as_secs() > 15 { - let _ = tx.send(UiEvent::Log("Connection timeout (no UDP packets received). Dropping connection.".into())).await; + if self.last_valid_recv.elapsed().as_secs() > 30 { + let _ = tx.send(UiEvent::Log("Connection lost (timeout). Reconnecting...".into())).await; self.running = false; _proxy_guard = None; sessions_opt = None; @@ -425,90 +514,7 @@ impl Bridge { } } - udp_msg = async { - match udp_rx_opt.as_mut() { - Some(rx) => rx.recv().await, - None => std::future::pending().await, - } - }, if self.running => { - match udp_msg { - Some((session_index, inbound)) => { - self.metrics.bytes_recv.fetch_add(inbound.len() as u64, Ordering::Relaxed); - self.last_valid_recv = Instant::now(); - if let Some(sessions) = sessions_opt.as_mut() { - if session_index >= sessions.len() { - continue; - } - let session = &mut sessions[session_index]; - let initial_action = match session.machine.on_event(OstpEvent::Inbound(inbound)) { - Ok(a) => a, - Err(e) => { - let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await; - continue; - } - }; - let mut actions_queue = std::collections::VecDeque::new(); - actions_queue.push_back(initial_action); - - while let Some(current_action) = actions_queue.pop_front() { - match current_action { - ProtocolAction::Multiple(nested) => { - for a in nested { - actions_queue.push_back(a); - } - } - ProtocolAction::DeliverApp(stream_id, dec_payload) => { - match RelayMessage::decode(&dec_payload) { - Ok(relay_msg) => { - match relay_msg { - RelayMessage::ConnectOk => { - let _ = tx.send(UiEvent::Log(format!("Relay CONNECT OK stream_id={stream_id}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::ConnectOk)).await; - } - RelayMessage::Data(data) => { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Data(Bytes::from(data)))).await; - } - RelayMessage::Close => { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Close)).await; - } - RelayMessage::Error(msg) => { - let _ = tx.send(UiEvent::Log(format!("Relay error for stream {stream_id}: {msg}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error(msg))).await; - } - RelayMessage::Pong(ts) => { - let now = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64; - self.last_rtt_ms = now.saturating_sub(ts) as f64; - } - RelayMessage::KeepAlive | RelayMessage::Ping(_) | RelayMessage::Connect(_) => {} - } - } - Err(err) => { - let _ = tx.send(UiEvent::Log(format!("Relay decode error for stream {stream_id}: {err}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("relay decode failed".to_string()))).await; - } - } - } - ProtocolAction::SendDatagram(frame) => { - let _ = send_datagram(&session.socket, &frame, self.turn_enabled).await; - self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed); - } - _ => {} - } - } - } - } - None => { - let _ = tx.send(UiEvent::Log("UDP reader channel closed".to_string())).await; - self.running = false; - crate::sysproxy::disable_windows_proxy(); - sessions_opt = None; - udp_rx_opt = None; - stream_map.clear(); - let _ = tx.send(UiEvent::TunnelStopped).await; - } - } - } } } diff --git a/ostp-client/src/runner.rs b/ostp-client/src/runner.rs index 004549d..b2247d7 100644 --- a/ostp-client/src/runner.rs +++ b/ostp-client/src/runner.rs @@ -163,7 +163,6 @@ pub async fn run_client_core( let (shutdown_tx, shutdown_rx) = watch::channel(false); let proxy_shutdown_rx = shutdown_tx.subscribe(); - let is_tun = config.mode == "tun"; // Auto-connect on startup let _ = cmd_tx.send(BridgeCommand::ToggleTunnel).await; @@ -196,11 +195,7 @@ pub async fn run_client_core( } } crate::app::UiEvent::TunnelStopped => { - if is_tun { - println!("[client] tunnel=tun stopped, reconnecting in 5s"); - } else { - println!("[client] tunnel=proxy stopped, reconnecting in 5s"); - } + println!("[client] Connection lost or failed. Reconnecting in 5s..."); let cmd_tx_inner = cmd_tx_clone.clone(); tokio::spawn(async move { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index 1eb56f4..4cbb7e7 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -255,7 +255,7 @@ async fn run_server_loop( peer_last_seen.insert(peer_ip, now); if !peer_available.get(&peer_ip).copied().unwrap_or(false) { peer_available.insert(peer_ip, true); - let _ = ui_event_tx.send(UiEvent::Log(format!("Peer {peer_ip} available"))); + let _ = ui_event_tx.send(UiEvent::Log(format!("Client {peer_ip} connected"))); } if app_payloads.is_empty() && now.duration_since(last_empty_app_log) > Duration::from_secs(5) { @@ -315,7 +315,7 @@ async fn run_server_loop( let is_available = peer_available.get(peer_ip).copied().unwrap_or(false); if is_available && now.duration_since(*last_seen) > peer_timeout { peer_available.insert(*peer_ip, false); - let _ = ui_event_tx.send(UiEvent::Log(format!("Peer {peer_ip} unavailable"))); + let _ = ui_event_tx.send(UiEvent::Log(format!("Client {peer_ip} disconnected (timeout)"))); } } let (frames, dropped_sessions) = dispatcher.on_tick();