From 032f694821966a633f55f8de86f04e364d65e15d Mon Sep 17 00:00:00 2001 From: ospab Date: Sun, 17 May 2026 14:31:21 +0300 Subject: [PATCH] feat: comprehensive diagnostic logging across all modules protocol.rs: - Gap recovery logs: skipped frames count, delivered count, remaining buffer - Duplicate frame detection with nonce values - Max reorder window exceeded with gap width - NACK handling: retransmit success vs frame evicted from history - Reorder buffer overflow with capacity stats - Close frame receipt - Zombie frame eviction count - sent_history overflow (the root cause of speedtest death) dispatcher.rs: - New session authentication with peer IP, session count, replay cache size - Client roaming detection (old addr -> new addr) - Handshake rejection reasons: timestamp drift, replay cache full, max sessions - Protocol errors and tick errors with session context bridge.rs: - UDP socket buffer diagnostics (requested vs actual) - Handshake response size and RTT - Inbound protocol errors with session index - Outbound packing errors with stream_id signal.rs: - Specific shutdown signal identification (SIGTERM/SIGINT/Ctrl+C/Close/Break) server lib.rs: - Startup banner with access key count and ARQ config - UDP buffer diagnostics - Relay CONNECT/CLOSE/error always visible (not gated by debug) - All println! -> eprintln! for proper stderr logging - Hot-reload prefix fixed [ostp-server] -> [ostp] --- ostp-client/src/bridge.rs | 10 ++++++-- ostp-client/src/signal.rs | 11 ++++++-- ostp-core/src/protocol.rs | 47 +++++++++++++++++++++++++++-------- ostp-server/src/dispatcher.rs | 30 ++++++++++++++++------ ostp-server/src/lib.rs | 25 +++++++++++++------ 5 files changed, 93 insertions(+), 30 deletions(-) diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index 11cc2ec..ac36467 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -147,6 +147,7 @@ impl Bridge { Ok(a) => a, Err(e) => { let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await; + eprintln!("[ostp] Inbound protocol error (session {}): {}", session_index, e); continue; } }; @@ -507,6 +508,7 @@ impl Bridge { } } Err(e) => { + eprintln!("[ostp] Protocol error packing outbound stream_id={}: {}", stream_id, e); let _ = tx.send(UiEvent::Log(format!("Protocol error packing TCP: {e}"))).await; } } @@ -614,6 +616,9 @@ impl Bridge { let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; let _ = sock.set_recv_buffer_size(33554432); // 32MB let _ = sock.set_send_buffer_size(33554432); // 32MB + let actual_recv = sock.recv_buffer_size().unwrap_or(0); + let actual_send = sock.send_buffer_size().unwrap_or(0); + eprintln!("[ostp] UDP socket buffers: recv={}KB send={}KB", actual_recv / 1024, actual_send / 1024); sock.bind(&addr.into())?; sock.set_nonblocking(true)?; let socket = UdpSocket::from_std(sock.into())?; @@ -671,6 +676,7 @@ impl Bridge { .await .context("handshake timeout waiting server response")??; self.metrics.bytes_recv.fetch_add(size as u64, Ordering::Relaxed); + eprintln!("[ostp] Handshake response received: {} bytes", size); let inbound = if self.turn_enabled && size >= 4 && buf[0] == 0x40 && buf[1] == 0x00 { let len = u16::from_be_bytes([buf[2], buf[3]]) as usize; @@ -684,8 +690,8 @@ impl Bridge { }; machine.on_event(OstpEvent::Inbound(inbound))?; let rtt_ms = start.elapsed().as_secs_f64() * 1000.0; - - // Success + eprintln!("[ostp] Handshake complete: session={:#010x} rtt={:.1}ms", session_id, rtt_ms); + Ok((socket, machine, rtt_ms)) } diff --git a/ostp-client/src/signal.rs b/ostp-client/src/signal.rs index 9b696ef..d91e3ca 100644 --- a/ostp-client/src/signal.rs +++ b/ostp-client/src/signal.rs @@ -8,8 +8,12 @@ pub async fn wait_for_shutdown_signal() -> Result<()> { let mut sigint = signal(SignalKind::interrupt())?; tokio::select! { - _ = sigterm.recv() => {} - _ = sigint.recv() => {} + _ = sigterm.recv() => { + eprintln!("[ostp] Received SIGTERM, shutting down"); + } + _ = sigint.recv() => { + eprintln!("[ostp] Received SIGINT, shutting down"); + } } Ok(()) @@ -26,16 +30,19 @@ pub async fn wait_for_shutdown_signal() -> Result<()> { tokio::select! { res = c_c.recv() => { + eprintln!("[ostp] Received Ctrl+C, shutting down"); if res.is_none() { std::future::pending::<()>().await; } } res = c_close.recv() => { + eprintln!("[ostp] Received console close event, shutting down"); if res.is_none() { std::future::pending::<()>().await; } } res = c_break.recv() => { + eprintln!("[ostp] Received Ctrl+Break, shutting down"); if res.is_none() { std::future::pending::<()>().await; } diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index 8b1d8a5..6f9fa98 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -243,17 +243,19 @@ impl ProtocolMachine { let nonce = u64::from_be_bytes(raw_vec[4..12].try_into().unwrap()); if nonce < self.expected_recv_nonce { - // Duplicate packet! The ACK we sent was likely lost or delayed. - // We MUST trigger an immediate ACK to unblock the sender's congestion window. + // Duplicate — the ACK we sent was likely lost or delayed. + eprintln!("[ostp] Duplicate frame nonce={} (expected {}), forcing ACK", nonce, self.expected_recv_nonce); if let Some(ack_frame) = self.force_build_ack()? { return Ok(ProtocolAction::SendDatagram(ack_frame)); } return Ok(ProtocolAction::Noop); } - // Buffer limit to prevent memory bloat, widened to handle high latency/speed gaps if nonce > self.expected_recv_nonce + self.max_reorder { - // Treat as heavy loss: request retransmit of the earliest missing packet. + eprintln!( + "[ostp] Frame nonce={} exceeds max reorder window (expected={}, max_gap={}), sending NACK", + nonce, self.expected_recv_nonce, self.max_reorder + ); if let Ok(nack_frame) = self.build_control_datagram( 0, FrameKind::Nack, @@ -280,9 +282,11 @@ impl ProtocolMachine { if packet.header.kind == FrameKind::Nack { if packet.payload.len() >= 8 { let req_nonce = u64::from_be_bytes(packet.payload[..8].try_into().unwrap()); - // Search history from back to front (newest most likely requested) if let Some(cached_frame) = self.lookup_sent_frame(req_nonce) { + eprintln!("[ostp] NACK received: retransmitting nonce={}", req_nonce); outbound_actions.push(ProtocolAction::SendDatagram(cached_frame)); + } else { + eprintln!("[ostp] NACK received: nonce={} not found in sent_history (evicted)", req_nonce); } } } @@ -297,6 +301,7 @@ impl ProtocolMachine { ProtocolAction::DeliverApp(packet.header.stream_id, packet.payload) } FrameKind::Close => { + eprintln!("[ostp] Received Close frame, terminating session"); self.state = OstpState::Closed; ProtocolAction::Noop } @@ -326,9 +331,14 @@ impl ProtocolMachine { } self.last_recv_advance = Instant::now(); } else { - // Gap detected! Buffer current packet and request retransmit of the gap packet. + // Gap detected if self.reorder_buffer.len() < self.max_reorder_buffer { self.reorder_buffer.insert(nonce, action); + } else { + eprintln!( + "[ostp] Reorder buffer full ({}/{}), dropping frame nonce={}", + self.reorder_buffer.len(), self.max_reorder_buffer, nonce + ); } // Rate-limited NACK: send at most once per 30ms to prevent retransmit storms. @@ -441,16 +451,21 @@ impl ProtocolMachine { && self.last_recv_advance.elapsed() > Duration::from_secs(5) { if let Some(&first_buffered) = self.reorder_buffer.keys().next() { - // Skip expected_recv_nonce forward to the first buffered frame + let skipped = first_buffered.saturating_sub(self.expected_recv_nonce); self.expected_recv_nonce = first_buffered; self.last_recv_advance = Instant::now(); - // Drain all continuous frames from the reorder buffer + let mut delivered = 0u64; while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) { actions.push(buffered_action); self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1); + delivered += 1; } self.ack_pending = true; + eprintln!( + "[ostp] Gap recovery: skipped {} lost frames, delivered {} buffered frames (reorder_buf={})", + skipped, delivered, self.reorder_buffer.len() + ); } } @@ -467,7 +482,12 @@ impl ProtocolMachine { // Shorter grace period than before (was +4) to free memory faster // after high-throughput bursts. let grace = self.max_retries.saturating_add(2); + let before = self.sent_history.len(); self.sent_history.retain(|f| !f.is_retransmittable || f.retries <= grace); + let evicted = before - self.sent_history.len(); + if evicted > 0 { + eprintln!("[ostp] Evicted {} zombie frames from sent_history (remaining={})", evicted, self.sent_history.len()); + } // ── Retransmit expired frames ──────────────────────────────── // Limit retransmits per tick to prevent bandwidth saturation @@ -600,8 +620,15 @@ impl ProtocolMachine { retries: 0, is_retransmittable, }); - while self.sent_history.len() > self.max_sent_history { - self.sent_history.pop_front(); + if self.sent_history.len() > self.max_sent_history { + let overflow = self.sent_history.len() - self.max_sent_history; + eprintln!( + "[ostp] sent_history overflow: evicting {} oldest frames (cap={})", + overflow, self.max_sent_history + ); + while self.sent_history.len() > self.max_sent_history { + self.sent_history.pop_front(); + } } } diff --git a/ostp-server/src/dispatcher.rs b/ostp-server/src/dispatcher.rs index fbcac99..9e6ed26 100644 --- a/ostp-server/src/dispatcher.rs +++ b/ostp-server/src/dispatcher.rs @@ -99,8 +99,8 @@ impl Dispatcher { if let Some(session_id) = session_id_opt { if let Some(peer_state) = self.peer_machines.get_mut(&session_id) { - // Update address on seamless roaming: remove old mapping to prevent HashMap leak if peer_state.last_addr != peer { + eprintln!("[ostp] Client roamed: session {} from {} to {}", session_id, peer_state.last_addr, peer); self.addr_to_session.remove(&peer_state.last_addr); } peer_state.last_addr = peer; @@ -109,7 +109,10 @@ impl Dispatcher { let action = match peer_state.machine.on_event(OstpEvent::Inbound(packet)) { Ok(a) => a, - Err(_) => return Ok(DispatchOutcome::Unauthorized), + Err(e) => { + eprintln!("[ostp] Protocol error for session {}: {}", session_id, e); + return Ok(DispatchOutcome::Unauthorized); + } }; let mut responses = Vec::new(); @@ -168,7 +171,10 @@ impl Dispatcher { let mut machine = match ProtocolMachine::new(cfg) { Ok(m) => m, - Err(_) => continue, + Err(e) => { + eprintln!("[ostp] Failed to create protocol machine for key trial: {}", e); + continue; + } }; let action = match machine.on_event(OstpEvent::Inbound(packet.clone())) { Ok(a) => a, @@ -203,18 +209,17 @@ impl Dispatcher { let drift = (now as i64 - ts as i64).abs(); if drift > 300 { - // Narrow window (5 mins) limits replay risk and bounds cache memory + eprintln!("[ostp] Handshake rejected: timestamp drift {}s exceeds 300s limit (peer={})", drift, peer); continue; } if !self.replay_cache.contains_key(&payload.to_vec()) { - // Hard cap: prevent OOM under DDoS — replay cache grows - // unboundedly between purge ticks without this limit. if self.replay_cache.len() >= 100_000 { + eprintln!("[ostp] Replay cache full (100000 entries), rejecting handshake from {}", peer); return Ok(DispatchOutcome::Unauthorized); } - // §4 fix: hard cap on concurrent sessions to prevent RAM exhaustion if self.peer_machines.len() >= MAX_SESSIONS { + eprintln!("[ostp] Max sessions reached ({}), rejecting handshake from {}", MAX_SESSIONS, peer); return Ok(DispatchOutcome::Unauthorized); } @@ -230,6 +235,11 @@ impl Dispatcher { }); self.addr_to_session.insert(peer, candidate_session_id); + eprintln!( + "[ostp] New session authenticated: sid={} peer={} (active_sessions={}, replay_cache={})", + candidate_session_id, peer, self.peer_machines.len(), self.replay_cache.len() + ); + return Ok(DispatchOutcome::Accepted { responses: response_opt.into_iter().collect(), app_payloads: Vec::new(), @@ -280,6 +290,7 @@ impl Dispatcher { // Clear expired sessions from internal state for sid in &expired { + eprintln!("[ostp] Session {} expired (inactive >5min), releasing", sid); self.drop_session(*sid); } @@ -287,7 +298,10 @@ impl Dispatcher { for peer_state in self.peer_machines.values_mut() { let action = match peer_state.machine.on_event(OstpEvent::Tick) { Ok(a) => a, - Err(_) => continue, + Err(e) => { + eprintln!("[ostp] Tick error for session: {}", e); + continue; + } }; let mut queue = vec![action]; diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index 265890a..7b15a29 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -106,7 +106,7 @@ pub async fn run_server( } let mut keys_lock = shared_keys_clone.write().unwrap(); *keys_lock = new_keys; - println!("[ostp-server] Hot-reloaded {} access keys from config.json", keys_lock.len()); + eprintln!("[ostp] Hot-reloaded {} access keys from config.json", keys_lock.len()); } } } @@ -121,6 +121,9 @@ pub async fn run_server( let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; let _ = sock.set_recv_buffer_size(33554432); // 32MB let _ = sock.set_send_buffer_size(33554432); // 32MB + let actual_recv = sock.recv_buffer_size().unwrap_or(0); + let actual_send = sock.send_buffer_size().unwrap_or(0); + eprintln!("[ostp] UDP socket buffers: recv={}KB send={}KB", actual_recv / 1024, actual_send / 1024); sock.bind(&addr.into())?; sock.set_nonblocking(true)?; let socket = UdpSocket::from_std(sock.into())?; @@ -156,17 +159,20 @@ pub async fn run_server( let is_essential = msg.starts_with("Client ") || msg.starts_with("Listening") || msg.starts_with("Shutdown") - || msg.starts_with("Session "); + || msg.starts_with("Session ") + || msg.starts_with("Relay CONNECT") + || msg.starts_with("Relay CLOSE") + || msg.starts_with("Relay error"); if debug || is_essential { - println!("[ostp] {msg}"); + eprintln!("[ostp] {msg}"); } } UiEvent::KeyCreated { key } => { - println!("[ostp] Access key created: {key}"); + eprintln!("[ostp] Access key created: {key}"); } UiEvent::UnauthorizedProbe { peer, bytes } => { if debug { - println!("[ostp] Unauthorized probe from {peer} ({bytes} bytes)"); + eprintln!("[ostp] Unauthorized probe from {peer} ({bytes} bytes)"); } } UiEvent::PeerSeen { .. } => {} @@ -175,7 +181,9 @@ pub async fn run_server( } }); - println!("[ostp] Listening on {bind_addr}"); + let key_count = shared_keys.read().unwrap().len(); + eprintln!("[ostp] Listening on {bind_addr} ({key_count} access keys loaded)"); + eprintln!("[ostp] ARQ config: max_reorder=16384, reorder_buf=8192, sent_history=32768, rto=100ms"); tokio::select! { res = run_server_loop(socket, dispatcher, max_datagram_size, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => { if let Err(e) = res { @@ -183,7 +191,7 @@ pub async fn run_server( } } _ = wait_for_shutdown_signal() => { - println!("[ostp] Shutdown signal received"); + eprintln!("[ostp] Shutdown signal received"); } } @@ -446,6 +454,7 @@ async fn handle_relay_message( RelayMessage::Close => { if let Some(state) = remotes.remove(&(session_id, stream_id)) { let _ = state.cancel_tx.try_send(()); + let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CLOSE [{session_id}:{stream_id}]"))); } } RelayMessage::ConnectOk => {} @@ -536,7 +545,7 @@ async fn select_outbound_action( let action = matched.unwrap_or(outbound.default_action); if debug { - println!("[ostp-server] outbound decision target={target} action={action:?}"); + eprintln!("[ostp] Outbound routing: target={target} action={action:?}"); } action }