diff --git a/ostp-client/src/bridge.rs b/ostp-client/src/bridge.rs index ad9ce50..11cc2ec 100644 --- a/ostp-client/src/bridge.rs +++ b/ostp-client/src/bridge.rs @@ -432,8 +432,8 @@ impl Bridge { } } proxy_ev = proxy_rx.recv(), if self.running && sessions_opt.as_ref().map(|s| { - // §3 FIX: Apply backpressure. Suspend pulling from local proxy if ARQ buffers exceed 512 unacked frames - s.iter().all(|ses| ses.machine.in_flight_count() < 512) + // Backpressure: suspend proxy reads when ARQ window is saturated + s.iter().all(|ses| ses.machine.in_flight_count() < 256) }).unwrap_or(true) => { if let Some(ev) = proxy_ev { if let Some(sessions) = sessions_opt.as_mut() { @@ -601,12 +601,12 @@ impl Bridge { max_padding: 1280, // Safe MTU size to avoid UDP fragmentation on Windows/PPPoE padding_strategy: PaddingStrategy::Profile(self.profile), obfuscation_key: obf_key, - max_reorder: 262144, - max_reorder_buffer: 32768, // Expanded to prevent dropping out-of-order packets during high-speed tests - ack_delay_ms: 5, // Reduced from 20ms to 5ms for rapid ACK unblocking and throughput acceleration - rto_ms: 100, // Reduced from 200ms to 100ms for faster recovery on packet loss + max_reorder: 16384, // Max gap between expected and received nonce + max_reorder_buffer: 8192, // Max buffered out-of-order frames + ack_delay_ms: 5, + rto_ms: 100, max_retries: 8, - max_sent_history: 65536, // Greatly expanded to guarantee that oldest unacked packets are not prematurely popped and lost + max_sent_history: 32768, // Reduced: gap recovery handles unrecoverable frames })?; let addr = self.local_bind_addr.parse::().map_err(|e| anyhow::anyhow!("invalid bind addr: {}", e))?; diff --git a/ostp-core/src/protocol.rs b/ostp-core/src/protocol.rs index b2552b8..8b1d8a5 100644 --- a/ostp-core/src/protocol.rs +++ b/ostp-core/src/protocol.rs @@ -83,6 +83,11 @@ pub struct ProtocolMachine { last_ack_sent: Instant, /// Rate-limit: prevents sending a NACK more than once per 30ms to avoid storms last_nack_sent: Instant, + /// Tracks when expected_recv_nonce last advanced. Used for gap recovery: + /// if the receiver is stuck waiting for a lost frame that the sender already + /// evicted from sent_history, this timer detects the deadlock and skips + /// the gap to restore liveness. + last_recv_advance: Instant, } #[derive(Debug, Clone)] @@ -124,6 +129,7 @@ impl ProtocolMachine { ack_pending: false, last_ack_sent: Instant::now(), last_nack_sent: Instant::now() - Duration::from_secs(1), + last_recv_advance: Instant::now(), }) } @@ -307,18 +313,18 @@ impl ProtocolMachine { if nonce == self.expected_recv_nonce { app_actions.push(action); self.expected_recv_nonce = self.expected_recv_nonce.checked_add(1).ok_or_else(|| { - tracing::error!("FATAL: Recv nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!"); ProtocolError::Crypto("recv nonce sequence exhausted".to_string()) })?; + self.last_recv_advance = Instant::now(); // Drain continuous queue while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) { app_actions.push(buffered_action); self.expected_recv_nonce = self.expected_recv_nonce.checked_add(1).ok_or_else(|| { - tracing::error!("FATAL: Recv nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!"); ProtocolError::Crypto("recv nonce sequence exhausted".to_string()) })?; } + self.last_recv_advance = Instant::now(); } else { // Gap detected! Buffer current packet and request retransmit of the gap packet. if self.reorder_buffer.len() < self.max_reorder_buffer { @@ -399,7 +405,6 @@ impl ProtocolMachine { let nonce = self.send_nonce; self.send_nonce = self.send_nonce.checked_add(1).ok_or_else(|| { - tracing::error!("FATAL: Send nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!"); ProtocolError::Crypto("send nonce sequence exhausted".to_string()) })?; @@ -427,6 +432,29 @@ impl ProtocolMachine { fn handle_tick(&mut self) -> Result { let mut actions = Vec::new(); + // ── Gap Recovery ────────────────────────────────────────────── + // If expected_recv_nonce hasn't advanced for 5+ seconds and there + // are buffered frames waiting, the sender likely evicted the lost + // frame from sent_history. Skip the gap to restore data flow. + // This trades a small amount of data loss for connection liveness. + if !self.reorder_buffer.is_empty() + && self.last_recv_advance.elapsed() > Duration::from_secs(5) + { + if let Some(&first_buffered) = self.reorder_buffer.keys().next() { + // Skip expected_recv_nonce forward to the first buffered frame + self.expected_recv_nonce = first_buffered; + self.last_recv_advance = Instant::now(); + + // Drain all continuous frames from the reorder buffer + while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) { + actions.push(buffered_action); + self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1); + } + self.ack_pending = true; + } + } + + // ── Pending ACK flush ───────────────────────────────────────── if let Some(ack_frame) = self.build_ack_if_due()? { actions.push(ProtocolAction::SendDatagram(ack_frame)); } @@ -434,24 +462,24 @@ impl ProtocolMachine { let now = Instant::now(); let base_rto_ms = self.rto.as_millis().max(1) as u64; - // Evict zombie frames that exceeded max_retries + grace period. - // Without eviction, unacknowledged frames accumulate forever, consuming memory - // and wasting bandwidth on retransmits that will never be acknowledged. - let grace = self.max_retries.saturating_add(4); + // ── Zombie frame eviction ──────────────────────────────────── + // Evict frames that exceeded max_retries + 2 grace retries. + // Shorter grace period than before (was +4) to free memory faster + // after high-throughput bursts. + let grace = self.max_retries.saturating_add(2); self.sent_history.retain(|f| !f.is_retransmittable || f.retries <= grace); + // ── Retransmit expired frames ──────────────────────────────── + // Limit retransmits per tick to prevent bandwidth saturation + let mut retransmit_budget: usize = 8; for frame in self.sent_history.iter_mut() { + if retransmit_budget == 0 { + break; + } if !frame.is_retransmittable { continue; } - if frame.retries >= self.max_retries { - tracing::warn!( - "Frame nonce={} retry {}/{} (backoff active)", - frame.nonce, frame.retries, self.max_retries - ); - } - let retry_over = frame.retries.saturating_sub(self.max_retries); let backoff_factor = 1u64 << retry_over.min(6); let effective_rto = Duration::from_millis(base_rto_ms.saturating_mul(backoff_factor)); @@ -460,6 +488,7 @@ impl ProtocolMachine { frame.last_sent = now; frame.retries = frame.retries.saturating_add(1); actions.push(ProtocolAction::SendDatagram(frame.bytes.clone())); + retransmit_budget -= 1; } } diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index 62639bb..265890a 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -132,12 +132,12 @@ pub async fn run_server( max_padding: 256, padding_strategy: PaddingStrategy::Adaptive, obfuscation_key: [0u8; 8], - max_reorder: 262144, - max_reorder_buffer: 32768, - ack_delay_ms: 5, // Reduced to 5ms for drastically faster ACK loopback throughput - rto_ms: 100, // Reduced to 100ms for aggressive, low-latency packet recovery + max_reorder: 16384, + max_reorder_buffer: 8192, + ack_delay_ms: 5, + rto_ms: 100, max_retries: 8, - max_sent_history: 65536, + max_sent_history: 32768, }; let dispatcher = Dispatcher::new(protocol_config, shared_keys.clone());