fix: resolve severe server-side head-of-line blocking under high connection concurrency by delegating TCP connection establishments and stream writing to asynchronous spawned tasks

This commit is contained in:
ospab 2026-05-17 02:01:40 +03:00
parent 6a474c8f00
commit b1dfb335c9
2 changed files with 63 additions and 50 deletions

10
Cargo.lock generated
View File

@ -745,7 +745,7 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "ostp"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"base64",
@ -762,7 +762,7 @@ dependencies = [
[[package]]
name = "ostp-client"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"bytes",
@ -779,7 +779,7 @@ dependencies = [
[[package]]
name = "ostp-core"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"async-trait",
@ -812,7 +812,7 @@ dependencies = [
[[package]]
name = "ostp-server"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"bytes",
@ -826,7 +826,7 @@ dependencies = [
[[package]]
name = "ostp-tun-helper"
version = "0.1.53"
version = "0.1.54"
dependencies = [
"anyhow",
"chrono",

View File

@ -181,7 +181,7 @@ pub async fn run_server(
}
struct RemoteState {
writer: OwnedWriteHalf,
data_tx: mpsc::UnboundedSender<Bytes>,
cancel_tx: mpsc::Sender<()>,
}
@ -197,6 +197,7 @@ async fn run_server_loop(
) -> Result<()> {
let mut remotes: HashMap<(u32, u16), RemoteState> = HashMap::new();
let (stream_tx, mut stream_rx) = mpsc::channel::<(u32, u16, Vec<u8>)>(10000);
let (connect_tx, mut connect_rx) = mpsc::unbounded_channel::<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>();
let socket = std::sync::Arc::new(socket);
let (udp_tx, mut udp_rx) = mpsc::channel(10000);
@ -288,6 +289,7 @@ async fn run_server_loop(
&mut remotes,
&ui_event_tx,
stream_tx.clone(),
connect_tx.clone(),
outbound.clone(),
debug,
).await?;
@ -309,6 +311,28 @@ async fn run_server_loop(
let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::Data(data), &mut dispatcher, &socket, &ui_event_tx).await;
}
}
Some((session_id, stream_id, target, res)) = connect_rx.recv() => {
match res {
Ok((writer, cancel_tx)) => {
let (data_tx, mut data_rx) = mpsc::unbounded_channel::<Bytes>();
let mut writer_task = writer;
tokio::spawn(async move {
while let Some(data) = data_rx.recv().await {
if tokio::io::AsyncWriteExt::write_all(&mut writer_task, &data).await.is_err() {
break;
}
}
});
remotes.insert((session_id, stream_id), RemoteState { data_tx, cancel_tx });
let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::ConnectOk, &mut dispatcher, &socket, &ui_event_tx).await;
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT ok for [{session_id}:{stream_id}] -> {target}")));
}
Err(err) => {
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT failed for [{session_id}:{stream_id}] -> {target}: {err}")));
let _ = send_relay_to_stream(session_id, stream_id, RelayMessage::Error(format!("connect failed: {err}")), &mut dispatcher, &socket, &ui_event_tx).await;
}
}
}
_ = retransmit_tick.tick() => {
let now = Instant::now();
for (peer_ip, last_seen) in peer_last_seen.iter() {
@ -353,66 +377,55 @@ async fn handle_relay_message(
remotes: &mut HashMap<(u32, u16), RemoteState>,
ui_event_tx: &mpsc::UnboundedSender<UiEvent>,
stream_tx: mpsc::Sender<(u32, u16, Vec<u8>)>,
connect_tx: mpsc::UnboundedSender<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>,
outbound: Option<OutboundConfig>,
debug: bool,
) -> Result<()> {
match RelayMessage::decode(&payload)? {
RelayMessage::Connect(target) => {
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT start for [{session_id}:{stream_id}] -> {target}")));
let stream = connect_target(&target, outbound.as_ref(), debug).await;
match stream {
Ok(stream) => {
let (mut reader, writer) = stream.into_split();
let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
let tx_clone = stream_tx.clone();
tokio::spawn(async move {
let mut buf = [0_u8; 1024];
loop {
tokio::select! {
_ = cancel_rx.recv() => {
break;
}
read_res = reader.read(&mut buf) => {
match read_res {
Ok(0) => {
let _ = tx_clone.send((session_id, stream_id, Vec::new())).await;
break;
}
Ok(n) => {
if tx_clone.send((session_id, stream_id, buf[..n].to_vec())).await.is_err() {
let target_clone = target.clone();
let connect_tx_clone = connect_tx.clone();
let stream_tx_clone = stream_tx.clone();
let outbound_clone = outbound.clone();
tokio::spawn(async move {
let stream_res = connect_target(&target_clone, outbound_clone.as_ref(), debug).await;
match stream_res {
Ok(stream) => {
let (mut reader, writer) = stream.into_split();
let (cancel_tx, mut cancel_rx) = mpsc::channel::<()>(1);
tokio::spawn(async move {
let mut buf = [0_u8; 4096];
loop {
tokio::select! {
_ = cancel_rx.recv() => break,
read_res = reader.read(&mut buf) => {
match read_res {
Ok(0) | Err(_) => {
let _ = stream_tx_clone.send((session_id, stream_id, Vec::new())).await;
break;
}
}
Err(_) => {
let _ = tx_clone.send((session_id, stream_id, Vec::new())).await;
break;
Ok(n) => {
if stream_tx_clone.send((session_id, stream_id, buf[..n].to_vec())).await.is_err() {
break;
}
}
}
}
}
}
}
});
remotes.insert((session_id, stream_id), RemoteState { writer, cancel_tx });
send_relay_to_stream(session_id, stream_id, RelayMessage::ConnectOk, dispatcher, socket, ui_event_tx).await?;
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT ok for [{session_id}:{stream_id}] -> {target}")));
});
let _ = connect_tx_clone.send((session_id, stream_id, target_clone, Ok((writer, cancel_tx))));
}
Err(e) => {
let _ = connect_tx_clone.send((session_id, stream_id, target_clone, Err(e.to_string())));
}
}
Err(err) => {
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay CONNECT failed for [{session_id}:{stream_id}] -> {target}: {err}")));
send_relay_to_stream(
session_id,
stream_id,
RelayMessage::Error(format!("connect failed: {err}")),
dispatcher,
socket,
ui_event_tx,
)
.await?;
}
}
});
}
RelayMessage::Data(data) => {
if let Some(remote) = remotes.get_mut(&(session_id, stream_id)) {
let _ = remote.writer.write_all(&data).await;
let _ = remote.data_tx.send(bytes::Bytes::from(data));
} else {
let _ = ui_event_tx.send(UiEvent::Log(format!("Relay DATA for unknown stream [{session_id}:{stream_id}] ({})", data.len())));
}