fix(client): fix catastrophic channel loopback in UoT transport that echoed packets locally

This commit is contained in:
ospab 2026-05-21 18:24:48 +03:00
parent 6c4006c48c
commit 3ffa057d03
1 changed files with 5 additions and 6 deletions

View File

@ -93,13 +93,12 @@ where
R: tokio::io::AsyncRead + Unpin + Send + 'static, R: tokio::io::AsyncRead + Unpin + Send + 'static,
W: tokio::io::AsyncWrite + Unpin + Send + 'static, W: tokio::io::AsyncWrite + Unpin + Send + 'static,
{ {
let (app_tx, bridge_rx) = mpsc::channel::<Bytes>(1024); let (app_tx, mut tx_rx) = mpsc::channel::<Bytes>(1024);
let (bridge_tx, app_rx) = mpsc::channel::<Bytes>(1024); let (rx_tx, app_rx) = mpsc::channel::<Bytes>(1024);
// TX Loop (App -> UoT -> Network): prefix each frame with u16 BE length // TX Loop (App -> UoT -> Network): prefix each frame with u16 BE length
tokio::spawn(async move { tokio::spawn(async move {
let mut rx = bridge_rx; while let Some(frame) = tx_rx.recv().await {
while let Some(frame) = rx.recv().await {
let len = frame.len() as u16; let len = frame.len() as u16;
if net_tx.write_u16(len).await.is_err() { break; } if net_tx.write_u16(len).await.is_err() { break; }
if net_tx.write_all(&frame).await.is_err() { break; } if net_tx.write_all(&frame).await.is_err() { break; }
@ -128,11 +127,11 @@ where
} }
let packet = buffer.split_to(2 + len); let packet = buffer.split_to(2 + len);
if app_tx.send(Bytes::from(packet[2..].to_vec())).await.is_err() { if rx_tx.send(Bytes::from(packet[2..].to_vec())).await.is_err() {
break; break;
} }
} }
}); });
Ok((bridge_tx, Arc::new(tokio::sync::Mutex::new(app_rx)))) Ok((app_tx, Arc::new(tokio::sync::Mutex::new(app_rx))))
} }