From 0f81140f0646379c263e8294f56dac4e1b299d4f Mon Sep 17 00:00:00 2001 From: ospab Date: Fri, 15 May 2026 18:34:32 +0300 Subject: [PATCH] feat: resolve flow control, tun crash route cleanup, log pollution, padding caps --- Cargo.lock | 8 ++-- README.md | 6 +++ ostp-client/src/bridge.rs | 29 +++++++++--- ostp-client/src/runner.rs | 1 - ostp-client/src/tunnel/linux_handler.rs | 59 ++++++++++++++++-------- ostp-client/src/tunnel/mod.rs | 18 -------- ostp-client/src/tunnel/wintun_handler.rs | 47 ++++++++++++------- ostp-core/src/protocol.rs | 4 ++ ostp-server/src/lib.rs | 8 +++- 9 files changed, 113 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0585755..aea5133 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,7 +485,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "ostp" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "base64", @@ -500,7 +500,7 @@ dependencies = [ [[package]] name = "ostp-client" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "bytes", @@ -515,7 +515,7 @@ dependencies = [ [[package]] name = "ostp-core" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "async-trait", @@ -548,7 +548,7 @@ dependencies = [ [[package]] name = "ostp-server" -version = "0.1.34" +version = "0.1.35" dependencies = [ "anyhow", "bytes", diff --git a/README.md b/README.md index fe5b9ff..7769b56 100644 --- a/README.md +++ b/README.md @@ -109,6 +109,12 @@ Configuration parameters are defined within `config.json` aligned adjacent to th } ``` +> [!IMPORTANT] +> **TUN Mode & Privileges** +> To route all global operating system traffic through OSTP, change `"enable": false` to `true` in the `tun` object. +> - **Windows**: Requires running `ostp` as Administrator. The pre-packaged `tun2socks.exe` dependency must be located next to the `ostp` binary. +> - **Linux**: Requires running `ostp` as root. The `tun2socks` and `iproute2` packages must be available in the system PATH or alongside the binary. + ### Execution Parameters Initiate telemetry processing by assigning the active configuration target: diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index 9e848ba..8806a0d 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -104,6 +104,7 @@ impl Bridge { let mut sessions_opt: Option> = None; let mut udp_rx_opt: Option> = None; let mut _proxy_guard: Option = None; + let mut stream_map: std::collections::HashMap = std::collections::HashMap::new(); loop { tokio::select! { @@ -122,6 +123,7 @@ impl Bridge { _proxy_guard = None; sessions_opt = None; udp_rx_opt = None; + stream_map.clear(); tx.send(UiEvent::TunnelStopped).await.ok(); let stop_msg = if self.mode == "tun" { "TUN Tunnel stopped" } else { "Bridge stopped" }; tx.send(UiEvent::Log(stop_msg.to_string())).await.ok(); @@ -206,6 +208,7 @@ impl Bridge { self.running = false; _proxy_guard = None; sessions_opt = None; + stream_map.clear(); // User logic handles UI restart let _ = tx.send(UiEvent::TunnelStopped).await; } @@ -234,6 +237,7 @@ impl Bridge { self.running = false; _proxy_guard = None; sessions_opt = None; + stream_map.clear(); let _ = tx.send(UiEvent::TunnelStopped).await; continue; } @@ -280,7 +284,10 @@ impl Bridge { } } } - proxy_ev = proxy_rx.recv(), if self.running => { + proxy_ev = proxy_rx.recv(), if self.running && sessions_opt.as_ref().map(|s| { + // §3 FIX: Apply backpressure. Suspend pulling from local proxy if ARQ buffers exceed 1024 unacked frames + s.iter().all(|ses| ses.machine.in_flight_count() < 1024) + }).unwrap_or(true) => { if let Some(ev) = proxy_ev { if let Some(sessions) = sessions_opt.as_mut() { if sessions.is_empty() { @@ -289,18 +296,25 @@ impl Bridge { } continue; } - let (stream_id, relay_msg) = match ev { + let (stream_id, relay_msg, is_close) = match ev { ProxyEvent::NewStream { stream_id, target } => { let _ = tx.send(UiEvent::Log(format!("Proxy CONNECT stream_id={stream_id} target={target}"))).await; - (stream_id, RelayMessage::Connect(target)) + (stream_id, RelayMessage::Connect(target), false) } - ProxyEvent::Data { stream_id, payload } => (stream_id, RelayMessage::Data(payload.to_vec())), + ProxyEvent::Data { stream_id, payload } => (stream_id, RelayMessage::Data(payload.to_vec()), false), ProxyEvent::Close { stream_id } => { let _ = tx.send(UiEvent::Log(format!("Proxy CLOSE stream_id={stream_id}"))).await; - (stream_id, RelayMessage::Close) + (stream_id, RelayMessage::Close, true) } }; - let session_index = (stream_id as usize) % sessions.len(); + let len = sessions.len(); + let session_index = *stream_map.entry(stream_id).or_insert_with(|| { + // §8 FIX: Load balance multiplexed streams randomly across available connection sockets + rand::thread_rng().gen_range(0..len) + }); + if is_close { + stream_map.remove(&stream_id); + } let session = &mut sessions[session_index]; let out_payload = Bytes::from(relay_msg.encode()); match session.machine.on_event(OstpEvent::Outbound(stream_id, out_payload)) { @@ -436,6 +450,7 @@ impl Bridge { crate::sysproxy::disable_windows_proxy(); sessions_opt = None; udp_rx_opt = None; + stream_map.clear(); let _ = tx.send(UiEvent::TunnelStopped).await; } } @@ -496,7 +511,7 @@ impl Bridge { psk, session_id, handshake_payload, - max_padding: 256, + max_padding: 1400, // §7 FIX: Allow padding up to full MTU size to break traffic analysis fingerprints padding_strategy: PaddingStrategy::Profile(self.profile), obfuscation_key: obf_key, max_reorder: 262144, diff --git a/ostp-client/src/runner.rs b/ostp-client/src/runner.rs index eae3c23..ab5d5fb 100644 --- a/ostp-client/src/runner.rs +++ b/ostp-client/src/runner.rs @@ -216,7 +216,6 @@ pub async fn run_client(config: crate::config::ClientConfig) -> Result<()> { if let Some(task) = wintun_task { let _ = task.await?; } - tunnel::cleanup().await?; Ok(()) } diff --git a/ostp-client/src/tunnel/linux_handler.rs b/ostp-client/src/tunnel/linux_handler.rs index dd3902e..5e726fa 100644 --- a/ostp-client/src/tunnel/linux_handler.rs +++ b/ostp-client/src/tunnel/linux_handler.rs @@ -4,10 +4,38 @@ use tokio::sync::watch; #[cfg(target_os = "linux")] use std::net::ToSocketAddrs; #[cfg(target_os = "linux")] -use std::process::{Command, Stdio}; +use std::process::{Command, Stdio, Child}; #[cfg(target_os = "linux")] use std::io::{BufRead, BufReader}; +#[cfg(target_os = "linux")] +struct LinuxRouteGuard { + server_ip_str: String, + default_gw: String, + default_if: String, + child: Option, +} + +#[cfg(target_os = "linux")] +impl Drop for LinuxRouteGuard { + fn drop(&mut self) { + if let Some(mut child) = self.child.take() { + let _ = child.kill(); + } + let cleanup_script = format!( + "ip route del 0.0.0.0/1 dev ostp_tun || true; \ + ip route del 128.0.0.0/1 dev ostp_tun || true; \ + ip route del {} via {} dev {} || true; \ + ip route del 1.1.1.1 via {} dev {} || true; \ + ip link set dev ostp_tun down || true; \ + ip tuntap del name ostp_tun mode tun || true", + self.server_ip_str, self.default_gw, self.default_if, + self.default_gw, self.default_if + ); + let _ = Command::new("sh").args(["-c", &cleanup_script]).output(); + } +} + #[cfg(target_os = "linux")] pub async fn run_linux_tunnel( config: crate::config::ClientConfig, @@ -124,6 +152,13 @@ pub async fn run_linux_tunnel( .spawn() .map_err(|e| anyhow!("Failed to spawn tun2socks process: {}", e))?; + let mut _guard = LinuxRouteGuard { + server_ip_str: server_ip_str.clone(), + default_gw: default_gw.clone(), + default_if: default_if.clone(), + child: None, + }; + println!("[client] TUN Tunnel established, Linux traffic is now routing through OSTP."); if debug { @@ -145,29 +180,15 @@ pub async fn run_linux_tunnel( }); } + _guard.child = Some(child); + // 6. Wait for shutdown signal let _ = shutdown.changed().await; println!("[client] Deactivating TUN tunnel and restoring Linux network topology..."); - // 7. Terminate process - let _ = child.kill(); - - // 8. Cleanup routing and virtual interface - let cleanup_script = format!( - "ip route del 0.0.0.0/1 dev ostp_tun || true; \ - ip route del 128.0.0.0/1 dev ostp_tun || true; \ - ip route del {} via {} dev {} || true; \ - ip route del 1.1.1.1 via {} dev {} || true; \ - ip link set dev ostp_tun down || true; \ - ip tuntap del name ostp_tun mode tun || true", - server_ip_str, default_gw, default_if, - default_gw, default_if - ); - - let _ = Command::new("sh") - .args(["-c", &cleanup_script]) - .output()?; + // Drop guard runs cleanup automatically + drop(_guard); println!("[client] Linux TUN Tunnel stopped."); diff --git a/ostp-client/src/tunnel/mod.rs b/ostp-client/src/tunnel/mod.rs index 83ffe97..8bfd5a6 100644 --- a/ostp-client/src/tunnel/mod.rs +++ b/ostp-client/src/tunnel/mod.rs @@ -53,24 +53,6 @@ pub enum ProxyToClientMsg { Error(String), } -#[allow(dead_code)] -pub struct TunnelConfig { - pub local_bind: String, - pub remote_addr: String, -} - -impl Default for TunnelConfig { - fn default() -> Self { - Self { - local_bind: "127.0.0.1:1080".to_string(), - remote_addr: "127.0.0.1:443".to_string(), - } - } -} - -pub async fn cleanup() -> anyhow::Result<()> { - Ok(()) -} pub async fn run_local_proxy( cfg: LocalProxyConfig, diff --git a/ostp-client/src/tunnel/wintun_handler.rs b/ostp-client/src/tunnel/wintun_handler.rs index 93ddfd1..ec8aae0 100644 --- a/ostp-client/src/tunnel/wintun_handler.rs +++ b/ostp-client/src/tunnel/wintun_handler.rs @@ -7,8 +7,29 @@ pub async fn run_wintun_tunnel( mut shutdown: watch::Receiver, ) -> Result<()> { use std::net::ToSocketAddrs; - use std::process::{Command, Stdio}; + use std::process::{Command, Stdio, Child}; + + struct WintunGuard { + server_ip_str: String, + child: Option, + } + impl Drop for WintunGuard { + fn drop(&mut self) { + if let Some(mut child) = self.child.take() { + let _ = child.kill(); + } + let cleanup_script = format!( + "$remote_ip = '{}'\n\ + Remove-NetRoute -DestinationPrefix \"$remote_ip/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\ + Remove-NetRoute -DestinationPrefix \"1.1.1.1/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\ + Remove-NetFirewallRule -DisplayName 'OSTP Tunnel*' -ErrorAction SilentlyContinue\n", + self.server_ip_str + ); + let _ = Command::new("powershell").args(["-Command", &cleanup_script]).output(); + } + } + let debug = config.debug; if debug { @@ -94,6 +115,11 @@ pub async fn run_wintun_tunnel( .spawn() .map_err(|e| anyhow!("Failed to launch tun2socks.exe background process: {}", e))?; + let mut _guard = WintunGuard { + server_ip_str: server_ip_str.clone(), + child: None, // Will set below + }; + // 5. Once tun2socks creates the interface, apply network settings (IP, metric) tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; @@ -116,6 +142,8 @@ pub async fn run_wintun_tunnel( // 6. Spawn thread to keep logging tun2socks output if in debug mode let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); + _guard.child = Some(child); + if debug { std::thread::spawn(move || { use std::io::{BufRead, BufReader}; @@ -142,21 +170,8 @@ pub async fn run_wintun_tunnel( println!("[client] Deactivating TUN tunnel and restoring system network topology..."); - // 8. Terminate tun2socks - let _ = child.kill(); - - // 9. Run cleanup routing script - let cleanup_script = format!( - "$remote_ip = '{}'\n\ - Remove-NetRoute -DestinationPrefix \"$remote_ip/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\ - Remove-NetRoute -DestinationPrefix \"1.1.1.1/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\ - Remove-NetFirewallRule -DisplayName 'OSTP Tunnel*' -ErrorAction SilentlyContinue\n", - server_ip_str - ); - - let _ = Command::new("powershell") - .args(["-Command", &cleanup_script]) - .output()?; + // Drop guard runs cleanup automatically + drop(_guard); println!("[client] TUN Tunnel stopped."); diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index 6c6708f..d33ef07 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -123,6 +123,10 @@ impl ProtocolMachine { }) } + pub fn in_flight_count(&self) -> usize { + self.sent_history.len() + } + pub fn state(&self) -> OstpState { self.state } diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index d659e3f..1eb56f4 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -145,11 +145,15 @@ pub async fn run_server( while let Some(ev) = ui_event_rx.recv().await { match ev { UiEvent::Log(msg) => { - if debug || msg.starts_with("Peer ") || msg.starts_with("Listening on ") { + if debug || msg.starts_with("Listening on ") || msg.starts_with("Hot-reloaded ") { println!("[ostp-server] {msg}"); } } - UiEvent::KeyCreated { key } => println!("[ostp-server] New access key created: {key}"), + UiEvent::KeyCreated { key } => { + if debug { + println!("[ostp-server] New access key created: {key}"); + } + } UiEvent::UnauthorizedProbe { peer, bytes } => { if debug { println!("[ostp-server] WARNING: unauthorized probe from {peer} ({bytes} bytes)");