feat: implement server-side UoT and MTU tuning

This commit is contained in:
ospab 2026-05-21 02:23:49 +03:00
parent 112ddfee59
commit ceb760e4ce
9 changed files with 253 additions and 6 deletions

3
Cargo.lock generated
View File

@ -1083,12 +1083,15 @@ version = "0.2.6"
dependencies = [
"anyhow",
"axum",
"base64",
"bytes",
"hmac",
"ostp-core",
"portable-atomic",
"rand",
"serde",
"serde_json",
"sha2",
"socket2",
"tokio",
"tower-http",

View File

@ -77,6 +77,7 @@ pub struct Bridge {
pub transport_mode: String,
pub stealth_sni: String,
pub stealth_port: u16,
pub mtu: usize,
metrics: Arc<BridgeMetrics>,
sample_sent: u64,
@ -110,6 +111,7 @@ impl Bridge {
transport_mode: config.transport.mode.clone(),
stealth_sni: config.transport.stealth_sni.clone(),
stealth_port: config.transport.stealth_port,
mtu: config.ostp.mtu,
metrics,
sample_sent: 0,
@ -784,6 +786,7 @@ impl Bridge {
max_sent_history: 32768, // Reduced: gap recovery handles unrecoverable frames
handshake_pad_min: secrets.handshake_pad_min,
handshake_pad_max: secrets.handshake_pad_max,
mtu: self.mtu,
})?;
let resolved_addrs: Vec<std::net::SocketAddr> = match tokio::net::lookup_host(&self.server_addr).await {

View File

@ -46,8 +46,12 @@ pub struct OstpConfig {
pub access_key: String,
pub handshake_timeout_ms: u64,
pub io_timeout_ms: u64,
#[serde(default = "default_mtu")]
pub mtu: usize,
}
fn default_mtu() -> usize { 1350 }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalProxyConfig {
pub bind_addr: String,
@ -100,6 +104,7 @@ impl Default for OstpConfig {
access_key: String::new(),
handshake_timeout_ms: 5000,
io_timeout_ms: 2500,
mtu: default_mtu(),
}
}
}
@ -148,6 +153,7 @@ struct RawUnifiedConfig {
debug: Option<bool>,
server: Option<String>,
access_key: Option<String>,
mtu: Option<usize>,
socks5_bind: Option<String>,
tun: Option<RawTunSection>,
exclude: Option<RawExcludeSection>,
@ -199,6 +205,7 @@ impl ClientConfig {
let is_tun = raw.tun.as_ref().and_then(|t| t.enable).unwrap_or(false);
let server = raw.server.unwrap_or_else(|| "127.0.0.1:50000".to_string());
let key = raw.access_key.unwrap_or_default();
let mtu = raw.mtu.unwrap_or(default_mtu());
let socks5 = raw.socks5_bind.unwrap_or_else(|| "127.0.0.1:1088".to_string());
let exclusions = raw.exclude.unwrap_or(RawExcludeSection {
domains: None,
@ -219,6 +226,7 @@ impl ClientConfig {
access_key: key,
handshake_timeout_ms: 5000,
io_timeout_ms: 2500,
mtu,
},
local_proxy: LocalProxyConfig {
bind_addr: socks5,

View File

@ -38,6 +38,7 @@ pub struct ProtocolConfig {
/// Different access keys produce different handshake packet sizes.
pub handshake_pad_min: usize,
pub handshake_pad_max: usize,
pub mtu: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -129,7 +130,7 @@ impl ProtocolMachine {
sent_history: VecDeque::with_capacity(config.max_sent_history.max(1)),
session_id: config.session_id,
handshake_payload: config.handshake_payload,
padder: AdaptivePadder::new(1200, config.max_padding, config.padding_strategy),
padder: AdaptivePadder::new(config.mtu, config.max_padding, config.padding_strategy),
obfuscation_key: config.obfuscation_key,
max_reorder: config.max_reorder.max(1),
max_reorder_buffer: config.max_reorder_buffer.max(1),
@ -141,7 +142,7 @@ impl ProtocolMachine {
last_ack_sent: Instant::now(),
last_nack_sent: Instant::now() - Duration::from_secs(1),
last_recv_advance: Instant::now(),
cc: CongestionController::new(1200),
cc: CongestionController::new(config.mtu as u64),
handshake_pad_min: config.handshake_pad_min.max(8),
handshake_pad_max: config.handshake_pad_max.max(config.handshake_pad_min + 16),
})

View File

@ -17,3 +17,6 @@ socket2 = "0.6.3"
axum = "0.8"
tower-http = { version = "0.6", features = ["cors"] }
portable-atomic.workspace = true
hmac.workspace = true
sha2.workspace = true
base64 = "0.22"

View File

@ -14,6 +14,7 @@ mod dispatcher;
pub mod outbound;
pub mod api;
pub mod fallback;
pub mod transport;
mod relay;
mod signal;
@ -145,6 +146,7 @@ pub async fn run_server(
// Defaults -- overridden per-session by dispatcher using derive_all_secrets()
handshake_pad_min: 32,
handshake_pad_max: 128,
mtu: 1350,
};
let dispatcher = Dispatcher::new(protocol_config, shared_keys.clone());
@ -212,7 +214,7 @@ pub async fn run_server(
tracing::info!(listeners = bind_addrs.len(), keys = key_count, "server started");
tracing::info!("ARQ config: max_reorder=16384, reorder_buf=8192, sent_history=32768, rto=100ms");
tokio::select! {
res = run_server_loop(primary_socket, sockets, dispatcher, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => {
res = run_server_loop(bind_addrs.clone(), primary_socket, sockets, dispatcher, ui_cmd_rx, ui_event_tx, shared_keys, outbound, debug) => {
if let Err(e) = res {
tracing::error!("Server error: {e}");
}
@ -228,6 +230,7 @@ pub async fn run_server(
// ── Server main loop ─────────────────────────────────────────────────────────
async fn run_server_loop(
bind_addrs: Vec<String>,
primary_socket: std::sync::Arc<UdpSocket>,
sockets: Vec<std::sync::Arc<UdpSocket>>,
mut dispatcher: Dispatcher,
@ -241,6 +244,8 @@ async fn run_server_loop(
let (stream_tx, mut stream_rx) = mpsc::unbounded_channel::<(u32, u16, Vec<u8>)>();
let (connect_tx, mut connect_rx) = mpsc::unbounded_channel::<(u32, u16, String, Result<(tokio::net::tcp::OwnedWriteHalf, mpsc::Sender<()>), String>)>();
let tcp_map = std::sync::Arc::new(tokio::sync::RwLock::new(HashMap::new()));
let socket = primary_socket;
// Spawn a recv task for each socket, all feeding into the same channel
let (udp_tx, mut udp_rx) = mpsc::channel(10000);
@ -262,6 +267,35 @@ async fn run_server_loop(
}
});
}
// Spawn UoT (TCP) listeners
for bind_addr in &bind_addrs {
let addr = bind_addr.parse::<std::net::SocketAddr>().unwrap();
let tcp_map_clone = tcp_map.clone();
let shared_keys_clone = shared_keys.clone();
let udp_tx_clone = udp_tx.clone();
tokio::spawn(async move {
if let Ok(listener) = tokio::net::TcpListener::bind(&addr).await {
tracing::info!("TCP (UoT) listener bound to {}", addr);
loop {
if let Ok((stream, peer_addr)) = listener.accept().await {
let tm = tcp_map_clone.clone();
let keys = shared_keys_clone.clone();
let tx = udp_tx_clone.clone();
tokio::spawn(async move {
if let Err(e) = crate::transport::uot::handle_tcp_connection(stream, peer_addr, keys, tx, tm).await {
tracing::debug!("UoT connection from {} failed: {}", peer_addr, e);
}
});
}
}
} else {
tracing::warn!("Failed to bind TCP (UoT) listener to {}", addr);
}
});
}
drop(udp_tx); // Drop the original sender so the channel closes when all tasks end
if debug {
@ -317,7 +351,17 @@ async fn run_server_loop(
for resp in responses {
let resp_len = resp.len();
let mut sent_tcp = false;
{
let map = tcp_map.read().await;
if let Some(tx) = map.get(&peer_addr) {
let _ = tx.try_send(resp.clone());
sent_tcp = true;
}
}
if !sent_tcp {
let _ = socket.send_to(&resp, peer_addr).await?;
}
let _ = ui_event_tx.send(UiEvent::Tx { peer: peer_ip, bytes: resp_len });
}
@ -391,8 +435,18 @@ async fn run_server_loop(
}
let (frames, dropped_sessions) = dispatcher.on_tick();
for (frame, peer_addr) in frames {
let mut sent_tcp = false;
{
let map = tcp_map.read().await;
if let Some(tx) = map.get(&peer_addr) {
let _ = tx.try_send(frame.clone());
sent_tcp = true;
}
}
if !sent_tcp {
let _ = socket.send_to(&frame, peer_addr).await?;
}
}
for sid in dropped_sessions {
let _ = ui_event_tx.send(UiEvent::Log(format!("Session {sid} expired, releasing resources")));
let mut streams_to_cancel = Vec::new();

View File

@ -0,0 +1 @@
pub mod uot;

View File

@ -0,0 +1,171 @@
use anyhow::{Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, RwLock as StdRwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::{mpsc, RwLock};
use tracing::{info, warn};
pub async fn handle_tcp_connection(
mut stream: TcpStream,
peer_addr: SocketAddr,
shared_keys: Arc<StdRwLock<HashMap<String, ()>>>,
udp_tx: mpsc::Sender<(Bytes, SocketAddr)>,
tcp_map: Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<Bytes>>>>,
) -> Result<()> {
// 1. Read HTTP Handshake
let mut buf = [0u8; 4096];
let mut header_len = 0;
loop {
let n = stream.read(&mut buf[header_len..]).await?;
if n == 0 {
anyhow::bail!("connection closed before handshake complete");
}
header_len += n;
if buf[..header_len].windows(4).any(|w| w == b"\r\n\r\n") {
break;
}
if header_len == buf.len() {
anyhow::bail!("handshake headers too large");
}
}
let headers_str = String::from_utf8_lossy(&buf[..header_len]);
// Fast-fail scanner bots
if !headers_str.starts_with("GET /stream HTTP/1.1\r\n") {
send_404(&mut stream).await?;
anyhow::bail!("invalid request line");
}
// Extract Authorization or Cookie for signature
let mut signature_base64 = None;
for line in headers_str.lines() {
let lower = line.to_ascii_lowercase();
if lower.starts_with("authorization: bearer ") {
signature_base64 = Some(line[22..].trim().to_string());
} else if lower.starts_with("cookie: ostp_token=") {
signature_base64 = Some(line[19..].trim().to_string());
}
}
let sig_b64 = match signature_base64 {
Some(s) => s,
None => {
send_404(&mut stream).await?;
anyhow::bail!("missing authorization");
}
};
let sig_bytes = match base64::Engine::decode(&base64::engine::general_purpose::STANDARD_NO_PAD, &sig_b64) {
Ok(b) => b,
Err(_) => {
send_404(&mut stream).await?;
anyhow::bail!("invalid base64 signature");
}
};
if sig_bytes.len() < 8 {
send_404(&mut stream).await?;
anyhow::bail!("signature too short");
}
let ts_bytes: [u8; 8] = sig_bytes[0..8].try_into().unwrap();
let client_ts = u64::from_be_bytes(ts_bytes);
let provided_mac = &sig_bytes[8..];
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
if client_ts > now + 30 || client_ts < now.saturating_sub(60) {
send_404(&mut stream).await?;
anyhow::bail!("timestamp out of bounds (replay protection)");
}
// Verify HMAC against known keys
let keys = {
let guard = shared_keys.read().unwrap();
guard.keys().cloned().collect::<Vec<_>>()
};
let mut authenticated = false;
for key in keys {
let mut mac = Hmac::<Sha256>::new_from_slice(key.as_bytes())
.unwrap_or_else(|_| Hmac::<Sha256>::new_from_slice(b"default").unwrap());
mac.update(&ts_bytes);
if mac.verify_slice(provided_mac).is_ok() {
authenticated = true;
break;
}
}
if !authenticated {
send_404(&mut stream).await?;
anyhow::bail!("unauthorized (invalid HMAC)");
}
// Reply 200 OK
let response = "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\n\r\n";
stream.write_all(response.as_bytes()).await?;
info!("UoT client authenticated from {}", peer_addr);
// Register this connection in the map
let (tx, mut rx) = mpsc::channel::<Bytes>(1024);
{
tcp_map.write().await.insert(peer_addr, tx);
}
// Process streams
let (mut read_half, mut write_half) = stream.into_split();
// Spawn writer task
let peer_clone = peer_addr;
let tcp_map_clone = tcp_map.clone();
let writer_task = tokio::spawn(async move {
while let Some(packet) = rx.recv().await {
let mut out = BytesMut::with_capacity(2 + packet.len());
out.put_u16(packet.len() as u16);
out.put_slice(&packet);
if write_half.write_all(&out).await.is_err() {
break;
}
}
// Cleanup on writer exit
tcp_map_clone.write().await.remove(&peer_clone);
});
// Reader loop
let mut len_buf = [0u8; 2];
loop {
if read_half.read_exact(&mut len_buf).await.is_err() {
break;
}
let len = u16::from_be_bytes(len_buf) as usize;
let mut packet_buf = vec![0u8; len];
if read_half.read_exact(&mut packet_buf).await.is_err() {
break;
}
if udp_tx.send((Bytes::from(packet_buf), peer_addr)).await.is_err() {
break;
}
}
writer_task.abort();
tcp_map.write().await.remove(&peer_addr);
Ok(())
}
async fn send_404(stream: &mut TcpStream) -> Result<()> {
let body = "Not Found";
let resp = format!(
"HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
body.len(),
body
);
let _ = stream.write_all(resp.as_bytes()).await;
Ok(())
}

View File

@ -59,6 +59,7 @@ fn parse_ostp_link(link: &str) -> Result<ClientConfig> {
Ok(ClientConfig {
server,
access_key,
mtu: None,
socks5_bind: Some("127.0.0.1:1088".to_string()), // Fallback to standard SOCKS5 port
tun: Some(TunConfig {
enable: false, // Default to proxy, configurable via settings GUI
@ -188,6 +189,7 @@ struct FallbackCfg {
struct ClientConfig {
server: String,
access_key: String,
mtu: Option<usize>,
socks5_bind: Option<String>,
tun: Option<TunConfig>,
turn: Option<TurnConfigRaw>,
@ -634,6 +636,7 @@ async fn run_client_directly(client_cfg: ClientConfig) -> Result<()> {
access_key: client_cfg.access_key.clone(),
handshake_timeout_ms: 5000,
io_timeout_ms: 5000,
mtu: client_cfg.mtu.unwrap_or(1350),
},
local_proxy: ostp_client::config::LocalProxyConfig {
bind_addr: client_cfg.socks5_bind.clone().unwrap_or_else(|| "127.0.0.1:1088".to_string()),