mirror of https://github.com/ospab/ostp.git
Fix empty handshake payload and dummy keys in ostp outbound client
This commit is contained in:
parent
ce9f11a35e
commit
f8f27d366d
|
|
@ -5,6 +5,63 @@ use crate::config::{TransportConfig, MultiplexConfig};
|
|||
use ostp_core::{OstpEvent, ProtocolAction, ProtocolConfig, ProtocolMachine};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
/// Build the handshake payload the server expects:
|
||||
/// [timestamp_u64_be (8 bytes)] [session_id_u32_be (4 bytes)] [access_key bytes]
|
||||
fn build_handshake_payload(session_id: u32, access_key: &str) -> Vec<u8> {
|
||||
let ts = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
let mut payload = Vec::with_capacity(12 + access_key.len());
|
||||
payload.extend_from_slice(&ts.to_be_bytes());
|
||||
payload.extend_from_slice(&session_id.to_be_bytes());
|
||||
payload.extend_from_slice(access_key.as_bytes());
|
||||
payload
|
||||
}
|
||||
|
||||
/// Build a correctly configured ProtocolConfig for an outgoing OSTP connection.
|
||||
fn make_initiator_config(
|
||||
session_id: u32,
|
||||
access_key: &str,
|
||||
transport_cfg: &TransportConfig,
|
||||
) -> ProtocolConfig {
|
||||
let secrets = ostp_core::crypto::derive_all_secrets(access_key.as_bytes());
|
||||
let payload = build_handshake_payload(session_id, access_key);
|
||||
|
||||
let mtu = match transport_cfg.r#type.as_str() {
|
||||
"dns" => 1100,
|
||||
_ => 1350,
|
||||
};
|
||||
|
||||
ProtocolConfig {
|
||||
role: ostp_core::NoiseRole::Initiator,
|
||||
psk: secrets.psk,
|
||||
session_id,
|
||||
handshake_payload: payload,
|
||||
max_padding: 256,
|
||||
padding_strategy: ostp_core::framing::PaddingStrategy::Adaptive,
|
||||
obfuscation_key: secrets.obfuscation_key,
|
||||
max_reorder: 16384,
|
||||
max_reorder_buffer: 8192,
|
||||
ack_delay_ms: 5,
|
||||
rto_ms: 100,
|
||||
max_retries: 8,
|
||||
max_sent_history: 32768,
|
||||
handshake_pad_min: secrets.handshake_pad_min,
|
||||
handshake_pad_max: secrets.handshake_pad_max,
|
||||
mtu,
|
||||
}
|
||||
}
|
||||
|
||||
fn random_session_id() -> u32 {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
let mut h = DefaultHasher::new();
|
||||
std::time::Instant::now().hash(&mut h);
|
||||
std::thread::current().id().hash(&mut h);
|
||||
h.finish() as u32
|
||||
}
|
||||
|
||||
pub async fn dial_tcp(
|
||||
server: &str,
|
||||
port: u16,
|
||||
|
|
@ -17,48 +74,15 @@ pub async fn dial_tcp(
|
|||
let client_stream = tokio::net::TcpStream::connect(local_addr).await?;
|
||||
let (mut server_stream, _) = listener.accept().await?;
|
||||
|
||||
let transport = match transport_cfg.r#type.as_str() {
|
||||
"dns" => {
|
||||
let domain = transport_cfg.domain.clone().unwrap_or_else(|| "tunnel.example.com".to_string());
|
||||
let resolver = transport_cfg.resolver.clone().unwrap_or_else(|| "8.8.8.8".to_string());
|
||||
crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await?
|
||||
}
|
||||
// Fallback to UDP for now if unknown
|
||||
_ => {
|
||||
let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?;
|
||||
udp.connect((server, port)).await?;
|
||||
crate::transport::Transport::Udp(std::sync::Arc::new(udp))
|
||||
}
|
||||
};
|
||||
|
||||
let mut psk = [0u8; 32];
|
||||
let key_bytes = access_key.as_bytes();
|
||||
let len = key_bytes.len().min(32);
|
||||
psk[..len].copy_from_slice(&key_bytes[..len]);
|
||||
|
||||
let config = ProtocolConfig {
|
||||
role: ostp_core::NoiseRole::Initiator,
|
||||
psk,
|
||||
session_id: 1,
|
||||
handshake_payload: vec![],
|
||||
max_padding: 0,
|
||||
padding_strategy: ostp_core::framing::PaddingStrategy::Fixed(0),
|
||||
obfuscation_key: [0; 8],
|
||||
max_reorder: 16384,
|
||||
max_reorder_buffer: 8192,
|
||||
ack_delay_ms: 10,
|
||||
rto_ms: 100,
|
||||
max_retries: 5,
|
||||
max_sent_history: 32768,
|
||||
handshake_pad_min: 8,
|
||||
handshake_pad_max: 24,
|
||||
mtu: 1400,
|
||||
};
|
||||
let transport = make_transport(transport_cfg, server, port).await?;
|
||||
|
||||
let session_id = random_session_id();
|
||||
let config = make_initiator_config(session_id, access_key, transport_cfg);
|
||||
let mut machine = ProtocolMachine::new(config).unwrap();
|
||||
|
||||
// Spawn bridge task
|
||||
tokio::spawn(async move {
|
||||
// Send initial handshake
|
||||
if let Ok(action) = machine.on_event(OstpEvent::Start) {
|
||||
handle_action(action, &transport, &mut server_stream).await;
|
||||
}
|
||||
|
|
@ -100,63 +124,53 @@ pub async fn handle_udp(
|
|||
transport_cfg: &TransportConfig,
|
||||
_multiplex: &MultiplexConfig,
|
||||
) -> Result<()> {
|
||||
let transport = match transport_cfg.r#type.as_str() {
|
||||
"dns" => {
|
||||
let domain = transport_cfg.domain.clone().unwrap_or_else(|| "tunnel.example.com".to_string());
|
||||
let resolver = transport_cfg.resolver.clone().unwrap_or_else(|| "8.8.8.8".to_string());
|
||||
crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await?
|
||||
let transport = make_transport(transport_cfg, server, port).await?;
|
||||
|
||||
// Derive session_id from client source addr for stable per-flow sessions
|
||||
let ip_bytes = match client_src.ip() {
|
||||
std::net::IpAddr::V4(v4) => {
|
||||
let o = v4.octets();
|
||||
u32::from_be_bytes(o)
|
||||
}
|
||||
_ => {
|
||||
let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?;
|
||||
udp.connect((server, port)).await?;
|
||||
crate::transport::Transport::Udp(std::sync::Arc::new(udp))
|
||||
std::net::IpAddr::V6(v6) => {
|
||||
let o = v6.octets();
|
||||
u32::from_be_bytes([o[12], o[13], o[14], o[15]])
|
||||
}
|
||||
};
|
||||
let session_id = ip_bytes ^ (client_src.port() as u32);
|
||||
|
||||
let mut psk = [0u8; 32];
|
||||
let key_bytes = access_key.as_bytes();
|
||||
let len = key_bytes.len().min(32);
|
||||
psk[..len].copy_from_slice(&key_bytes[..len]);
|
||||
|
||||
let config = ProtocolConfig {
|
||||
role: ostp_core::NoiseRole::Initiator,
|
||||
psk,
|
||||
session_id: u32::from_ne_bytes([
|
||||
client_src.ip().to_string().as_bytes().get(0).copied().unwrap_or(0),
|
||||
client_src.ip().to_string().as_bytes().get(1).copied().unwrap_or(0),
|
||||
client_src.ip().to_string().as_bytes().get(2).copied().unwrap_or(0),
|
||||
client_src.ip().to_string().as_bytes().get(3).copied().unwrap_or(0),
|
||||
]),
|
||||
handshake_payload: vec![],
|
||||
max_padding: 0,
|
||||
padding_strategy: ostp_core::framing::PaddingStrategy::Fixed(0),
|
||||
obfuscation_key: [0; 8],
|
||||
max_reorder: 4096,
|
||||
max_reorder_buffer: 2048,
|
||||
ack_delay_ms: 50,
|
||||
rto_ms: 200,
|
||||
max_retries: 3,
|
||||
max_sent_history: 8192,
|
||||
handshake_pad_min: 8,
|
||||
handshake_pad_max: 24,
|
||||
mtu: 1400,
|
||||
};
|
||||
|
||||
let config = make_initiator_config(session_id, access_key, transport_cfg);
|
||||
let mut machine = ProtocolMachine::new(config)?;
|
||||
|
||||
// Send initial packet with UDP payload
|
||||
// Send handshake first
|
||||
if let Ok(action) = machine.on_event(OstpEvent::Start) {
|
||||
handle_udp_action(action, &transport).await;
|
||||
}
|
||||
|
||||
// Send the actual UDP payload
|
||||
let relay_msg = ostp_core::relay::RelayMessage::Connect(format!("{}:{}", target_dst.ip(), target_dst.port()));
|
||||
// Wait for handshake response (server sends HandshakePayload back)
|
||||
let mut buf = [0u8; 8192];
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_millis(2000),
|
||||
transport.recv(&mut buf),
|
||||
).await {
|
||||
Ok(Ok(n)) => {
|
||||
let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n])));
|
||||
}
|
||||
_ => {
|
||||
tracing::warn!("UDP handshake timeout for {}:{}", server, port);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Send relay connect + data
|
||||
let relay_msg = ostp_core::relay::RelayMessage::Connect(
|
||||
format!("{}:{}", target_dst.ip(), target_dst.port())
|
||||
);
|
||||
let encoded = relay_msg.encode();
|
||||
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
||||
handle_udp_action(action, &transport).await;
|
||||
}
|
||||
|
||||
// Send data packet
|
||||
let data_msg = ostp_core::relay::RelayMessage::Data(payload.to_vec());
|
||||
let encoded = data_msg.encode();
|
||||
if let Ok(action) = machine.on_event(OstpEvent::Outbound(1, bytes::Bytes::from(encoded))) {
|
||||
|
|
@ -165,10 +179,9 @@ pub async fn handle_udp(
|
|||
|
||||
// Keep-alive for a short time to receive response
|
||||
for _ in 0..5 {
|
||||
let mut buf = [0u8; 8192];
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_millis(100),
|
||||
transport.recv(&mut buf)
|
||||
transport.recv(&mut buf),
|
||||
).await {
|
||||
Ok(Ok(n)) => {
|
||||
let _ = machine.on_event(OstpEvent::Inbound(bytes::Bytes::copy_from_slice(&buf[..n])));
|
||||
|
|
@ -180,6 +193,27 @@ pub async fn handle_udp(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn make_transport(
|
||||
transport_cfg: &TransportConfig,
|
||||
server: &str,
|
||||
port: u16,
|
||||
) -> Result<crate::transport::Transport> {
|
||||
match transport_cfg.r#type.as_str() {
|
||||
"dns" => {
|
||||
let domain = transport_cfg.domain.clone()
|
||||
.unwrap_or_else(|| "tunnel.example.com".to_string());
|
||||
let resolver = transport_cfg.resolver.clone()
|
||||
.unwrap_or_else(|| "8.8.8.8".to_string());
|
||||
crate::transport::dns::start_dns_transport(domain, resolver, transport_cfg.pubkey.clone()).await
|
||||
}
|
||||
_ => {
|
||||
let udp = tokio::net::UdpSocket::bind("0.0.0.0:0").await?;
|
||||
udp.connect((server, port)).await?;
|
||||
Ok(crate::transport::Transport::Udp(std::sync::Arc::new(udp)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_udp_action(action: ProtocolAction, transport: &crate::transport::Transport) {
|
||||
match action {
|
||||
ProtocolAction::SendDatagram(data) => {
|
||||
|
|
|
|||
|
|
@ -392,12 +392,34 @@ impl ProtocolMachine {
|
|||
self.last_recv_advance = Instant::now();
|
||||
} else {
|
||||
// Gap detected
|
||||
if self.reorder_buffer.len() >= self.max_reorder_buffer {
|
||||
tracing::warn!("Reorder buffer full ({}/{}), forcing gap recovery to prevent packet drops",
|
||||
self.reorder_buffer.len(), self.max_reorder_buffer
|
||||
);
|
||||
if let Some(&first_buffered) = self.reorder_buffer.keys().next() {
|
||||
let skipped = first_buffered.saturating_sub(self.expected_recv_nonce);
|
||||
self.expected_recv_nonce = first_buffered;
|
||||
self.last_recv_advance = Instant::now();
|
||||
|
||||
let mut delivered = 0u64;
|
||||
while let Some(buffered_action) = self.reorder_buffer.remove(&self.expected_recv_nonce) {
|
||||
app_actions.push(buffered_action);
|
||||
self.expected_recv_nonce = self.expected_recv_nonce.saturating_add(1);
|
||||
delivered += 1;
|
||||
}
|
||||
self.ack_pending = true;
|
||||
tracing::debug!("Forced Gap recovery: skipped {} lost frames, delivered {} buffered frames", skipped, delivered);
|
||||
}
|
||||
}
|
||||
|
||||
if nonce >= self.expected_recv_nonce {
|
||||
if self.reorder_buffer.len() < self.max_reorder_buffer {
|
||||
self.reorder_buffer.insert(nonce, action);
|
||||
} else {
|
||||
tracing::warn!("Reorder buffer full ({}/{}), dropping frame nonce={}",
|
||||
self.reorder_buffer.len(), self.max_reorder_buffer, nonce
|
||||
);
|
||||
tracing::warn!("Reorder buffer still full after gap recovery, dropping frame nonce={}", nonce);
|
||||
}
|
||||
} else {
|
||||
tracing::debug!("Frame nonce={} arrived too late after gap recovery, dropping", nonce);
|
||||
}
|
||||
|
||||
// Rate-limited NACK: send at most once per 30ms to prevent retransmit storms.
|
||||
|
|
|
|||
Loading…
Reference in New Issue