mirror of https://github.com/ospab/ostp.git
fix(client): wait for handshake response in dial_tcp before sending data
This commit is contained in:
parent
01d7d19b11
commit
6eb7b369a0
|
|
@ -63,6 +63,8 @@ fn random_session_id() -> u32 {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn dial_tcp(
|
pub async fn dial_tcp(
|
||||||
|
target_host: &str,
|
||||||
|
target_port: u16,
|
||||||
server: &str,
|
server: &str,
|
||||||
port: u16,
|
port: u16,
|
||||||
access_key: &str,
|
access_key: &str,
|
||||||
|
|
@ -80,12 +82,46 @@ pub async fn dial_tcp(
|
||||||
let config = make_initiator_config(session_id, access_key, transport_cfg);
|
let config = make_initiator_config(session_id, access_key, transport_cfg);
|
||||||
let mut machine = ProtocolMachine::new(config).unwrap();
|
let mut machine = ProtocolMachine::new(config).unwrap();
|
||||||
|
|
||||||
|
let target_host_str = target_host.to_string();
|
||||||
|
|
||||||
// Spawn bridge task
|
// Spawn bridge task
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Send initial handshake
|
// Send initial handshake
|
||||||
if let Ok(action) = machine.on_event(OstpEvent::Start) {
|
if let Ok(action) = machine.on_event(OstpEvent::Start) {
|
||||||
handle_action(action, &transport, &mut server_stream).await;
|
handle_action(action, &transport, &mut server_stream).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for handshake response (server sends HandshakePayload back)
|
||||||
|
let mut buf = [0u8; 8192];
|
||||||
|
let mut handshake_success = false;
|
||||||
|
match tokio::time::timeout(
|
||||||
|
std::time::Duration::from_millis(3000),
|
||||||
|
transport.recv(&mut buf),
|
||||||
|
).await {
|
||||||
|
Ok(Ok(n)) => {
|
||||||
|
if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) {
|
||||||
|
handle_action(action, &transport, &mut server_stream).await;
|
||||||
|
handshake_success = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
tracing::warn!("TCP handshake timeout for {}:{}", server, port);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handshake_success {
|
||||||
|
tracing::warn!("TCP handshake failed or protocol machine error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send connection request
|
||||||
|
let connect_msg = ostp_core::relay::RelayMessage::Connect(format!("{}:{}", target_host_str, target_port));
|
||||||
|
let connect_encoded = connect_msg.encode();
|
||||||
|
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(connect_encoded))) {
|
||||||
|
handle_action(action, &transport, &mut server_stream).await;
|
||||||
|
}
|
||||||
|
|
||||||
let mut buf = [0u8; 65535];
|
let mut buf = [0u8; 65535];
|
||||||
let mut udp_buf = [0u8; 65535];
|
let mut udp_buf = [0u8; 65535];
|
||||||
|
|
||||||
|
|
@ -93,7 +129,9 @@ pub async fn dial_tcp(
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok(n) = server_stream.read(&mut buf) => {
|
Ok(n) = server_stream.read(&mut buf) => {
|
||||||
if n == 0 { break; }
|
if n == 0 { break; }
|
||||||
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::copy_from_slice(&buf[..n]))) {
|
let data_msg = ostp_core::relay::RelayMessage::Data(buf[..n].to_vec());
|
||||||
|
let encoded = data_msg.encode();
|
||||||
|
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
||||||
handle_action(action, &transport, &mut server_stream).await;
|
handle_action(action, &transport, &mut server_stream).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -162,16 +200,17 @@ pub async fn handle_udp(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send relay connect + data
|
// Send relay UdpAssociate + data
|
||||||
let relay_msg = ostp_core::relay::RelayMessage::Connect(
|
let assoc_msg = ostp_core::relay::RelayMessage::UdpAssociate;
|
||||||
format!("{}:{}", target_dst.ip(), target_dst.port())
|
let encoded = assoc_msg.encode();
|
||||||
);
|
|
||||||
let encoded = relay_msg.encode();
|
|
||||||
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
||||||
handle_udp_action(action, &transport).await;
|
handle_udp_action(action, &transport).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
let data_msg = ostp_core::relay::RelayMessage::Data(payload.to_vec());
|
let data_msg = ostp_core::relay::RelayMessage::UdpData(
|
||||||
|
format!("{}:{}", target_dst.ip(), target_dst.port()),
|
||||||
|
payload.to_vec()
|
||||||
|
);
|
||||||
let encoded = data_msg.encode();
|
let encoded = data_msg.encode();
|
||||||
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
||||||
handle_udp_action(action, &transport).await;
|
handle_udp_action(action, &transport).await;
|
||||||
|
|
@ -184,7 +223,10 @@ pub async fn handle_udp(
|
||||||
transport.recv(&mut buf),
|
transport.recv(&mut buf),
|
||||||
).await {
|
).await {
|
||||||
Ok(Ok(n)) => {
|
Ok(Ok(n)) => {
|
||||||
let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n])));
|
if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) {
|
||||||
|
// Just process incoming UDP response internally
|
||||||
|
let _ = action;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ => break,
|
_ => break,
|
||||||
}
|
}
|
||||||
|
|
@ -238,15 +280,24 @@ async fn handle_action(action: ProtocolAction, transport: &crate::transport::Tra
|
||||||
let _ = transport.send(&data).await;
|
let _ = transport.send(&data).await;
|
||||||
}
|
}
|
||||||
ProtocolAction::DeliverApp(_stream_id, payload) => {
|
ProtocolAction::DeliverApp(_stream_id, payload) => {
|
||||||
let _ = server_stream.write_all(&payload).await;
|
if let Ok(msg) = ostp_core::relay::RelayMessage::decode(&payload) {
|
||||||
|
match msg {
|
||||||
|
ostp_core::relay::RelayMessage::Data(data) => {
|
||||||
|
let _ = server_stream.write_all(&data).await;
|
||||||
|
}
|
||||||
|
ostp_core::relay::RelayMessage::ConnectOk => {
|
||||||
|
tracing::debug!("TCP Connection established successfully");
|
||||||
|
}
|
||||||
|
ostp_core::relay::RelayMessage::Error(err) => {
|
||||||
|
tracing::warn!("Server returned TCP connection error: {}", err);
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ProtocolAction::Multiple(actions) => {
|
ProtocolAction::Multiple(actions) => {
|
||||||
for a in actions {
|
for a in actions {
|
||||||
match a {
|
Box::pin(handle_action(a, transport, server_stream)).await;
|
||||||
ProtocolAction::SendDatagram(data) => { let _ = transport.send(&data).await; }
|
|
||||||
ProtocolAction::DeliverApp(_stream_id, payload) => { let _ = server_stream.write_all(&payload).await; }
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue