feat: resolve flow control, tun crash route cleanup, log pollution, padding caps

This commit is contained in:
ospab 2026-05-15 18:34:32 +03:00
parent cdc3f408f9
commit 0f81140f06
9 changed files with 113 additions and 67 deletions

8
Cargo.lock generated
View File

@ -485,7 +485,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]] [[package]]
name = "ostp" name = "ostp"
version = "0.1.34" version = "0.1.35"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -500,7 +500,7 @@ dependencies = [
[[package]] [[package]]
name = "ostp-client" name = "ostp-client"
version = "0.1.34" version = "0.1.35"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -515,7 +515,7 @@ dependencies = [
[[package]] [[package]]
name = "ostp-core" name = "ostp-core"
version = "0.1.34" version = "0.1.35"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait", "async-trait",
@ -548,7 +548,7 @@ dependencies = [
[[package]] [[package]]
name = "ostp-server" name = "ostp-server"
version = "0.1.34" version = "0.1.35"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",

View File

@ -109,6 +109,12 @@ Configuration parameters are defined within `config.json` aligned adjacent to th
} }
``` ```
> [!IMPORTANT]
> **TUN Mode & Privileges**
> To route all global operating system traffic through OSTP, change `"enable": false` to `true` in the `tun` object.
> - **Windows**: Requires running `ostp` as Administrator. The pre-packaged `tun2socks.exe` dependency must be located next to the `ostp` binary.
> - **Linux**: Requires running `ostp` as root. The `tun2socks` and `iproute2` packages must be available in the system PATH or alongside the binary.
### Execution Parameters ### Execution Parameters
Initiate telemetry processing by assigning the active configuration target: Initiate telemetry processing by assigning the active configuration target:

View File

@ -104,6 +104,7 @@ impl Bridge {
let mut sessions_opt: Option<Vec<SessionState>> = None; let mut sessions_opt: Option<Vec<SessionState>> = None;
let mut udp_rx_opt: Option<mpsc::Receiver<(usize, Bytes)>> = None; let mut udp_rx_opt: Option<mpsc::Receiver<(usize, Bytes)>> = None;
let mut _proxy_guard: Option<crate::sysproxy::WindowsProxyGuard> = None; let mut _proxy_guard: Option<crate::sysproxy::WindowsProxyGuard> = None;
let mut stream_map: std::collections::HashMap<u16, usize> = std::collections::HashMap::new();
loop { loop {
tokio::select! { tokio::select! {
@ -122,6 +123,7 @@ impl Bridge {
_proxy_guard = None; _proxy_guard = None;
sessions_opt = None; sessions_opt = None;
udp_rx_opt = None; udp_rx_opt = None;
stream_map.clear();
tx.send(UiEvent::TunnelStopped).await.ok(); tx.send(UiEvent::TunnelStopped).await.ok();
let stop_msg = if self.mode == "tun" { "TUN Tunnel stopped" } else { "Bridge stopped" }; let stop_msg = if self.mode == "tun" { "TUN Tunnel stopped" } else { "Bridge stopped" };
tx.send(UiEvent::Log(stop_msg.to_string())).await.ok(); tx.send(UiEvent::Log(stop_msg.to_string())).await.ok();
@ -206,6 +208,7 @@ impl Bridge {
self.running = false; self.running = false;
_proxy_guard = None; _proxy_guard = None;
sessions_opt = None; sessions_opt = None;
stream_map.clear();
// User logic handles UI restart // User logic handles UI restart
let _ = tx.send(UiEvent::TunnelStopped).await; let _ = tx.send(UiEvent::TunnelStopped).await;
} }
@ -234,6 +237,7 @@ impl Bridge {
self.running = false; self.running = false;
_proxy_guard = None; _proxy_guard = None;
sessions_opt = None; sessions_opt = None;
stream_map.clear();
let _ = tx.send(UiEvent::TunnelStopped).await; let _ = tx.send(UiEvent::TunnelStopped).await;
continue; continue;
} }
@ -280,7 +284,10 @@ impl Bridge {
} }
} }
} }
proxy_ev = proxy_rx.recv(), if self.running => { proxy_ev = proxy_rx.recv(), if self.running && sessions_opt.as_ref().map(|s| {
// §3 FIX: Apply backpressure. Suspend pulling from local proxy if ARQ buffers exceed 1024 unacked frames
s.iter().all(|ses| ses.machine.in_flight_count() < 1024)
}).unwrap_or(true) => {
if let Some(ev) = proxy_ev { if let Some(ev) = proxy_ev {
if let Some(sessions) = sessions_opt.as_mut() { if let Some(sessions) = sessions_opt.as_mut() {
if sessions.is_empty() { if sessions.is_empty() {
@ -289,18 +296,25 @@ impl Bridge {
} }
continue; continue;
} }
let (stream_id, relay_msg) = match ev { let (stream_id, relay_msg, is_close) = match ev {
ProxyEvent::NewStream { stream_id, target } => { ProxyEvent::NewStream { stream_id, target } => {
let _ = tx.send(UiEvent::Log(format!("Proxy CONNECT stream_id={stream_id} target={target}"))).await; let _ = tx.send(UiEvent::Log(format!("Proxy CONNECT stream_id={stream_id} target={target}"))).await;
(stream_id, RelayMessage::Connect(target)) (stream_id, RelayMessage::Connect(target), false)
} }
ProxyEvent::Data { stream_id, payload } => (stream_id, RelayMessage::Data(payload.to_vec())), ProxyEvent::Data { stream_id, payload } => (stream_id, RelayMessage::Data(payload.to_vec()), false),
ProxyEvent::Close { stream_id } => { ProxyEvent::Close { stream_id } => {
let _ = tx.send(UiEvent::Log(format!("Proxy CLOSE stream_id={stream_id}"))).await; let _ = tx.send(UiEvent::Log(format!("Proxy CLOSE stream_id={stream_id}"))).await;
(stream_id, RelayMessage::Close) (stream_id, RelayMessage::Close, true)
} }
}; };
let session_index = (stream_id as usize) % sessions.len(); let len = sessions.len();
let session_index = *stream_map.entry(stream_id).or_insert_with(|| {
// §8 FIX: Load balance multiplexed streams randomly across available connection sockets
rand::thread_rng().gen_range(0..len)
});
if is_close {
stream_map.remove(&stream_id);
}
let session = &mut sessions[session_index]; let session = &mut sessions[session_index];
let out_payload = Bytes::from(relay_msg.encode()); let out_payload = Bytes::from(relay_msg.encode());
match session.machine.on_event(OstpEvent::Outbound(stream_id, out_payload)) { match session.machine.on_event(OstpEvent::Outbound(stream_id, out_payload)) {
@ -436,6 +450,7 @@ impl Bridge {
crate::sysproxy::disable_windows_proxy(); crate::sysproxy::disable_windows_proxy();
sessions_opt = None; sessions_opt = None;
udp_rx_opt = None; udp_rx_opt = None;
stream_map.clear();
let _ = tx.send(UiEvent::TunnelStopped).await; let _ = tx.send(UiEvent::TunnelStopped).await;
} }
} }
@ -496,7 +511,7 @@ impl Bridge {
psk, psk,
session_id, session_id,
handshake_payload, handshake_payload,
max_padding: 256, max_padding: 1400, // §7 FIX: Allow padding up to full MTU size to break traffic analysis fingerprints
padding_strategy: PaddingStrategy::Profile(self.profile), padding_strategy: PaddingStrategy::Profile(self.profile),
obfuscation_key: obf_key, obfuscation_key: obf_key,
max_reorder: 262144, max_reorder: 262144,

View File

@ -216,7 +216,6 @@ pub async fn run_client(config: crate::config::ClientConfig) -> Result<()> {
if let Some(task) = wintun_task { if let Some(task) = wintun_task {
let _ = task.await?; let _ = task.await?;
} }
tunnel::cleanup().await?;
Ok(()) Ok(())
} }

View File

@ -4,10 +4,38 @@ use tokio::sync::watch;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
use std::process::{Command, Stdio}; use std::process::{Command, Stdio, Child};
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
#[cfg(target_os = "linux")]
struct LinuxRouteGuard {
server_ip_str: String,
default_gw: String,
default_if: String,
child: Option<Child>,
}
#[cfg(target_os = "linux")]
impl Drop for LinuxRouteGuard {
fn drop(&mut self) {
if let Some(mut child) = self.child.take() {
let _ = child.kill();
}
let cleanup_script = format!(
"ip route del 0.0.0.0/1 dev ostp_tun || true; \
ip route del 128.0.0.0/1 dev ostp_tun || true; \
ip route del {} via {} dev {} || true; \
ip route del 1.1.1.1 via {} dev {} || true; \
ip link set dev ostp_tun down || true; \
ip tuntap del name ostp_tun mode tun || true",
self.server_ip_str, self.default_gw, self.default_if,
self.default_gw, self.default_if
);
let _ = Command::new("sh").args(["-c", &cleanup_script]).output();
}
}
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn run_linux_tunnel( pub async fn run_linux_tunnel(
config: crate::config::ClientConfig, config: crate::config::ClientConfig,
@ -124,6 +152,13 @@ pub async fn run_linux_tunnel(
.spawn() .spawn()
.map_err(|e| anyhow!("Failed to spawn tun2socks process: {}", e))?; .map_err(|e| anyhow!("Failed to spawn tun2socks process: {}", e))?;
let mut _guard = LinuxRouteGuard {
server_ip_str: server_ip_str.clone(),
default_gw: default_gw.clone(),
default_if: default_if.clone(),
child: None,
};
println!("[client] TUN Tunnel established, Linux traffic is now routing through OSTP."); println!("[client] TUN Tunnel established, Linux traffic is now routing through OSTP.");
if debug { if debug {
@ -145,29 +180,15 @@ pub async fn run_linux_tunnel(
}); });
} }
_guard.child = Some(child);
// 6. Wait for shutdown signal // 6. Wait for shutdown signal
let _ = shutdown.changed().await; let _ = shutdown.changed().await;
println!("[client] Deactivating TUN tunnel and restoring Linux network topology..."); println!("[client] Deactivating TUN tunnel and restoring Linux network topology...");
// 7. Terminate process // Drop guard runs cleanup automatically
let _ = child.kill(); drop(_guard);
// 8. Cleanup routing and virtual interface
let cleanup_script = format!(
"ip route del 0.0.0.0/1 dev ostp_tun || true; \
ip route del 128.0.0.0/1 dev ostp_tun || true; \
ip route del {} via {} dev {} || true; \
ip route del 1.1.1.1 via {} dev {} || true; \
ip link set dev ostp_tun down || true; \
ip tuntap del name ostp_tun mode tun || true",
server_ip_str, default_gw, default_if,
default_gw, default_if
);
let _ = Command::new("sh")
.args(["-c", &cleanup_script])
.output()?;
println!("[client] Linux TUN Tunnel stopped."); println!("[client] Linux TUN Tunnel stopped.");

View File

@ -53,24 +53,6 @@ pub enum ProxyToClientMsg {
Error(String), Error(String),
} }
#[allow(dead_code)]
pub struct TunnelConfig {
pub local_bind: String,
pub remote_addr: String,
}
impl Default for TunnelConfig {
fn default() -> Self {
Self {
local_bind: "127.0.0.1:1080".to_string(),
remote_addr: "127.0.0.1:443".to_string(),
}
}
}
pub async fn cleanup() -> anyhow::Result<()> {
Ok(())
}
pub async fn run_local_proxy( pub async fn run_local_proxy(
cfg: LocalProxyConfig, cfg: LocalProxyConfig,

View File

@ -7,8 +7,29 @@ pub async fn run_wintun_tunnel(
mut shutdown: watch::Receiver<bool>, mut shutdown: watch::Receiver<bool>,
) -> Result<()> { ) -> Result<()> {
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio, Child};
struct WintunGuard {
server_ip_str: String,
child: Option<Child>,
}
impl Drop for WintunGuard {
fn drop(&mut self) {
if let Some(mut child) = self.child.take() {
let _ = child.kill();
}
let cleanup_script = format!(
"$remote_ip = '{}'\n\
Remove-NetRoute -DestinationPrefix \"$remote_ip/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\
Remove-NetRoute -DestinationPrefix \"1.1.1.1/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\
Remove-NetFirewallRule -DisplayName 'OSTP Tunnel*' -ErrorAction SilentlyContinue\n",
self.server_ip_str
);
let _ = Command::new("powershell").args(["-Command", &cleanup_script]).output();
}
}
let debug = config.debug; let debug = config.debug;
if debug { if debug {
@ -94,6 +115,11 @@ pub async fn run_wintun_tunnel(
.spawn() .spawn()
.map_err(|e| anyhow!("Failed to launch tun2socks.exe background process: {}", e))?; .map_err(|e| anyhow!("Failed to launch tun2socks.exe background process: {}", e))?;
let mut _guard = WintunGuard {
server_ip_str: server_ip_str.clone(),
child: None, // Will set below
};
// 5. Once tun2socks creates the interface, apply network settings (IP, metric) // 5. Once tun2socks creates the interface, apply network settings (IP, metric)
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
@ -116,6 +142,8 @@ pub async fn run_wintun_tunnel(
// 6. Spawn thread to keep logging tun2socks output if in debug mode // 6. Spawn thread to keep logging tun2socks output if in debug mode
let mut stdout = child.stdout.take(); let mut stdout = child.stdout.take();
let mut stderr = child.stderr.take(); let mut stderr = child.stderr.take();
_guard.child = Some(child);
if debug { if debug {
std::thread::spawn(move || { std::thread::spawn(move || {
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
@ -142,21 +170,8 @@ pub async fn run_wintun_tunnel(
println!("[client] Deactivating TUN tunnel and restoring system network topology..."); println!("[client] Deactivating TUN tunnel and restoring system network topology...");
// 8. Terminate tun2socks // Drop guard runs cleanup automatically
let _ = child.kill(); drop(_guard);
// 9. Run cleanup routing script
let cleanup_script = format!(
"$remote_ip = '{}'\n\
Remove-NetRoute -DestinationPrefix \"$remote_ip/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\
Remove-NetRoute -DestinationPrefix \"1.1.1.1/32\" -Confirm:$false -ErrorAction SilentlyContinue\n\
Remove-NetFirewallRule -DisplayName 'OSTP Tunnel*' -ErrorAction SilentlyContinue\n",
server_ip_str
);
let _ = Command::new("powershell")
.args(["-Command", &cleanup_script])
.output()?;
println!("[client] TUN Tunnel stopped."); println!("[client] TUN Tunnel stopped.");

View File

@ -123,6 +123,10 @@ impl ProtocolMachine {
}) })
} }
pub fn in_flight_count(&self) -> usize {
self.sent_history.len()
}
pub fn state(&self) -> OstpState { pub fn state(&self) -> OstpState {
self.state self.state
} }

View File

@ -145,11 +145,15 @@ pub async fn run_server(
while let Some(ev) = ui_event_rx.recv().await { while let Some(ev) = ui_event_rx.recv().await {
match ev { match ev {
UiEvent::Log(msg) => { UiEvent::Log(msg) => {
if debug || msg.starts_with("Peer ") || msg.starts_with("Listening on ") { if debug || msg.starts_with("Listening on ") || msg.starts_with("Hot-reloaded ") {
println!("[ostp-server] {msg}"); println!("[ostp-server] {msg}");
} }
} }
UiEvent::KeyCreated { key } => println!("[ostp-server] New access key created: {key}"), UiEvent::KeyCreated { key } => {
if debug {
println!("[ostp-server] New access key created: {key}");
}
}
UiEvent::UnauthorizedProbe { peer, bytes } => { UiEvent::UnauthorizedProbe { peer, bytes } => {
if debug { if debug {
println!("[ostp-server] WARNING: unauthorized probe from {peer} ({bytes} bytes)"); println!("[ostp-server] WARNING: unauthorized probe from {peer} ({bytes} bytes)");