diff --git a/ostp-client/src/tunnel/outbounds/ostp.rs b/ostp-client/src/tunnel/outbounds/ostp.rs index f5efbf1..b975907 100644 --- a/ostp-client/src/tunnel/outbounds/ostp.rs +++ b/ostp-client/src/tunnel/outbounds/ostp.rs @@ -63,6 +63,8 @@ fn random_session_id() -> u32 { } pub async fn dial_tcp( + target_host: &str, + target_port: u16, server: &str, port: u16, access_key: &str, @@ -80,12 +82,46 @@ pub async fn dial_tcp( let config = make_initiator_config(session_id, access_key, transport_cfg); let mut machine = ProtocolMachine::new(config).unwrap(); + let target_host_str = target_host.to_string(); + // Spawn bridge task tokio::spawn(async move { // Send initial handshake if let Ok(action) = machine.on_event(OstpEvent::Start) { handle_action(action, &transport, &mut server_stream).await; } + + // Wait for handshake response (server sends HandshakePayload back) + let mut buf = [0u8; 8192]; + let mut handshake_success = false; + match tokio::time::timeout( + std::time::Duration::from_millis(3000), + transport.recv(&mut buf), + ).await { + Ok(Ok(n)) => { + if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) { + handle_action(action, &transport, &mut server_stream).await; + handshake_success = true; + } + } + _ => { + tracing::warn!("TCP handshake timeout for {}:{}", server, port); + return; + } + } + + if !handshake_success { + tracing::warn!("TCP handshake failed or protocol machine error"); + return; + } + + // Send connection request + let connect_msg = ostp_core::relay::RelayMessage::Connect(format!("{}:{}", target_host_str, target_port)); + let connect_encoded = connect_msg.encode(); + if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(connect_encoded))) { + handle_action(action, &transport, &mut server_stream).await; + } + let mut buf = [0u8; 65535]; let mut udp_buf = [0u8; 65535]; @@ -93,7 +129,9 @@ pub async fn dial_tcp( tokio::select! { Ok(n) = server_stream.read(&mut buf) => { if n == 0 { break; } - if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::copy_from_slice(&buf[..n]))) { + let data_msg = ostp_core::relay::RelayMessage::Data(buf[..n].to_vec()); + let encoded = data_msg.encode(); + if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) { handle_action(action, &transport, &mut server_stream).await; } } @@ -162,16 +200,17 @@ pub async fn handle_udp( } } - // Send relay connect + data - let relay_msg = ostp_core::relay::RelayMessage::Connect( - format!("{}:{}", target_dst.ip(), target_dst.port()) - ); - let encoded = relay_msg.encode(); + // Send relay UdpAssociate + data + let assoc_msg = ostp_core::relay::RelayMessage::UdpAssociate; + let encoded = assoc_msg.encode(); if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) { handle_udp_action(action, &transport).await; } - let data_msg = ostp_core::relay::RelayMessage::Data(payload.to_vec()); + let data_msg = ostp_core::relay::RelayMessage::UdpData( + format!("{}:{}", target_dst.ip(), target_dst.port()), + payload.to_vec() + ); let encoded = data_msg.encode(); if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) { handle_udp_action(action, &transport).await; @@ -184,7 +223,10 @@ pub async fn handle_udp( transport.recv(&mut buf), ).await { Ok(Ok(n)) => { - let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))); + if let Ok(action) = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n]))) { + // Just process incoming UDP response internally + let _ = action; + } } _ => break, } @@ -238,15 +280,24 @@ async fn handle_action(action: ProtocolAction, transport: &crate::transport::Tra let _ = transport.send(&data).await; } ProtocolAction::DeliverApp(_stream_id, payload) => { - let _ = server_stream.write_all(&payload).await; + if let Ok(msg) = ostp_core::relay::RelayMessage::decode(&payload) { + match msg { + ostp_core::relay::RelayMessage::Data(data) => { + let _ = server_stream.write_all(&data).await; + } + ostp_core::relay::RelayMessage::ConnectOk => { + tracing::debug!("TCP Connection established successfully"); + } + ostp_core::relay::RelayMessage::Error(err) => { + tracing::warn!("Server returned TCP connection error: {}", err); + } + _ => {} + } + } } ProtocolAction::Multiple(actions) => { for a in actions { - match a { - ProtocolAction::SendDatagram(data) => { let _ = transport.send(&data).await; } - ProtocolAction::DeliverApp(_stream_id, payload) => { let _ = server_stream.write_all(&payload).await; } - _ => {} - } + Box::pin(handle_action(a, transport, server_stream)).await; } } _ => {}