feat: comprehensive diagnostic logging across all modules

protocol.rs:
- Gap recovery logs: skipped frames count, delivered count, remaining buffer
- Duplicate frame detection with nonce values
- Max reorder window exceeded with gap width
- NACK handling: retransmit success vs frame evicted from history
- Reorder buffer overflow with capacity stats
- Close frame receipt
- Zombie frame eviction count
- sent_history overflow (the root cause of speedtest death)

dispatcher.rs:
- New session authentication with peer IP, session count, replay cache size
- Client roaming detection (old addr -> new addr)
- Handshake rejection reasons: timestamp drift, replay cache full, max sessions
- Protocol errors and tick errors with session context

bridge.rs:
- UDP socket buffer diagnostics (requested vs actual)
- Handshake response size and RTT
- Inbound protocol errors with session index
- Outbound packing errors with stream_id

signal.rs:
- Specific shutdown signal identification (SIGTERM/SIGINT/Ctrl+C/Close/Break)

server lib.rs:
- Startup banner with access key count and ARQ config
- UDP buffer diagnostics
- Relay CONNECT/CLOSE/error always visible (not gated by debug)
- All println! -> eprintln! for proper stderr logging
- Hot-reload prefix fixed [ostp-server] -> [ostp]
This commit is contained in:
ospab 2026-05-17 14:31:21 +03:00
parent f8aa8906ff
commit 032f694821
5 changed files with 93 additions and 30 deletions

View File

