From fc339b364355e63248e2b68860da31cf6662880d Mon Sep 17 00:00:00 2001 From: ospab Date: Fri, 19 Jun 2026 19:14:46 +0300 Subject: [PATCH] feat(server): log reasons for dropped packets --- ostp-client/src/tunnel/outbounds/ostp.rs | 48 ++++++++++++------------ ostp-server/src/dispatcher.rs | 38 +++++++++++++------ ostp-server/src/lib.rs | 13 +++---- ostp-server/src/tui.rs | 2 +- 4 files changed, 59 insertions(+), 42 deletions(-) diff --git a/ostp-client/src/tunnel/outbounds/ostp.rs b/ostp-client/src/tunnel/outbounds/ostp.rs index b975907..801a057 100644 --- a/ostp-client/src/tunnel/outbounds/ostp.rs +++ b/ostp-client/src/tunnel/outbounds/ostp.rs @@ -84,31 +84,33 @@ pub async fn dial_tcp( let target_host_str = target_host.to_string(); - // Spawn bridge task - tokio::spawn(async move { - // Send initial handshake - if let Ok(action) = machine.on_event(OstpEvent::Start) { - handle_action(action, &transport, &mut server_stream).await; - } - - // Wait for handshake response (server sends HandshakePayload back) - let mut buf = [0u8; 8192]; - let mut handshake_success = false; - match tokio::time::timeout( - std::time::Duration::from_millis(3000), - transport.recv(&mut buf), - ).await { - Ok(Ok(n)) => { - if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) { - handle_action(action, &transport, &mut server_stream).await; - handshake_success = true; + let server_str = server.to_string(); + + // Spawn bridge task + tokio::spawn(async move { + // Send initial handshake + if let Ok(action) = machine.on_event(OstpEvent::Start) { + handle_action(action, &transport, &mut server_stream).await; + } + + // Wait for handshake response (server sends HandshakePayload back) + let mut buf = [0u8; 8192]; + let mut handshake_success = false; + match tokio::time::timeout( + std::time::Duration::from_millis(3000), + transport.recv(&mut buf), + ).await { + Ok(Ok(n)) => { + if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) { + handle_action(action, &transport, &mut server_stream).await; + handshake_success = true; + } + } + _ => { + tracing::warn!("TCP handshake timeout for {}:{}", server_str, port); + return; } } - _ => { - tracing::warn!("TCP handshake timeout for {}:{}", server, port); - return; - } - } if !handshake_success { tracing::warn!("TCP handshake failed or protocol machine error"); diff --git a/ostp-server/src/dispatcher.rs b/ostp-server/src/dispatcher.rs index 3ebb6d4..d8355fd 100644 --- a/ostp-server/src/dispatcher.rs +++ b/ostp-server/src/dispatcher.rs @@ -12,7 +12,7 @@ use portable_atomic::AtomicU64; // const MAX_SESSIONS removed because dynamic limit is used pub enum DispatchOutcome { - Unauthorized, + Unauthorized(String), Accepted { responses: Vec, app_payloads: Vec<(u32, u16, Bytes)>, // session_id, stream_id, payload @@ -182,7 +182,7 @@ impl Dispatcher { pub fn on_datagram(&mut self, peer: SocketAddr, packet: Bytes) -> Result { if packet.len() < 4 { - return Ok(DispatchOutcome::Unauthorized); + return Ok(DispatchOutcome::Unauthorized("packet too short".to_string())); } let mut session_id_opt = None; @@ -239,7 +239,7 @@ impl Dispatcher { tracing::info!("Dropping session {} for key {} (valid={}, over_limit={})", session_id, access_key, key_valid, user_stats.is_over_limit()); self.drop_session(session_id); - return Ok(DispatchOutcome::Unauthorized); + return Ok(DispatchOutcome::Unauthorized("key invalid or over limit".to_string())); } } @@ -260,8 +260,7 @@ impl Dispatcher { let action = match peer_state.machine.on_event(OstpEvent::Inbound(packet)) { Ok(a) => a, Err(e) => { - tracing::warn!("Protocol error for session {}: {}", session_id, e); - return Ok(DispatchOutcome::Unauthorized); + return Ok(DispatchOutcome::Unauthorized(format!("protocol error: {}", e))); } }; @@ -303,13 +302,17 @@ impl Dispatcher { // Not an existing session — try each registered access key's derived obfuscation key let keys_snapshot: Vec = self.access_keys.read().unwrap_or_else(|e| e.into_inner()).keys().cloned().collect(); + let mut failed_trials = Vec::new(); for candidate_key in keys_snapshot { let secrets = ostp_core::crypto::derive_all_secrets(candidate_key.as_bytes()); // Decode the session_id using this key's obfuscation // The handshake mask is derived from the Noise payload at bytes [6..], // so we must deobfuscate the full packet, not just the header. - if packet.len() < 7 { continue; } + if packet.len() < 7 { + failed_trials.push(format!("key {}: packet too short", candidate_key)); + continue; + } let mut trial = packet.to_vec(); ostp_core::crypto::deobfuscate_packet_inplace(&mut trial, &secrets.obfuscation_key, true); let candidate_session_id = u32::from_be_bytes([trial[0], trial[1], trial[2], trial[3]]); @@ -331,7 +334,10 @@ impl Dispatcher { }; let action = match machine.on_event(OstpEvent::Inbound(packet.clone())) { Ok(a) => a, - Err(_) => continue, + Err(e) => { + failed_trials.push(format!("key {}: crypto err: {}", candidate_key, e)); + continue; + } }; if let ProtocolAction::HandshakePayload(payload, response_opt) = action { @@ -345,6 +351,7 @@ impl Dispatcher { let sid_from_payload = u32::from_be_bytes(sid_bytes); if sid_from_payload != candidate_session_id { + failed_trials.push(format!("key {}: sid mismatch", candidate_key)); continue; } @@ -352,6 +359,7 @@ impl Dispatcher { if let Ok(key_from_payload) = std::str::from_utf8(key_bytes) { // The key embedded in the payload must match the candidate key we decoded with if key_from_payload != candidate_key { + failed_trials.push(format!("key {}: embedded key mismatch", candidate_key)); continue; } @@ -362,14 +370,16 @@ impl Dispatcher { let drift = (now as i64 - ts as i64).abs(); if drift > 300 { - tracing::warn!("Handshake rejected: timestamp drift {}s exceeds 300s limit (peer={})", drift, peer); + let reason = format!("timestamp drift {}s exceeds 300s limit", drift); + tracing::warn!("Handshake rejected for {}: {}", peer, reason); + failed_trials.push(format!("key {}: {}", candidate_key, reason)); continue; } if !self.replay_cache.contains_key(&payload.to_vec()) { if self.replay_cache.len() >= 50_000 { tracing::warn!("Replay cache full (100000 entries), rejecting handshake from {}", peer); - return Ok(DispatchOutcome::Unauthorized); + return Ok(DispatchOutcome::Unauthorized("replay cache full".to_string())); } self.replay_cache.insert(payload.to_vec(), ts); @@ -383,7 +393,7 @@ impl Dispatcher { // Check traffic limit before accepting if user_stats.is_over_limit() { tracing::warn!("User {} exceeded traffic limit, rejecting handshake from {}", candidate_key, peer); - return Ok(DispatchOutcome::Unauthorized); + return Ok(DispatchOutcome::Unauthorized("user over traffic limit".to_string())); } self.peer_machines.insert(candidate_session_id, PeerState { @@ -410,7 +420,13 @@ impl Dispatcher { } } - Ok(DispatchOutcome::Unauthorized) + let reason = if failed_trials.is_empty() { + "no valid handshake payload found".to_string() + } else { + format!("all key trials failed: {}", failed_trials.join(", ")) + }; + + Ok(DispatchOutcome::Unauthorized(reason)) } pub fn outbound_to_session(&mut self, session_id: u32, stream_id: u16, payload: Bytes) -> Result> { diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index cd985db..e77f91e 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -45,7 +45,7 @@ pub(crate) enum UiEvent { PeerSeen { peer: IpAddr }, #[allow(dead_code)] Rx { peer: IpAddr, bytes: usize }, #[allow(dead_code)] Tx { peer: IpAddr, bytes: usize }, - UnauthorizedProbe { peer: IpAddr, bytes: usize }, + UnauthorizedProbe { peer: IpAddr, bytes: usize, reason: String }, KeyCreated { key: String }, Log(String), #[allow(dead_code)] @@ -328,10 +328,9 @@ pub async fn run_server( UiEvent::KeyCreated { key } => { tracing::info!("Access key created: {key}"); } - UiEvent::UnauthorizedProbe { peer, bytes } => { - if debug { - tracing::debug!("Unauthorized probe from {peer} ({bytes} bytes)"); - } + UiEvent::UnauthorizedProbe { peer, bytes, reason } => { + // Make it a warn so it's always visible outside debug mode! + tracing::warn!("Unauthorized probe from {peer} ({bytes} bytes): {reason}"); } UiEvent::PeerSeen { .. } => {} _ => {} @@ -576,8 +575,8 @@ async fn handle_udp_packet( ) -> Result<()> { let size = packet.len(); match dispatcher.on_datagram(peer, packet) { - Ok(DispatchOutcome::Unauthorized) => { - let _ = ui_event_tx.send(UiEvent::UnauthorizedProbe { peer: peer.ip(), bytes: size }); + Ok(DispatchOutcome::Unauthorized(reason)) => { + let _ = ui_event_tx.send(UiEvent::UnauthorizedProbe { peer: peer.ip(), bytes: size, reason }); } Ok(DispatchOutcome::Accepted { responses, app_payloads, peer_addr }) => { let peer_ip = peer_addr.ip(); diff --git a/ostp-server/src/tui.rs b/ostp-server/src/tui.rs index 6dc1a5f..f327c3e 100644 --- a/ostp-server/src/tui.rs +++ b/ostp-server/src/tui.rs @@ -12,7 +12,7 @@ pub enum UiEvent { PeerSeen { peer: IpAddr }, Rx { peer: IpAddr, bytes: usize }, Tx { peer: IpAddr, bytes: usize }, - UnauthorizedProbe { peer: IpAddr, bytes: usize }, + UnauthorizedProbe { peer: IpAddr, bytes: usize, reason: String }, KeyCreated { key: String }, Log(String), KeyCount(usize),