diff --git a/Cargo.lock b/Cargo.lock index 7bbe3a6..0ab51fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,7 +745,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "ostp" -version = "0.1.48" +version = "0.1.49" dependencies = [ "anyhow", "base64", @@ -762,7 +762,7 @@ dependencies = [ [[package]] name = "ostp-client" -version = "0.1.48" +version = "0.1.49" dependencies = [ "anyhow", "bytes", @@ -779,7 +779,7 @@ dependencies = [ [[package]] name = "ostp-core" -version = "0.1.48" +version = "0.1.49" dependencies = [ "anyhow", "async-trait", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "ostp-server" -version = "0.1.48" +version = "0.1.49" dependencies = [ "anyhow", "bytes", @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "ostp-tun-helper" -version = "0.1.48" +version = "0.1.49" dependencies = [ "anyhow", "chrono", diff --git a/docs/en/client.md b/docs/en/client.md index 33ac904..4e1690b 100644 --- a/docs/en/client.md +++ b/docs/en/client.md @@ -59,7 +59,7 @@ To minimize latency and overhead for trusted resources, the OSTP client incorpor --- -## Multiplexing & Known Session Constraints +## Multiplexing The wire protocol provides support for bundling multiple physical UDP session handles into a single logical transport pipeline via the `"mux"` block: @@ -70,12 +70,5 @@ The wire protocol provides support for bundling multiple physical UDP session ha } ``` -### Current Implementation Limits: -> [!WARNING] -> **Currently, utilizing more than 1 multiplexed session (`sessions > 1`) is NOT supported and will result in complete traffic loss.** -> -> **Observed Bug Behavior:** -> If multiple sessions are initiated (e.g., `sessions: 3`), the client executes successful handshakes for each endpoint (yielding repeated `Connected UDP directly to` lines in diagnostic logs), and the server initializes the matching tracking slots. However, during the payload demultiplexing phase, the server pipeline fails to bridge payloads back to active streams, dropping all encapsulated packets. -> -> **Resolution Requirement:** -> You MUST ensure that the `"mux"` block remains disabled (`"enabled": false`) OR is manually constrained to exactly **1** session (`"sessions": 1`). +### Current Status +Multi-session multiplexing (`sessions > 1`) is supported. Use the `"mux"` block to scale concurrent transport sessions as needed for throughput or resiliency. diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index 86b44c6..dfe05bb 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -102,7 +102,7 @@ impl Bridge { mut bridge_rx: mpsc::Receiver, mut shutdown: watch::Receiver, mut proxy_rx: mpsc::Receiver, - proxy_tx: mpsc::Sender<(u16, ProxyToClientMsg)>, + proxy_tx: mpsc::UnboundedSender<(u16, ProxyToClientMsg)>, ) -> Result<()> { let mut metrics_tick = interval(Duration::from_millis(500)); let mut keepalive_tick = tokio::time::interval(Duration::from_secs(5)); @@ -167,17 +167,17 @@ impl Bridge { match relay_msg { RelayMessage::ConnectOk => { let _ = tx.send(UiEvent::Log(format!("Relay CONNECT OK stream_id={stream_id}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::ConnectOk)).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::ConnectOk)); } RelayMessage::Data(data) => { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Data(Bytes::from(data)))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Data(Bytes::from(data)))); } RelayMessage::Close => { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Close)).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Close)); } RelayMessage::Error(msg) => { let _ = tx.send(UiEvent::Log(format!("Relay error for stream {stream_id}: {msg}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error(msg))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error(msg))); } RelayMessage::Pong(ts) => { let now = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64; @@ -188,7 +188,7 @@ impl Bridge { } Err(err) => { let _ = tx.send(UiEvent::Log(format!("Relay decode error for stream {stream_id}: {err}"))).await; - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("relay decode failed".to_string()))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("relay decode failed".to_string()))); } } } @@ -209,6 +209,7 @@ impl Bridge { sessions_opt = None; udp_rx_opt = None; stream_map.clear(); + self.reset_proxy_streams(&tx, &proxy_tx, "udp reader closed"); let _ = tx.send(UiEvent::TunnelStopped).await; } } @@ -223,6 +224,7 @@ impl Bridge { sessions_opt = None; udp_rx_opt = None; stream_map.clear(); + self.reset_proxy_streams(&tx, &proxy_tx, "manual stop"); tx.send(UiEvent::TunnelStopped).await.ok(); let stop_msg = if self.mode == "tun" { "TUN Tunnel stopped" } else { "Bridge stopped" }; tx.send(UiEvent::Log(stop_msg.to_string())).await.ok(); @@ -327,6 +329,7 @@ impl Bridge { _proxy_guard = None; sessions_opt = None; stream_map.clear(); + self.reset_proxy_streams(&tx, &proxy_tx, "config reload"); // User logic handles UI restart let _ = tx.send(UiEvent::TunnelStopped).await; } @@ -357,6 +360,7 @@ impl Bridge { _proxy_guard = None; sessions_opt = None; stream_map.clear(); + self.reset_proxy_streams(&tx, &proxy_tx, "keepalive timeout"); let _ = tx.send(UiEvent::TunnelStopped).await; self.metrics.connection_state.store(0, Ordering::Relaxed); continue; @@ -421,6 +425,7 @@ impl Bridge { sessions_opt = None; udp_rx_opt = None; stream_map.clear(); + self.reset_proxy_streams(&tx, &proxy_tx, "protocol fatal error"); let _ = tx.send(UiEvent::TunnelStopped).await; self.metrics.connection_state.store(0, Ordering::Relaxed); } @@ -434,7 +439,7 @@ impl Bridge { if let Some(sessions) = sessions_opt.as_mut() { if sessions.is_empty() { if let ProxyEvent::NewStream { stream_id, .. } = ev { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into()))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into()))); } continue; } @@ -508,7 +513,7 @@ impl Bridge { } else { // Drop it, not connected if let ProxyEvent::NewStream { stream_id, .. } = ev { - let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into()))).await; + let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into()))); } } } @@ -522,6 +527,28 @@ impl Bridge { Ok(()) } + fn reset_proxy_streams( + &self, + tx: &mpsc::Sender, + proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>, + reason: &str, + ) { + if proxy_tx + .send((0, ProxyToClientMsg::Close)) + .is_err() + { + let tx_clone = tx.clone(); + let reason_str = reason.to_string(); + tokio::spawn(async move { + let _ = tx_clone + .send(UiEvent::Log(format!( + "Failed to reset local proxy streams ({reason_str})" + ))) + .await; + }); + } + } + async fn emit_metrics(&mut self, tx: &mpsc::Sender) { let now = Instant::now(); let elapsed = now.duration_since(self.last_sample_at).as_secs_f64().max(0.001); diff --git a/ostp-client/src/runner.rs b/ostp-client/src/runner.rs index b2247d7..f54b62a 100644 --- a/ostp-client/src/runner.rs +++ b/ostp-client/src/runner.rs @@ -154,7 +154,7 @@ pub async fn run_client_core( } let (proxy_events_tx, proxy_events_rx) = mpsc::channel(10000); - let (client_msgs_tx, client_msgs_rx) = mpsc::channel(10000); + let (client_msgs_tx, client_msgs_rx) = mpsc::unbounded_channel(); let bridge = Bridge::new(&config, metrics)?; diff --git a/ostp-client/src/tunnel/mod.rs b/ostp-client/src/tunnel/mod.rs index 8bfd5a6..adfb8bc 100644 --- a/ostp-client/src/tunnel/mod.rs +++ b/ostp-client/src/tunnel/mod.rs @@ -61,7 +61,7 @@ pub async fn run_local_proxy( debug: bool, shutdown: watch::Receiver, proxy_events_tx: mpsc::Sender, - client_msgs_rx: mpsc::Receiver<(u16, ProxyToClientMsg)>, + client_msgs_rx: mpsc::UnboundedReceiver<(u16, ProxyToClientMsg)>, ) -> anyhow::Result<()> { run_local_socks5_proxy(cfg, ostp, exclusions, debug, shutdown, proxy_events_tx, client_msgs_rx).await } diff --git a/ostp-client/src/tunnel/proxy.rs b/ostp-client/src/tunnel/proxy.rs index 09b00a8..284c16f 100644 --- a/ostp-client/src/tunnel/proxy.rs +++ b/ostp-client/src/tunnel/proxy.rs @@ -15,7 +15,7 @@ pub async fn run_local_socks5_proxy( debug: bool, mut shutdown: watch::Receiver, proxy_events_tx: mpsc::Sender, - mut client_msgs_rx: mpsc::Receiver<(u16, ProxyToClientMsg)>, + mut client_msgs_rx: mpsc::UnboundedReceiver<(u16, ProxyToClientMsg)>, ) -> Result<()> { let connect_timeout = Duration::from_millis(cfg.connect_timeout_ms.max(1)); let listener = TcpListener::bind(&cfg.bind_addr) @@ -31,7 +31,7 @@ pub async fn run_local_socks5_proxy( let (connect_tx, mut connect_rx) = mpsc::channel(128); let mut next_stream_id: u16 = 1; - let mut active_streams: HashMap> = HashMap::new(); + let mut active_streams: HashMap> = HashMap::new(); loop { tokio::select! { @@ -46,7 +46,7 @@ pub async fn run_local_socks5_proxy( next_stream_id = next_stream_id.wrapping_add(1); if next_stream_id == 0 { next_stream_id = 1; } - let (tx, rx) = mpsc::channel(256); + let (tx, rx) = mpsc::unbounded_channel(); active_streams.insert(stream_id, tx); let event_tx = proxy_events_tx.clone(); @@ -78,8 +78,17 @@ pub async fn run_local_socks5_proxy( }); } Some((stream_id, msg)) = client_msgs_rx.recv() => { - if let Some(tx) = active_streams.get(&stream_id) { - if tx.send(msg).await.is_err() { + if stream_id == 0 { + if let ProxyToClientMsg::Close = msg { + if debug { + eprintln!("[ostp-client] Resetting all active proxy streams on reconnect"); + } + for (_, tx) in active_streams.drain() { + let _ = tx.send(ProxyToClientMsg::Close); + } + } + } else if let Some(tx) = active_streams.get(&stream_id) { + if tx.send(msg).is_err() { active_streams.remove(&stream_id); } } @@ -116,7 +125,7 @@ async fn handle_proxy_client( mut client: TcpStream, stream_id: u16, event_tx: mpsc::Sender, - mut rx: mpsc::Receiver, + mut rx: mpsc::UnboundedReceiver, close_tx: mpsc::Sender, connect_timeout: Duration, debug: bool, diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index 9336b84..5977d6d 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -415,12 +415,21 @@ impl ProtocolMachine { } let now = Instant::now(); + let base_rto_ms = self.rto.as_millis().max(1) as u64; for frame in self.sent_history.iter_mut() { - if frame.retries >= self.max_retries { - tracing::error!("FATAL: Frame {} exceeded max retries ({}). Connection is dead.", frame.nonce, self.max_retries); - return Err(ProtocolError::State("connection dead, max retries exceeded".into())); + if frame.retries == self.max_retries { + tracing::warn!( + "Frame {} exceeded max retries ({}); continuing with backoff", + frame.nonce, + self.max_retries + ); } - if now.duration_since(frame.last_sent) >= self.rto { + + let retry_over = frame.retries.saturating_sub(self.max_retries); + let backoff_factor = 1u64 << retry_over.min(6); + let effective_rto = Duration::from_millis(base_rto_ms.saturating_mul(backoff_factor)); + + if now.duration_since(frame.last_sent) >= effective_rto { frame.last_sent = now; frame.retries = frame.retries.saturating_add(1); actions.push(ProtocolAction::SendDatagram(frame.bytes.clone())); diff --git a/ostp-jni/src/lib.rs b/ostp-jni/src/lib.rs index 796f5f7..b35e51f 100644 --- a/ostp-jni/src/lib.rs +++ b/ostp-jni/src/lib.rs @@ -75,7 +75,7 @@ pub extern "system" fn Java_net_ostp_client_OstpClientSdk_startClient( }; let (proxy_events_tx, proxy_events_rx) = mpsc::channel(512); - let (client_msgs_tx, client_msgs_rx) = mpsc::channel(512); + let (client_msgs_tx, client_msgs_rx) = mpsc::unbounded_channel(); let metrics = Arc::new(BridgeMetrics { bytes_sent: portable_atomic::AtomicU64::new(0), diff --git a/scripts/build.ps1 b/scripts/build.ps1 index c161d53..c2dacee 100644 --- a/scripts/build.ps1 +++ b/scripts/build.ps1 @@ -30,7 +30,7 @@ if (Test-Path $CargoToml) { $NewVersionStr = 'version = "' + $Version + '"' $NewContent = $Content -replace 'version\s*=\s*"\d+\.\d+\.\d+"', $NewVersionStr [System.IO.File]::WriteAllText($CargoToml, $NewContent) - Write-Output "✔ Bounded workspace package to target release version: v$Version" + Write-Output "[OK] Bounded workspace package to target release version: v$Version" } } @@ -88,7 +88,7 @@ foreach ($item in $WindowsTargets) { Compress-Archive -Path "$targetStaging\*" -DestinationPath $archivePath -Force $ReleaseArchives += $archivePath - Write-Output "✔ SUCCESSFULLY PACKAGED: $archiveName" + Write-Output "[OK] SUCCESSFULLY PACKAGED: $archiveName" if ($Flatten) { $RawReleaseDir = Join-Path $DistDir "release" @@ -99,7 +99,7 @@ foreach ($item in $WindowsTargets) { } } } else { - Write-Output "⚠ FAILED compiling Windows $arch ($target). Missing local platform C++ toolchain components." + Write-Output "[WARN] FAILED compiling Windows $arch ($target). Missing local platform C++ toolchain components." } } # Restore environment variables @@ -155,7 +155,7 @@ if (Get-Command wsl -ErrorAction SilentlyContinue) { & wsl tar -czf $wslArchiveFile -C $wslStagingDir $bin $ReleaseArchives += Join-Path $DistDir $archiveName - Write-Output "✔ SUCCESSFULLY PACKAGED: $archiveName" + Write-Output "[OK] SUCCESSFULLY PACKAGED: $archiveName" if ($Flatten) { $RawReleaseDir = Join-Path $DistDir "release" @@ -166,11 +166,11 @@ if (Get-Command wsl -ErrorAction SilentlyContinue) { } } } else { - Write-Output "⚠ FAILED compiling Linux $arch ($target)." + Write-Output "[WARN] FAILED compiling Linux $arch ($target)." } } } else { - Write-Output "⚠ WSL utility not discovered on host. Skipping Linux binary compilations." + Write-Output "[WARN] WSL utility not discovered on host. Skipping Linux binary compilations." } # Dissolve staging buffer directory @@ -182,7 +182,7 @@ Write-Output "=========================================================" if ($ReleaseArchives.Count -gt 0) { $ReleaseArchives | ForEach-Object { Write-Output " [+] $_" } } else { - Write-Output "❌ CRITICAL: No architectures compiled successfully." + Write-Output "[ERROR] CRITICAL: No architectures compiled successfully." Pop-Location exit 1 } @@ -214,11 +214,11 @@ Write-Output "Deploying trigger tag to GitHub..." & git push origin "v$Version" --force if ($LASTEXITCODE -eq 0) { - Write-Output "`n🚀 EXCELLENT! Release trigger successfully synchronized with Cloud runners!" - Write-Output "✨ GitHub Actions is now compiling all 13 architectures in parallel." - Write-Output "🔗 Live monitoring link: https://github.com/ospab/ostp/actions" + Write-Output "`n[OK] EXCELLENT! Release trigger successfully synchronized with Cloud runners!" + Write-Output "[INFO] GitHub Actions is now compiling all 13 architectures in parallel." + Write-Output "[INFO] Live monitoring link: https://github.com/ospab/ostp/actions" } else { - Write-Output "`n❌ Failed to deliver release tag to remote origin." + Write-Output "`n[ERROR] Failed to deliver release tag to remote origin." } Pop-Location