From f8f27d366d6f5f9d64d6dcba1ea0522f51ae2b90 Mon Sep 17 00:00:00 2001 From: ospab Date: Fri, 19 Jun 2026 16:11:37 +0300 Subject: [PATCH] Fix empty handshake payload and dummy keys in ostp outbound client --- ostp-client/src/tunnel/outbounds/ostp.rs | 202 +++++++++++++---------- ostp-core/src/protocol.rs | 32 +++- 2 files changed, 145 insertions(+), 89 deletions(-) diff --git a/ostp-client/src/tunnel/outbounds/ostp.rs b/ostp-client/src/tunnel/outbounds/ostp.rs index 401ee48..7a5950f 100644 --- a/ostp-client/src/tunnel/outbounds/ostp.rs +++ b/ostp-client/src/tunnel/outbounds/ostp.rs @@ -5,6 +5,63 @@ use crate::config::{TransportConfig, MultiplexConfig}; use ostp_core::{OstpEvent, ProtocolAction, ProtocolConfig, ProtocolMachine}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// Build the handshake payload the server expects: +/// [timestamp_u64_be (8 bytes)] [session_id_u32_be (4 bytes)] [access_key bytes] +fn build_handshake_payload(session_id: u32, access_key: &str) -> Vec { + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let mut payload = Vec::with_capacity(12 + access_key.len()); + payload.extend_from_slice(&ts.to_be_bytes()); + payload.extend_from_slice(&session_id.to_be_bytes()); + payload.extend_from_slice(access_key.as_bytes()); + payload +} + +/// Build a correctly configured ProtocolConfig for an outgoing OSTP connection. +fn make_initiator_config( + session_id: u32, + access_key: &str, + transport_cfg: &TransportConfig, +) -> ProtocolConfig { + let secrets = ostp_core::crypto::derive_all_secrets(access_key.as_bytes()); + let payload = build_handshake_payload(session_id, access_key); + + let mtu = match transport_cfg.r#type.as_str() { + "dns" => 1100, + _ => 1350, + }; + + ProtocolConfig { + role: ostp_core::NoiseRole::Initiator, + psk: secrets.psk, + session_id, + handshake_payload: payload, + max_padding: 256, + padding_strategy: ostp_core::framing::PaddingStrategy::Adaptive, + obfuscation_key: secrets.obfuscation_key, + max_reorder: 16384, + max_reorder_buffer: 8192, + ack_delay_ms: 5, + rto_ms: 100, + max_retries: 8, + max_sent_history: 32768, + handshake_pad_min: secrets.handshake_pad_min, + handshake_pad_max: secrets.handshake_pad_max, + mtu, + } +} + +fn random_session_id() -> u32 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut h = DefaultHasher::new(); + std::time::Instant::now().hash(&mut h); + std::thread::current().id().hash(&mut h); + h.finish() as u32 +} + pub async fn dial_tcp( server: &str, port: u16, @@ -17,54 +74,21 @@ pub async fn dial_tcp( let client_stream = tokio::net::TcpStream::connect(local_addr).await?; let (mut server_stream, _) = listener.accept().await?; - let transport = match transport_cfg.r#type.as_str() { - "dns" => { - let domain = transport_cfg.domain.clone().unwrap_or_else(|| "tunnel.example.com".to_string()); - let resolver = transport_cfg.resolver.clone().unwrap_or_else(|| "8.8.8.8".to_string()); - crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await? - } - // Fallback to UDP for now if unknown - _ => { - let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?; - udp.connect((server, port)).await?; - crate::transport::Transport::Udp(std::sync::Arc::new(udp)) - } - }; - - let mut psk = [0u8; 32]; - let key_bytes = access_key.as_bytes(); - let len = key_bytes.len().min(32); - psk[..len].copy_from_slice(&key_bytes[..len]); - - let config = ProtocolConfig { - role: ostp_core::NoiseRole::Initiator, - psk, - session_id: 1, - handshake_payload: vec![], - max_padding: 0, - padding_strategy: ostp_core::framing::PaddingStrategy::Fixed(0), - obfuscation_key: [0; 8], - max_reorder: 16384, - max_reorder_buffer: 8192, - ack_delay_ms: 10, - rto_ms: 100, - max_retries: 5, - max_sent_history: 32768, - handshake_pad_min: 8, - handshake_pad_max: 24, - mtu: 1400, - }; - + let transport = make_transport(transport_cfg, server, port).await?; + + let session_id = random_session_id(); + let config = make_initiator_config(session_id, access_key, transport_cfg); let mut machine = ProtocolMachine::new(config).unwrap(); - + // Spawn bridge task tokio::spawn(async move { + // Send initial handshake if let Ok(action) = machine.on_event(OstpEvent::Start) { handle_action(action, &transport, &mut server_stream).await; } let mut buf = [0u8; 65535]; let mut udp_buf = [0u8; 65535]; - + loop { tokio::select! { Ok(n) = server_stream.read(&mut buf) => { @@ -100,63 +124,53 @@ pub async fn handle_udp( transport_cfg: &TransportConfig, _multiplex: &MultiplexConfig, ) -> Result<()> { - let transport = match transport_cfg.r#type.as_str() { - "dns" => { - let domain = transport_cfg.domain.clone().unwrap_or_else(|| "tunnel.example.com".to_string()); - let resolver = transport_cfg.resolver.clone().unwrap_or_else(|| "8.8.8.8".to_string()); - crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await? + let transport = make_transport(transport_cfg, server, port).await?; + + // Derive session_id from client source addr for stable per-flow sessions + let ip_bytes = match client_src.ip() { + std::net::IpAddr::V4(v4) => { + let o = v4.octets(); + u32::from_be_bytes(o) } - _ => { - let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?; - udp.connect((server, port)).await?; - crate::transport::Transport::Udp(std::sync::Arc::new(udp)) + std::net::IpAddr::V6(v6) => { + let o = v6.octets(); + u32::from_be_bytes([o[12], o[13], o[14], o[15]]) } }; + let session_id = ip_bytes ^ (client_src.port() as u32); - let mut psk = [0u8; 32]; - let key_bytes = access_key.as_bytes(); - let len = key_bytes.len().min(32); - psk[..len].copy_from_slice(&key_bytes[..len]); - - let config = ProtocolConfig { - role: ostp_core::NoiseRole::Initiator, - psk, - session_id: u32::from_ne_bytes([ - client_src.ip().to_string().as_bytes().get(0).copied().unwrap_or(0), - client_src.ip().to_string().as_bytes().get(1).copied().unwrap_or(0), - client_src.ip().to_string().as_bytes().get(2).copied().unwrap_or(0), - client_src.ip().to_string().as_bytes().get(3).copied().unwrap_or(0), - ]), - handshake_payload: vec![], - max_padding: 0, - padding_strategy: ostp_core::framing::PaddingStrategy::Fixed(0), - obfuscation_key: [0; 8], - max_reorder: 4096, - max_reorder_buffer: 2048, - ack_delay_ms: 50, - rto_ms: 200, - max_retries: 3, - max_sent_history: 8192, - handshake_pad_min: 8, - handshake_pad_max: 24, - mtu: 1400, - }; - + let config = make_initiator_config(session_id, access_key, transport_cfg); let mut machine = ProtocolMachine::new(config)?; - // Send initial packet with UDP payload + // Send handshake first if let Ok(action) = machine.on_event(OstpEvent::Start) { handle_udp_action(action, &transport).await; } - // Send the actual UDP payload - let relay_msg = ostp_core::relay::RelayMessage::Connect(format!("{}:{}", target_dst.ip(), target_dst.port())); + // Wait for handshake response (server sends HandshakePayload back) + let mut buf = [0u8; 8192]; + match tokio::time::timeout( + std::time::Duration::from_millis(2000), + transport.recv(&mut buf), + ).await { + Ok(Ok(n)) => { + let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))); + } + _ => { + tracing::warn!("UDP handshake timeout for {}:{}", server, port); + return Ok(()); + } + } + + // Send relay connect + data + let relay_msg = ostp_core::relay::RelayMessage::Connect( + format!("{}:{}", target_dst.ip(), target_dst.port()) + ); let encoded = relay_msg.encode(); if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) { handle_udp_action(action, &transport).await; } - // Send data packet let data_msg = ostp_core::relay::RelayMessage::Data(payload.to_vec()); let encoded = data_msg.encode(); if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) { @@ -165,10 +179,9 @@ pub async fn handle_udp( // Keep-alive for a short time to receive response for _ in 0..5 { - let mut buf = [0u8; 8192]; match tokio::time::timeout( std::time::Duration::from_millis(100), - transport.recv(&mut buf) + transport.recv(&mut buf), ).await { Ok(Ok(n)) => { let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))); @@ -180,6 +193,27 @@ pub async fn handle_udp( Ok(()) } +async fn make_transport( + transport_cfg: &TransportConfig, + server: &str, + port: u16, +) -> Result { + match transport_cfg.r#type.as_str() { + "dns" => { + let domain = transport_cfg.domain.clone() + .unwrap_or_else(|| "tunnel.example.com".to_string()); + let resolver = transport_cfg.resolver.clone() + .unwrap_or_else(|| "8.8.8.8".to_string()); + crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await + } + _ => { + let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?; + udp.connect((server, port)).await?; + Ok(crate::transport::Transport::Udp(std::sync::Arc::new(udp))) + } + } +} + async fn handle_udp_action(action: ProtocolAction, transport: &crate::transport::Transport) { match action { ProtocolAction::SendDatagram(data) => { diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index a2dd6bb..84b8692 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -392,12 +392,34 @@ impl ProtocolMachine { self.last_recv_advance = Instant::now(); } else { // Gap detected - if self.reorder_buffer.len() < self.max_reorder_buffer { - self.reorder_buffer.insert(nonce, action); - } else { - tracing::warn!("Reorder buffer full ({}/{}), dropping frame nonce={}", - self.reorder_buffer.len(), self.max_reorder_buffer, nonce + if self.reorder_buffer.len() >= self.max_reorder_buffer { + tracing::warn!("Reorder buffer full ({}/{}), forcing gap recovery to prevent packet drops", + self.reorder_buffer.len(), self.max_reorder_buffer ); + if let Some(&first_buffered) = self.reorder_buffer.keys().next() { + let skipped = first_buffered.saturating_sub(self.expected_recv_nonce); + self.expected_recv_nonce = first_buffered; + self.last_recv_advance = Instant::now(); + + let mut delivered = 0u64; + while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) { + app_actions.push(buffered_action); + self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1); + delivered += 1; + } + self.ack_pending = true; + tracing::debug!("Forced Gap recovery: skipped {} lost frames, delivered {} buffered frames", skipped, delivered); + } + } + + if nonce >= self.expected_recv_nonce { + if self.reorder_buffer.len() < self.max_reorder_buffer { + self.reorder_buffer.insert(nonce, action); + } else { + tracing::warn!("Reorder buffer still full after gap recovery, dropping frame nonce={}", nonce); + } + } else { + tracing::debug!("Frame nonce={} arrived too late after gap recovery, dropping", nonce); } // Rate-limited NACK: send at most once per 30ms to prevent retransmit storms.