From ceb760e4ce3140336afbad5346d58d86a895c440 Mon Sep 17 00:00:00 2001 From: ospab Date: Thu, 21 May 2026 02:23:49 +0300 Subject: [PATCH] feat: implement server-side UoT and MTU tuning --- Cargo.lock | 3 + ostp-client/src/bridge.rs | 3 + ostp-client/src/config.rs | 8 ++ ostp-core/src/protocol.rs | 7 +- ostp-server/Cargo.toml | 3 + ostp-server/src/lib.rs | 60 ++++++++++- ostp-server/src/transport/mod.rs | 1 + ostp-server/src/transport/uot.rs | 171 +++++++++++++++++++++++++++++++ ostp/src/main.rs | 3 + 9 files changed, 253 insertions(+), 6 deletions(-) create mode 100644 ostp-server/src/transport/mod.rs create mode 100644 ostp-server/src/transport/uot.rs diff --git a/Cargo.lock b/Cargo.lock index b286657..c5f04b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,12 +1083,15 @@ version = "0.2.6" dependencies = [ "anyhow", "axum", + "base64", "bytes", + "hmac", "ostp-core", "portable-atomic", "rand", "serde", "serde_json", + "sha2", "socket2", "tokio", "tower-http", diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index 70a0c9f..4f2de60 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -77,6 +77,7 @@ pub struct Bridge { pub transport_mode: String, pub stealth_sni: String, pub stealth_port: u16, + pub mtu: usize, metrics: Arc, sample_sent: u64, @@ -110,6 +111,7 @@ impl Bridge { transport_mode: config.transport.mode.clone(), stealth_sni: config.transport.stealth_sni.clone(), stealth_port: config.transport.stealth_port, + mtu: config.ostp.mtu, metrics, sample_sent: 0, @@ -784,6 +786,7 @@ impl Bridge { max_sent_history: 32768, // Reduced: gap recovery handles unrecoverable frames handshake_pad_min: secrets.handshake_pad_min, handshake_pad_max: secrets.handshake_pad_max, + mtu: self.mtu, })?; let resolved_addrs: Vec = match tokio::net::lookup_host(&self.server_addr).await { diff --git a/ostp-client/src/config.rs b/ostp-client/src/config.rs index d8725dc..8fabe49 100644 --- a/ostp-client/src/config.rs +++ b/ostp-client/src/config.rs @@ -46,8 +46,12 @@ pub struct OstpConfig { pub access_key: String, pub handshake_timeout_ms: u64, pub io_timeout_ms: u64, + #[serde(default = "default_mtu")] + pub mtu: usize, } +fn default_mtu() -> usize { 1350 } + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LocalProxyConfig { pub bind_addr: String, @@ -100,6 +104,7 @@ impl Default for OstpConfig { access_key: String::new(), handshake_timeout_ms: 5000, io_timeout_ms: 2500, + mtu: default_mtu(), } } } @@ -148,6 +153,7 @@ struct RawUnifiedConfig { debug: Option, server: Option, access_key: Option, + mtu: Option, socks5_bind: Option, tun: Option, exclude: Option, @@ -199,6 +205,7 @@ impl ClientConfig { let is_tun = raw.tun.as_ref().and_then(|t| t.enable).unwrap_or(false); let server = raw.server.unwrap_or_else(|| "127.0.0.1:50000".to_string()); let key = raw.access_key.unwrap_or_default(); + let mtu = raw.mtu.unwrap_or(default_mtu()); let socks5 = raw.socks5_bind.unwrap_or_else(|| "127.0.0.1:1088".to_string()); let exclusions = raw.exclude.unwrap_or(RawExcludeSection { domains: None, @@ -219,6 +226,7 @@ impl ClientConfig { access_key: key, handshake_timeout_ms: 5000, io_timeout_ms: 2500, + mtu, }, local_proxy: LocalProxyConfig { bind_addr: socks5, diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index 062b10c..f049c17 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -38,6 +38,7 @@ pub struct ProtocolConfig { /// Different access keys produce different handshake packet sizes. pub handshake_pad_min: usize, pub handshake_pad_max: usize, + pub mtu: usize, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -129,7 +130,7 @@ impl ProtocolMachine { sent_history: VecDeque::with_capacity(config.max_sent_history.max(1)), session_id: config.session_id, handshake_payload: config.handshake_payload, - padder: AdaptivePadder::new(1200, config.max_padding, config.padding_strategy), + padder: AdaptivePadder::new(config.mtu, config.max_padding, config.padding_strategy), obfuscation_key: config.obfuscation_key, max_reorder: config.max_reorder.max(1), max_reorder_buffer: config.max_reorder_buffer.max(1), @@ -141,8 +142,8 @@ impl ProtocolMachine { last_ack_sent: Instant::now(), last_nack_sent: Instant::now() - Duration::from_secs(1), last_recv_advance: Instant::now(), - cc: CongestionController::new(1200), - handshake_pad_min: config.handshake_pad_min.max(8), + cc: CongestionController::new(config.mtu as u64), + handshake_pad_min: config.handshake_pad_min.max(8), handshake_pad_max: config.handshake_pad_max.max(config.handshake_pad_min + 16), }) } diff --git a/ostp-server/Cargo.toml b/ostp-server/Cargo.toml index cf407b5..8261a34 100644 --- a/ostp-server/Cargo.toml +++ b/ostp-server/Cargo.toml @@ -17,3 +17,6 @@ socket2 = "0.6.3" axum = "0.8" tower-http = { version = "0.6", features = ["cors"] } portable-atomic.workspace = true +hmac.workspace = true +sha2.workspace = true +base64 = "0.22" diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index f46da6b..963fb2a 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -14,6 +14,7 @@ mod dispatcher; pub mod outbound; pub mod api; pub mod fallback; +pub mod transport; mod relay; mod signal; @@ -145,6 +146,7 @@ pub async fn run_server( // Defaults -- overridden per-session by dispatcher using derive_all_secrets() handshake_pad_min: 32, handshake_pad_max: 128, + mtu: 1350, }; let dispatcher = Dispatcher::new(protocol_config, shared_keys.clone()); @@ -212,7 +214,7 @@ pub async fn run_server( tracing::info!(listeners = bind_addrs.len(), keys = key_count, "server started"); tracing::info!("ARQ config: max_reorder=16384, reorder_buf=8192, sent_history=32768, rto=100ms"); tokio::select! { - res = run_server_loop(primary_socket, sockets, dispatcher, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => { + res = run_server_loop(bind_addrs.clone(), primary_socket, sockets, dispatcher, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => { if let Err(e) = res { tracing::error!("Server error: {e}"); } @@ -228,6 +230,7 @@ pub async fn run_server( // ── Server main loop ───────────────────────────────────────────────────────── async fn run_server_loop( + bind_addrs: Vec, primary_socket: std::sync::Arc, sockets: Vec>, mut dispatcher: Dispatcher, @@ -241,6 +244,8 @@ async fn run_server_loop( let (stream_tx, mut stream_rx) = mpsc::unbounded_channel::<(u32, u16, Vec)>(); let (connect_tx, mut connect_rx) = mpsc::unbounded_channel::<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>(); + let tcp_map = std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new())); + let socket = primary_socket; // Spawn a recv task for each socket, all feeding into the same channel let (udp_tx, mut udp_rx) = mpsc::channel(10000); @@ -262,6 +267,35 @@ async fn run_server_loop( } }); } + + // Spawn UoT (TCP) listeners + for bind_addr in &bind_addrs { + let addr = bind_addr.parse::().unwrap(); + let tcp_map_clone = tcp_map.clone(); + let shared_keys_clone = shared_keys.clone(); + let udp_tx_clone = udp_tx.clone(); + + tokio::spawn(async move { + if let Ok(listener) = tokio::net::TcpListener::bind(&addr).await { + tracing::info!("TCP (UoT) listener bound to {}", addr); + loop { + if let Ok((stream, peer_addr)) = listener.accept().await { + let tm = tcp_map_clone.clone(); + let keys = shared_keys_clone.clone(); + let tx = udp_tx_clone.clone(); + tokio::spawn(async move { + if let Err(e) = crate::transport::uot::handle_tcp_connection(stream, peer_addr, keys, tx, tm).await { + tracing::debug!("UoT connection from {} failed: {}", peer_addr, e); + } + }); + } + } + } else { + tracing::warn!("Failed to bind TCP (UoT) listener to {}", addr); + } + }); + } + drop(udp_tx); // Drop the original sender so the channel closes when all tasks end if debug { @@ -317,7 +351,17 @@ async fn run_server_loop( for resp in responses { let resp_len = resp.len(); - let _ = socket.send_to(&resp, peer_addr).await?; + let mut sent_tcp = false; + { + let map = tcp_map.read().await; + if let Some(tx) = map.get(&peer_addr) { + let _ = tx.try_send(resp.clone()); + sent_tcp = true; + } + } + if !sent_tcp { + let _ = socket.send_to(&resp, peer_addr).await?; + } let _ = ui_event_tx.send(UiEvent::Tx { peer: peer_ip, bytes: resp_len }); } @@ -391,7 +435,17 @@ async fn run_server_loop( } let (frames, dropped_sessions) = dispatcher.on_tick(); for (frame, peer_addr) in frames { - let _ = socket.send_to(&frame, peer_addr).await?; + let mut sent_tcp = false; + { + let map = tcp_map.read().await; + if let Some(tx) = map.get(&peer_addr) { + let _ = tx.try_send(frame.clone()); + sent_tcp = true; + } + } + if !sent_tcp { + let _ = socket.send_to(&frame, peer_addr).await?; + } } for sid in dropped_sessions { let _ = ui_event_tx.send(UiEvent::Log(format!("Session {sid} expired, releasing resources"))); diff --git a/ostp-server/src/transport/mod.rs b/ostp-server/src/transport/mod.rs new file mode 100644 index 0000000..6433eb6 --- /dev/null +++ b/ostp-server/src/transport/mod.rs @@ -0,0 +1 @@ +pub mod uot; diff --git a/ostp-server/src/transport/uot.rs b/ostp-server/src/transport/uot.rs new file mode 100644 index 0000000..eae9105 --- /dev/null +++ b/ostp-server/src/transport/uot.rs @@ -0,0 +1,171 @@ +use anyhow::{Context, Result}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock as StdRwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, RwLock}; +use tracing::{info, warn}; + +pub async fn handle_tcp_connection( + mut stream: TcpStream, + peer_addr: SocketAddr, + shared_keys: Arc>>, + udp_tx: mpsc::Sender<(Bytes, SocketAddr)>, + tcp_map: Arc>>>, +) -> Result<()> { + // 1. Read HTTP Handshake + let mut buf = [0u8; 4096]; + let mut header_len = 0; + loop { + let n = stream.read(&mut buf[header_len..]).await?; + if n == 0 { + anyhow::bail!("connection closed before handshake complete"); + } + header_len += n; + if buf[..header_len].windows(4).any(|w| w == b"\r\n\r\n") { + break; + } + if header_len == buf.len() { + anyhow::bail!("handshake headers too large"); + } + } + + let headers_str = String::from_utf8_lossy(&buf[..header_len]); + + // Fast-fail scanner bots + if !headers_str.starts_with("GET /stream HTTP/1.1\r\n") { + send_404(&mut stream).await?; + anyhow::bail!("invalid request line"); + } + + // Extract Authorization or Cookie for signature + let mut signature_base64 = None; + for line in headers_str.lines() { + let lower = line.to_ascii_lowercase(); + if lower.starts_with("authorization: bearer ") { + signature_base64 = Some(line[22..].trim().to_string()); + } else if lower.starts_with("cookie: ostp_token=") { + signature_base64 = Some(line[19..].trim().to_string()); + } + } + + let sig_b64 = match signature_base64 { + Some(s) => s, + None => { + send_404(&mut stream).await?; + anyhow::bail!("missing authorization"); + } + }; + + let sig_bytes = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD_NO_PAD, &sig_b64) { + Ok(b) => b, + Err(_) => { + send_404(&mut stream).await?; + anyhow::bail!("invalid base64 signature"); + } + }; + + if sig_bytes.len() < 8 { + send_404(&mut stream).await?; + anyhow::bail!("signature too short"); + } + + let ts_bytes: [u8; 8] = sig_bytes[0..8].try_into().unwrap(); + let client_ts = u64::from_be_bytes(ts_bytes); + let provided_mac = &sig_bytes[8..]; + + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + if client_ts > now + 30 || client_ts < now.saturating_sub(60) { + send_404(&mut stream).await?; + anyhow::bail!("timestamp out of bounds (replay protection)"); + } + + // Verify HMAC against known keys + let keys = { + let guard = shared_keys.read().unwrap(); + guard.keys().cloned().collect::>() + }; + + let mut authenticated = false; + for key in keys { + let mut mac = Hmac::::new_from_slice(key.as_bytes()) + .unwrap_or_else(|_| Hmac::::new_from_slice(b"default").unwrap()); + mac.update(&ts_bytes); + if mac.verify_slice(provided_mac).is_ok() { + authenticated = true; + break; + } + } + + if !authenticated { + send_404(&mut stream).await?; + anyhow::bail!("unauthorized (invalid HMAC)"); + } + + // Reply 200 OK + let response = "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n"; + stream.write_all(response.as_bytes()).await?; + + info!("UoT client authenticated from {}", peer_addr); + + // Register this connection in the map + let (tx, mut rx) = mpsc::channel::(1024); + { + tcp_map.write().await.insert(peer_addr, tx); + } + + // Process streams + let (mut read_half, mut write_half) = stream.into_split(); + + // Spawn writer task + let peer_clone = peer_addr; + let tcp_map_clone = tcp_map.clone(); + let writer_task = tokio::spawn(async move { + while let Some(packet) = rx.recv().await { + let mut out = BytesMut::with_capacity(2 + packet.len()); + out.put_u16(packet.len() as u16); + out.put_slice(&packet); + if write_half.write_all(&out).await.is_err() { + break; + } + } + // Cleanup on writer exit + tcp_map_clone.write().await.remove(&peer_clone); + }); + + // Reader loop + let mut len_buf = [0u8; 2]; + loop { + if read_half.read_exact(&mut len_buf).await.is_err() { + break; + } + let len = u16::from_be_bytes(len_buf) as usize; + let mut packet_buf = vec![0u8; len]; + if read_half.read_exact(&mut packet_buf).await.is_err() { + break; + } + if udp_tx.send((Bytes::from(packet_buf), peer_addr)).await.is_err() { + break; + } + } + + writer_task.abort(); + tcp_map.write().await.remove(&peer_addr); + Ok(()) +} + +async fn send_404(stream: &mut TcpStream) -> Result<()> { + let body = "Not Found"; + let resp = format!( + "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ); + let _ = stream.write_all(resp.as_bytes()).await; + Ok(()) +} diff --git a/ostp/src/main.rs b/ostp/src/main.rs index 72c6525..2a849ad 100644 --- a/ostp/src/main.rs +++ b/ostp/src/main.rs @@ -59,6 +59,7 @@ fn parse_ostp_link(link: &str) -> Result { Ok(ClientConfig { server, access_key, + mtu: None, socks5_bind: Some("127.0.0.1:1088".to_string()), // Fallback to standard SOCKS5 port tun: Some(TunConfig { enable: false, // Default to proxy, configurable via settings GUI @@ -188,6 +189,7 @@ struct FallbackCfg { struct ClientConfig { server: String, access_key: String, + mtu: Option, socks5_bind: Option, tun: Option, turn: Option, @@ -634,6 +636,7 @@ async fn run_client_directly(client_cfg: ClientConfig) -> Result<()> { access_key: client_cfg.access_key.clone(), handshake_timeout_ms: 5000, io_timeout_ms: 5000, + mtu: client_cfg.mtu.unwrap_or(1350), }, local_proxy: ostp_client::config::LocalProxyConfig { bind_addr: client_cfg.socks5_bind.clone().unwrap_or_else(|| "127.0.0.1:1088".to_string()),