@ -147,6 +147,7 @@ impl Bridge {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await; let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await;
eprintln!("[ostp] Inbound protocol error (session {}): {}", session_index, e);
continue; continue;
} }
}; };
@ -507,6 +508,7 @@ impl Bridge {
} }
} }
Err(e) => { Err(e) => {
eprintln!("[ostp] Protocol error packing outbound stream_id={}: {}", stream_id, e);
let _ = tx.send(UiEvent::Log(format!("Protocol error packing TCP: {e}"))).await; let _ = tx.send(UiEvent::Log(format!("Protocol error packing TCP: {e}"))).await;
} }
} }
@ -614,6 +616,9 @@ impl Bridge {
let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
let _ = sock.set_recv_buffer_size(33554432); // 32MB let _ = sock.set_recv_buffer_size(33554432); // 32MB
let _ = sock.set_send_buffer_size(33554432); // 32MB let _ = sock.set_send_buffer_size(33554432); // 32MB
let actual_recv = sock.recv_buffer_size().unwrap_or(0);
let actual_send = sock.send_buffer_size().unwrap_or(0);
eprintln!("[ostp] UDP socket buffers: recv={}KB send={}KB", actual_recv / 1024, actual_send / 1024);
sock.bind(&addr.into())?; sock.bind(&addr.into())?;
sock.set_nonblocking(true)?; sock.set_nonblocking(true)?;
let socket = UdpSocket::from_std(sock.into())?; let socket = UdpSocket::from_std(sock.into())?;
@ -671,6 +676,7 @@ impl Bridge {
.await .await
.context("handshake timeout waiting server response")??; .context("handshake timeout waiting server response")??;
self.metrics.bytes_recv.fetch_add(size as u64, Ordering::Relaxed); self.metrics.bytes_recv.fetch_add(size as u64, Ordering::Relaxed);
eprintln!("[ostp] Handshake response received: {} bytes", size);
let inbound = if self.turn_enabled && size >= 4 && buf[0] == 0x40 && buf[1] == 0x00 { let inbound = if self.turn_enabled && size >= 4 && buf[0] == 0x40 && buf[1] == 0x00 {
let len = u16::from_be_bytes([buf[2], buf[3]]) as usize; let len = u16::from_be_bytes([buf[2], buf[3]]) as usize;
@ -684,8 +690,8 @@ impl Bridge {
}; };
machine.on_event(OstpEvent::Inbound(inbound))?; machine.on_event(OstpEvent::Inbound(inbound))?;
let rtt_ms = start.elapsed().as_secs_f64() * 1000.0; let rtt_ms = start.elapsed().as_secs_f64() * 1000.0;
eprintln!("[ostp] Handshake complete: session={:#010x} rtt={:.1}ms", session_id, rtt_ms);
// Success
Ok((socket, machine, rtt_ms)) Ok((socket, machine, rtt_ms))
} }

View File

@ -8,8 +8,12 @@ pub async fn wait_for_shutdown_signal() -> Result<()> {
let mut sigint = signal(SignalKind::interrupt())?; let mut sigint = signal(SignalKind::interrupt())?;
tokio::select! { tokio::select! {
_ = sigterm.recv() => {} _ = sigterm.recv() => {
_ = sigint.recv() => {} eprintln!("[ostp] Received SIGTERM, shutting down");
}
_ = sigint.recv() => {
eprintln!("[ostp] Received SIGINT, shutting down");
}
} }
Ok(()) Ok(())
@ -26,16 +30,19 @@ pub async fn wait_for_shutdown_signal() -> Result<()> {
tokio::select! { tokio::select! {
res = c_c.recv() => { res = c_c.recv() => {
eprintln!("[ostp] Received Ctrl+C, shutting down");
if res.is_none() { if res.is_none() {
std::future::pending::<()>().await; std::future::pending::<()>().await;
} }
} }
res = c_close.recv() => { res = c_close.recv() => {
eprintln!("[ostp] Received console close event, shutting down");
if res.is_none() { if res.is_none() {
std::future::pending::<()>().await; std::future::pending::<()>().await;
} }
} }
res = c_break.recv() => { res = c_break.recv() => {
eprintln!("[ostp] Received Ctrl+Break, shutting down");
if res.is_none() { if res.is_none() {
std::future::pending::<()>().await; std::future::pending::<()>().await;
} }

View File

@ -243,17 +243,19 @@ impl ProtocolMachine {
let nonce = u64::from_be_bytes(raw_vec[4..12].try_into().unwrap()); let nonce = u64::from_be_bytes(raw_vec[4..12].try_into().unwrap());
if nonce < self.expected_recv_nonce { if nonce < self.expected_recv_nonce {
// Duplicate packet! The ACK we sent was likely lost or delayed. // Duplicate — the ACK we sent was likely lost or delayed.
// We MUST trigger an immediate ACK to unblock the sender's congestion window. eprintln!("[ostp] Duplicate frame nonce={} (expected {}), forcing ACK", nonce, self.expected_recv_nonce);
if let Some(ack_frame) = self.force_build_ack()? { if let Some(ack_frame) = self.force_build_ack()? {
return Ok(ProtocolAction::SendDatagram(ack_frame)); return Ok(ProtocolAction::SendDatagram(ack_frame));
} }
return Ok(ProtocolAction::Noop); return Ok(ProtocolAction::Noop);
} }
// Buffer limit to prevent memory bloat, widened to handle high latency/speed gaps
if nonce > self.expected_recv_nonce + self.max_reorder { if nonce > self.expected_recv_nonce + self.max_reorder {
// Treat as heavy loss: request retransmit of the earliest missing packet. eprintln!(
"[ostp] Frame nonce={} exceeds max reorder window (expected={}, max_gap={}), sending NACK",
nonce, self.expected_recv_nonce, self.max_reorder
);
if let Ok(nack_frame) = self.build_control_datagram( if let Ok(nack_frame) = self.build_control_datagram(
0, 0,
FrameKind::Nack, FrameKind::Nack,
@ -280,9 +282,11 @@ impl ProtocolMachine {
if packet.header.kind == FrameKind::Nack { if packet.header.kind == FrameKind::Nack {
if packet.payload.len() >= 8 { if packet.payload.len() >= 8 {
let req_nonce = u64::from_be_bytes(packet.payload[..8].try_into().unwrap()); let req_nonce = u64::from_be_bytes(packet.payload[..8].try_into().unwrap());
// Search history from back to front (newest most likely requested)
if let Some(cached_frame) = self.lookup_sent_frame(req_nonce) { if let Some(cached_frame) = self.lookup_sent_frame(req_nonce) {
eprintln!("[ostp] NACK received: retransmitting nonce={}", req_nonce);
outbound_actions.push(ProtocolAction::SendDatagram(cached_frame)); outbound_actions.push(ProtocolAction::SendDatagram(cached_frame));
} else {
eprintln!("[ostp] NACK received: nonce={} not found in sent_history (evicted)", req_nonce);
} }
} }
} }
@ -297,6 +301,7 @@ impl ProtocolMachine {
ProtocolAction::DeliverApp(packet.header.stream_id, packet.payload) ProtocolAction::DeliverApp(packet.header.stream_id, packet.payload)
} }
FrameKind::Close => { FrameKind::Close => {
eprintln!("[ostp] Received Close frame, terminating session");
self.state = OstpState::Closed; self.state = OstpState::Closed;
ProtocolAction::Noop ProtocolAction::Noop
} }
@ -326,9 +331,14 @@ impl ProtocolMachine {
} }
self.last_recv_advance = Instant::now(); self.last_recv_advance = Instant::now();
} else { } else {
// Gap detected! Buffer current packet and request retransmit of the gap packet. // Gap detected
if self.reorder_buffer.len() < self.max_reorder_buffer { if self.reorder_buffer.len() < self.max_reorder_buffer {
self.reorder_buffer.insert(nonce, action); self.reorder_buffer.insert(nonce, action);
} else {
eprintln!(
"[ostp] Reorder buffer full ({}/{}), dropping frame nonce={}",
self.reorder_buffer.len(), self.max_reorder_buffer, nonce
);
} }
// Rate-limited NACK: send at most once per 30ms to prevent retransmit storms. // Rate-limited NACK: send at most once per 30ms to prevent retransmit storms.
@ -441,16 +451,21 @@ impl ProtocolMachine {
&& self.last_recv_advance.elapsed() > Duration::from_secs(5) && self.last_recv_advance.elapsed() > Duration::from_secs(5)
{ {
if let Some(&first_buffered) = self.reorder_buffer.keys().next() { if let Some(&first_buffered) = self.reorder_buffer.keys().next() {
// Skip expected_recv_nonce forward to the first buffered frame let skipped = first_buffered.saturating_sub(self.expected_recv_nonce);
self.expected_recv_nonce = first_buffered; self.expected_recv_nonce = first_buffered;
self.last_recv_advance = Instant::now(); self.last_recv_advance = Instant::now();
// Drain all continuous frames from the reorder buffer let mut delivered = 0u64;
while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) { while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) {
actions.push(buffered_action); actions.push(buffered_action);
self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1); self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1);
delivered += 1;
} }
self.ack_pending = true; self.ack_pending = true;
eprintln!(
"[ostp] Gap recovery: skipped {} lost frames, delivered {} buffered frames (reorder_buf={})",
skipped, delivered, self.reorder_buffer.len()
);
} }
} }
@ -467,7 +482,12 @@ impl ProtocolMachine {
// Shorter grace period than before (was +4) to free memory faster // Shorter grace period than before (was +4) to free memory faster
// after high-throughput bursts. // after high-throughput bursts.
let grace = self.max_retries.saturating_add(2); let grace = self.max_retries.saturating_add(2);
let before = self.sent_history.len();
self.sent_history.retain(|f| !f.is_retransmittable || f.retries <= grace); self.sent_history.retain(|f| !f.is_retransmittable || f.retries <= grace);
let evicted = before - self.sent_history.len();
if evicted > 0 {
eprintln!("[ostp] Evicted {} zombie frames from sent_history (remaining={})", evicted, self.sent_history.len());
}
// ── Retransmit expired frames ──────────────────────────────── // ── Retransmit expired frames ────────────────────────────────
// Limit retransmits per tick to prevent bandwidth saturation // Limit retransmits per tick to prevent bandwidth saturation
@ -600,8 +620,15 @@ impl ProtocolMachine {
retries: 0, retries: 0,
is_retransmittable, is_retransmittable,
}); });
while self.sent_history.len() > self.max_sent_history { if self.sent_history.len() > self.max_sent_history {
self.sent_history.pop_front(); let overflow = self.sent_history.len() - self.max_sent_history;
eprintln!(
"[ostp] sent_history overflow: evicting {} oldest frames (cap={})",
overflow, self.max_sent_history
);
while self.sent_history.len() > self.max_sent_history {
self.sent_history.pop_front();
}
} }
} }

View File

@ -99,8 +99,8 @@ impl Dispatcher {
if let Some(session_id) = session_id_opt { if let Some(session_id) = session_id_opt {
if let Some(peer_state) = self.peer_machines.get_mut(&session_id) { if let Some(peer_state) = self.peer_machines.get_mut(&session_id) {
// Update address on seamless roaming: remove old mapping to prevent HashMap leak
if peer_state.last_addr != peer { if peer_state.last_addr != peer {
eprintln!("[ostp] Client roamed: session {} from {} to {}", session_id, peer_state.last_addr, peer);
self.addr_to_session.remove(&peer_state.last_addr); self.addr_to_session.remove(&peer_state.last_addr);
} }
peer_state.last_addr = peer; peer_state.last_addr = peer;
@ -109,7 +109,10 @@ impl Dispatcher {
let action = match peer_state.machine.on_event(OstpEvent::Inbound(packet)) { let action = match peer_state.machine.on_event(OstpEvent::Inbound(packet)) {
Ok(a) => a, Ok(a) => a,
Err(_) => return Ok(DispatchOutcome::Unauthorized), Err(e) => {
eprintln!("[ostp] Protocol error for session {}: {}", session_id, e);
return Ok(DispatchOutcome::Unauthorized);
}
}; };
let mut responses = Vec::new(); let mut responses = Vec::new();
@ -168,7 +171,10 @@ impl Dispatcher {
let mut machine = match ProtocolMachine::new(cfg) { let mut machine = match ProtocolMachine::new(cfg) {
Ok(m) => m, Ok(m) => m,
Err(_) => continue, Err(e) => {
eprintln!("[ostp] Failed to create protocol machine for key trial: {}", e);
continue;
}
}; };
let action = match machine.on_event(OstpEvent::Inbound(packet.clone())) { let action = match machine.on_event(OstpEvent::Inbound(packet.clone())) {
Ok(a) => a, Ok(a) => a,
@ -203,18 +209,17 @@ impl Dispatcher {
let drift = (now as i64 - ts as i64).abs(); let drift = (now as i64 - ts as i64).abs();
if drift > 300 { if drift > 300 {
// Narrow window (5 mins) limits replay risk and bounds cache memory eprintln!("[ostp] Handshake rejected: timestamp drift {}s exceeds 300s limit (peer={})", drift, peer);
continue; continue;
} }
if !self.replay_cache.contains_key(&payload.to_vec()) { if !self.replay_cache.contains_key(&payload.to_vec()) {
// Hard cap: prevent OOM under DDoS — replay cache grows
// unboundedly between purge ticks without this limit.
if self.replay_cache.len() >= 100_000 { if self.replay_cache.len() >= 100_000 {
eprintln!("[ostp] Replay cache full (100000 entries), rejecting handshake from {}", peer);
return Ok(DispatchOutcome::Unauthorized); return Ok(DispatchOutcome::Unauthorized);
} }
// §4 fix: hard cap on concurrent sessions to prevent RAM exhaustion
if self.peer_machines.len() >= MAX_SESSIONS { if self.peer_machines.len() >= MAX_SESSIONS {
eprintln!("[ostp] Max sessions reached ({}), rejecting handshake from {}", MAX_SESSIONS, peer);
return Ok(DispatchOutcome::Unauthorized); return Ok(DispatchOutcome::Unauthorized);
} }
@ -230,6 +235,11 @@ impl Dispatcher {
}); });
self.addr_to_session.insert(peer, candidate_session_id); self.addr_to_session.insert(peer, candidate_session_id);
eprintln!(
"[ostp] New session authenticated: sid={} peer={} (active_sessions={}, replay_cache={})",
candidate_session_id, peer, self.peer_machines.len(), self.replay_cache.len()
);
return Ok(DispatchOutcome::Accepted { return Ok(DispatchOutcome::Accepted {
responses: response_opt.into_iter().collect(), responses: response_opt.into_iter().collect(),
app_payloads: Vec::new(), app_payloads: Vec::new(),
@ -280,6 +290,7 @@ impl Dispatcher {
// Clear expired sessions from internal state // Clear expired sessions from internal state
for sid in &expired { for sid in &expired {
eprintln!("[ostp] Session {} expired (inactive >5min), releasing", sid);
self.drop_session(*sid); self.drop_session(*sid);
} }
@ -287,7 +298,10 @@ impl Dispatcher {
for peer_state in self.peer_machines.values_mut() { for peer_state in self.peer_machines.values_mut() {
let action = match peer_state.machine.on_event(OstpEvent::Tick) { let action = match peer_state.machine.on_event(OstpEvent::Tick) {
Ok(a) => a, Ok(a) => a,
Err(_) => continue, Err(e) => {
eprintln!("[ostp] Tick error for session: {}", e);
continue;
}
}; };
let mut queue = vec![action]; let mut queue = vec![action];

View File

@ -106,7 +106,7 @@ pub async fn run_server(
} }
let mut keys_lock = shared_keys_clone.write().unwrap(); let mut keys_lock = shared_keys_clone.write().unwrap();
*keys_lock = new_keys; *keys_lock = new_keys;
println!("[ostp-server] Hot-reloaded {} access keys from config.json", keys_lock.len()); eprintln!("[ostp] Hot-reloaded {} access keys from config.json", keys_lock.len());
} }
} }
} }
@ -121,6 +121,9 @@ pub async fn run_server(
let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?; let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))?;
let _ = sock.set_recv_buffer_size(33554432); // 32MB let _ = sock.set_recv_buffer_size(33554432); // 32MB
let _ = sock.set_send_buffer_size(33554432); // 32MB let _ = sock.set_send_buffer_size(33554432); // 32MB
let actual_recv = sock.recv_buffer_size().unwrap_or(0);
let actual_send = sock.send_buffer_size().unwrap_or(0);
eprintln!("[ostp] UDP socket buffers: recv={}KB send={}KB", actual_recv / 1024, actual_send / 1024);
sock.bind(&addr.into())?; sock.bind(&addr.into())?;
sock.set_nonblocking(true)?; sock.set_nonblocking(true)?;
let socket = UdpSocket::from_std(sock.into())?; let socket = UdpSocket::from_std(sock.into())?;
@ -156,17 +159,20 @@ pub async fn run_server(
let is_essential = msg.starts_with("Client ") let is_essential = msg.starts_with("Client ")
|| msg.starts_with("Listening") || msg.starts_with("Listening")
|| msg.starts_with("Shutdown") || msg.starts_with("Shutdown")
|| msg.starts_with("Session "); || msg.starts_with("Session ")
|| msg.starts_with("Relay CONNECT")
|| msg.starts_with("Relay CLOSE")
|| msg.starts_with("Relay error");
if debug || is_essential { if debug || is_essential {
println!("[ostp] {msg}"); eprintln!("[ostp] {msg}");
} }
} }
UiEvent::KeyCreated { key } => { UiEvent::KeyCreated { key } => {
println!("[ostp] Access key created: {key}"); eprintln!("[ostp] Access key created: {key}");
} }
UiEvent::UnauthorizedProbe { peer, bytes } => { UiEvent::UnauthorizedProbe { peer, bytes } => {
if debug { if debug {
println!("[ostp] Unauthorized probe from {peer} ({bytes} bytes)"); eprintln!("[ostp] Unauthorized probe from {peer} ({bytes} bytes)");
} }
} }
UiEvent::PeerSeen { .. } => {} UiEvent::PeerSeen { .. } => {}
@ -175,7 +181,9 @@ pub async fn run_server(
} }
}); });
println!("[ostp] Listening on {bind_addr}"); let key_count = shared_keys.read().unwrap().len();
eprintln!("[ostp] Listening on {bind_addr} ({key_count} access keys loaded)");
eprintln!("[ostp] ARQ config: max_reorder=16384, reorder_buf=8192, sent_history=32768, rto=100ms");
tokio::select! { tokio::select! {
res = run_server_loop(socket, dispatcher, max_datagram_size, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => { res = run_server_loop(socket, dispatcher, max_datagram_size, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => {
if let Err(e) = res { if let Err(e) = res {
@ -183,7 +191,7 @@ pub async fn run_server(
} }
} }
_ = wait_for_shutdown_signal() => { _ = wait_for_shutdown_signal() => {
println!("[ostp] Shutdown signal received"); eprintln!("[ostp] Shutdown signal received");
} }
} }
@ -446,6 +454,7 @@ async fn handle_relay_message(
RelayMessage::Close => { RelayMessage::Close => {
if let Some(state) = remotes.remove(&(session_id, stream_id)) { if let Some(state) = remotes.remove(&(session_id, stream_id)) {
let _ = state.cancel_tx.try_send(()); let _ = state.cancel_tx.try_send(());
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CLOSE [{session_id}:{stream_id}]")));
} }
} }
RelayMessage::ConnectOk => {} RelayMessage::ConnectOk => {}
@ -536,7 +545,7 @@ async fn select_outbound_action(
let action = matched.unwrap_or(outbound.default_action); let action = matched.unwrap_or(outbound.default_action);
if debug { if debug {
println!("[ostp-server] outbound decision target={target} action={action:?}"); eprintln!("[ostp] Outbound routing: target={target} action={action:?}");
} }
action action
} }