fix: tunnel death after speedtest — gap recovery and ARQ tuning

This commit is contained in:
ospab 2026-05-17 14:22:50 +03:00
parent e36d743ad5
commit dc6635e248
3 changed files with 55 additions and 26 deletions

View File

@ -432,8 +432,8 @@ impl Bridge {
}
}
proxy_ev = proxy_rx.recv(), if self.running && sessions_opt.as_ref().map(|s| {
// §3 FIX: Apply backpressure. Suspend pulling from local proxy if ARQ buffers exceed 512 unacked frames
s.iter().all(|ses| ses.machine.in_flight_count() < 512)
// Backpressure: suspend proxy reads when ARQ window is saturated
s.iter().all(|ses| ses.machine.in_flight_count() < 256)
}).unwrap_or(true) => {
if let Some(ev) = proxy_ev {
if let Some(sessions) = sessions_opt.as_mut() {
@ -601,12 +601,12 @@ impl Bridge {
max_padding: 1280, // Safe MTU size to avoid UDP fragmentation on Windows/PPPoE
padding_strategy: PaddingStrategy::Profile(self.profile),
obfuscation_key: obf_key,
max_reorder: 262144,
max_reorder_buffer: 32768, // Expanded to prevent dropping out-of-order packets during high-speed tests
ack_delay_ms: 5, // Reduced from 20ms to 5ms for rapid ACK unblocking and throughput acceleration
rto_ms: 100, // Reduced from 200ms to 100ms for faster recovery on packet loss
max_reorder: 16384, // Max gap between expected and received nonce
max_reorder_buffer: 8192, // Max buffered out-of-order frames
ack_delay_ms: 5,
rto_ms: 100,
max_retries: 8,
max_sent_history: 65536, // Greatly expanded to guarantee that oldest unacked packets are not prematurely popped and lost
max_sent_history: 32768, // Reduced: gap recovery handles unrecoverable frames
})?;
let addr = self.local_bind_addr.parse::<std::net::SocketAddr>().map_err(|e| anyhow::anyhow!("invalid bind addr: {}", e))?;

View File

@ -83,6 +83,11 @@ pub struct ProtocolMachine {
last_ack_sent: Instant,
/// Rate-limit: prevents sending a NACK more than once per 30ms to avoid storms
last_nack_sent: Instant,
/// Tracks when expected_recv_nonce last advanced. Used for gap recovery:
/// if the receiver is stuck waiting for a lost frame that the sender already
/// evicted from sent_history, this timer detects the deadlock and skips
/// the gap to restore liveness.
last_recv_advance: Instant,
}
#[derive(Debug, Clone)]
@ -124,6 +129,7 @@ impl ProtocolMachine {
ack_pending: false,
last_ack_sent: Instant::now(),
last_nack_sent: Instant::now() - Duration::from_secs(1),
last_recv_advance: Instant::now(),
})
}
@ -307,18 +313,18 @@ impl ProtocolMachine {
if nonce == self.expected_recv_nonce {
app_actions.push(action);
self.expected_recv_nonce = self.expected_recv_nonce.checked_add(1).ok_or_else(|| {
tracing::error!("FATAL: Recv nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!");
ProtocolError::Crypto("recv nonce sequence exhausted".to_string())
})?;
self.last_recv_advance = Instant::now();
// Drain continuous queue
while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) {
app_actions.push(buffered_action);
self.expected_recv_nonce = self.expected_recv_nonce.checked_add(1).ok_or_else(|| {
tracing::error!("FATAL: Recv nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!");
ProtocolError::Crypto("recv nonce sequence exhausted".to_string())
})?;
}
self.last_recv_advance = Instant::now();
} else {
// Gap detected! Buffer current packet and request retransmit of the gap packet.
if self.reorder_buffer.len() < self.max_reorder_buffer {
@ -399,7 +405,6 @@ impl ProtocolMachine {
let nonce = self.send_nonce;
self.send_nonce = self.send_nonce.checked_add(1).ok_or_else(|| {
tracing::error!("FATAL: Send nonce sequence exhausted (2^64 frames). Session must be terminated to prevent AEAD keystream reuse!");
ProtocolError::Crypto("send nonce sequence exhausted".to_string())
})?;
@ -427,6 +432,29 @@ impl ProtocolMachine {
fn handle_tick(&mut self) -> Result<ProtocolAction, ProtocolError> {
let mut actions = Vec::new();
// ── Gap Recovery ──────────────────────────────────────────────
// If expected_recv_nonce hasn't advanced for 5+ seconds and there
// are buffered frames waiting, the sender likely evicted the lost
// frame from sent_history. Skip the gap to restore data flow.
// This trades a small amount of data loss for connection liveness.
if !self.reorder_buffer.is_empty()
&& self.last_recv_advance.elapsed() > Duration::from_secs(5)
{
if let Some(&first_buffered) = self.reorder_buffer.keys().next() {
// Skip expected_recv_nonce forward to the first buffered frame
self.expected_recv_nonce = first_buffered;
self.last_recv_advance = Instant::now();
// Drain all continuous frames from the reorder buffer
while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) {
actions.push(buffered_action);
self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1);
}
self.ack_pending = true;
}
}
// ── Pending ACK flush ─────────────────────────────────────────
if let Some(ack_frame) = self.build_ack_if_due()? {
actions.push(ProtocolAction::SendDatagram(ack_frame));
}
@ -434,24 +462,24 @@ impl ProtocolMachine {
let now = Instant::now();
let base_rto_ms = self.rto.as_millis().max(1) as u64;
// Evict zombie frames that exceeded max_retries + grace period.
// Without eviction, unacknowledged frames accumulate forever, consuming memory
// and wasting bandwidth on retransmits that will never be acknowledged.
let grace = self.max_retries.saturating_add(4);
// ── Zombie frame eviction ────────────────────────────────────
// Evict frames that exceeded max_retries + 2 grace retries.
// Shorter grace period than before (was +4) to free memory faster
// after high-throughput bursts.
let grace = self.max_retries.saturating_add(2);
self.sent_history.retain(|f| !f.is_retransmittable || f.retries <= grace);
// ── Retransmit expired frames ────────────────────────────────
// Limit retransmits per tick to prevent bandwidth saturation
let mut retransmit_budget: usize = 8;
for frame in self.sent_history.iter_mut() {
if retransmit_budget == 0 {
break;
}
if !frame.is_retransmittable {
continue;
}
if frame.retries >= self.max_retries {
tracing::warn!(
"Frame nonce={} retry {}/{} (backoff active)",
frame.nonce, frame.retries, self.max_retries
);
}
let retry_over = frame.retries.saturating_sub(self.max_retries);
let backoff_factor = 1u64 << retry_over.min(6);
let effective_rto = Duration::from_millis(base_rto_ms.saturating_mul(backoff_factor));
@ -460,6 +488,7 @@ impl ProtocolMachine {
frame.last_sent = now;
frame.retries = frame.retries.saturating_add(1);
actions.push(ProtocolAction::SendDatagram(frame.bytes.clone()));
retransmit_budget -= 1;
}
}

View File

@ -132,12 +132,12 @@ pub async fn run_server(
max_padding: 256,
padding_strategy: PaddingStrategy::Adaptive,
obfuscation_key: [0u8; 8],
max_reorder: 262144,
max_reorder_buffer: 32768,
ack_delay_ms: 5, // Reduced to 5ms for drastically faster ACK loopback throughput
rto_ms: 100, // Reduced to 100ms for aggressive, low-latency packet recovery
max_reorder: 16384,
max_reorder_buffer: 8192,
ack_delay_ms: 5,
rto_ms: 100,
max_retries: 8,
max_sent_history: 65536,
max_sent_history: 32768,
};
let dispatcher = Dispatcher::new(protocol_config, shared_keys.clone());