From b1dfb335c9a7b6b1131ca3918e43e4350a44c5a9 Mon Sep 17 00:00:00 2001 From: ospab Date: Sun, 17 May 2026 02:01:40 +0300 Subject: [PATCH] fix: resolve severe server-side head-of-line blocking under high connection concurrency by delegating TCP connection establishments and stream writing to asynchronous spawned tasks --- Cargo.lock | 10 ++-- ostp-server/src/lib.rs | 103 +++++++++++++++++++++++------------------ 2 files changed, 63 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e8e1669..d06d0ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -745,7 +745,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "ostp" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "base64", @@ -762,7 +762,7 @@ dependencies = [ [[package]] name = "ostp-client" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "bytes", @@ -779,7 +779,7 @@ dependencies = [ [[package]] name = "ostp-core" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "async-trait", @@ -812,7 +812,7 @@ dependencies = [ [[package]] name = "ostp-server" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "bytes", @@ -826,7 +826,7 @@ dependencies = [ [[package]] name = "ostp-tun-helper" -version = "0.1.53" +version = "0.1.54" dependencies = [ "anyhow", "chrono", diff --git a/ostp-server/src/lib.rs b/ostp-server/src/lib.rs index 83a3403..1838c92 100644 --- a/ostp-server/src/lib.rs +++ b/ostp-server/src/lib.rs @@ -181,7 +181,7 @@ pub async fn run_server( } struct RemoteState { - writer: OwnedWriteHalf, + data_tx: mpsc::UnboundedSender, cancel_tx: mpsc::Sender<()>, } @@ -197,6 +197,7 @@ async fn run_server_loop( ) -> Result<()> { let mut remotes: HashMap<(u32, u16), RemoteState> = HashMap::new(); let (stream_tx, mut stream_rx) = mpsc::channel::<(u32, u16, Vec)>(10000); + let (connect_tx, mut connect_rx) = mpsc::unbounded_channel::<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>(); let socket = std::sync::Arc::new(socket); let (udp_tx, mut udp_rx) = mpsc::channel(10000); @@ -288,6 +289,7 @@ async fn run_server_loop( &mut remotes, &ui_event_tx, stream_tx.clone(), + connect_tx.clone(), outbound.clone(), debug, ).await?; @@ -309,6 +311,28 @@ async fn run_server_loop( let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::Data(data), &mut dispatcher, &socket, &ui_event_tx).await; } } + Some((session_id, stream_id, target, res)) = connect_rx.recv() => { + match res { + Ok((writer, cancel_tx)) => { + let (data_tx, mut data_rx) = mpsc::unbounded_channel::(); + let mut writer_task = writer; + tokio::spawn(async move { + while let Some(data) = data_rx.recv().await { + if tokio::io::AsyncWriteExt::write_all(&mut writer_task, &data).await.is_err() { + break; + } + } + }); + remotes.insert((session_id, stream_id), RemoteState { data_tx, cancel_tx }); + let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::ConnectOk, &mut dispatcher, &socket, &ui_event_tx).await; + let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT ok for [{session_id}:{stream_id}] -> {target}"))); + } + Err(err) => { + let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT failed for [{session_id}:{stream_id}] -> {target}: {err}"))); + let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::Error(format!("connect failed: {err}")), &mut dispatcher, &socket, &ui_event_tx).await; + } + } + } _ = retransmit_tick.tick() => { let now = Instant::now(); for (peer_ip, last_seen) in peer_last_seen.iter() { @@ -353,66 +377,55 @@ async fn handle_relay_message( remotes: &mut HashMap<(u32, u16), RemoteState>, ui_event_tx: &mpsc::UnboundedSender, stream_tx: mpsc::Sender<(u32, u16, Vec)>, + connect_tx: mpsc::UnboundedSender<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>, outbound: Option, debug: bool, ) -> Result<()> { match RelayMessage::decode(&payload)? { RelayMessage::Connect(target) => { let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT start for [{session_id}:{stream_id}] -> {target}"))); - let stream = connect_target(&target, outbound.as_ref(), debug).await; - match stream { - Ok(stream) => { - let (mut reader, writer) = stream.into_split(); - let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); - let tx_clone = stream_tx.clone(); - tokio::spawn(async move { - let mut buf = [0_u8; 1024]; - loop { - tokio::select! { - _ = cancel_rx.recv() => { - break; - } - read_res = reader.read(&mut buf) => { - match read_res { - Ok(0) => { - let _ = tx_clone.send((session_id, stream_id, Vec::new())).await; - break; - } - Ok(n) => { - if tx_clone.send((session_id, stream_id, buf[..n].to_vec())).await.is_err() { + let target_clone = target.clone(); + let connect_tx_clone = connect_tx.clone(); + let stream_tx_clone = stream_tx.clone(); + let outbound_clone = outbound.clone(); + tokio::spawn(async move { + let stream_res = connect_target(&target_clone, outbound_clone.as_ref(), debug).await; + match stream_res { + Ok(stream) => { + let (mut reader, writer) = stream.into_split(); + let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1); + tokio::spawn(async move { + let mut buf = [0_u8; 4096]; + loop { + tokio::select! { + _ = cancel_rx.recv() => break, + read_res = reader.read(&mut buf) => { + match read_res { + Ok(0) | Err(_) => { + let _ = stream_tx_clone.send((session_id, stream_id, Vec::new())).await; break; } - } - Err(_) => { - let _ = tx_clone.send((session_id, stream_id, Vec::new())).await; - break; + Ok(n) => { + if stream_tx_clone.send((session_id, stream_id, buf[..n].to_vec())).await.is_err() { + break; + } + } } } } } - } - }); - remotes.insert((session_id, stream_id), RemoteState { writer, cancel_tx }); - send_relay_to_stream(session_id, stream_id, RelayMessage::ConnectOk, dispatcher, socket, ui_event_tx).await?; - let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT ok for [{session_id}:{stream_id}] -> {target}"))); + }); + let _ = connect_tx_clone.send((session_id, stream_id, target_clone, Ok((writer, cancel_tx)))); + } + Err(e) => { + let _ = connect_tx_clone.send((session_id, stream_id, target_clone, Err(e.to_string()))); + } } - Err(err) => { - let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT failed for [{session_id}:{stream_id}] -> {target}: {err}"))); - send_relay_to_stream( - session_id, - stream_id, - RelayMessage::Error(format!("connect failed: {err}")), - dispatcher, - socket, - ui_event_tx, - ) - .await?; - } - } + }); } RelayMessage::Data(data) => { if let Some(remote) = remotes.get_mut(&(session_id, stream_id)) { - let _ = remote.writer.write_all(&data).await; + let _ = remote.data_tx.send(bytes::Bytes::from(data)); } else { let _ = ui_event_tx.send(UiEvent::Log(format!("Relay DATA for unknown stream [{session_id}:{stream_id}] ({})", data.len()))); }