"""One-off patcher for ostp-client/src/bridge.rs.

Replaces the body of `pub async fn run(` with a slimmer event loop and appends
the private helper methods that loop delegates to (handle_inbound_udp,
handle_bridge_cmd, handle_keepalive, handle_retransmit, handle_proxy_event).

The file is only rewritten when the old function could be located and its
closing brace found; otherwise the script exits without touching the file.
"""
import sys
import re

# Absolute path of the Rust source file that gets rewritten in place.
BRIDGE_PATH = "d:/ospab-projects/ostp/ostp-client/src/bridge.rs"

# Text that identifies the start of the function being replaced.
RUN_MARKER = "pub async fn run("

# Signature of one of the inserted helpers; its presence means the patch has
# already been applied and applying it again would duplicate the helpers.
HELPER_MARKER = "async fn handle_inbound_udp("

# Replacement Rust source: the new `run` plus its helper methods, indented for
# placement inside the existing `impl` block. Leading/trailing newlines are
# stripped so the text drops in exactly where the old function was.
NEW_RUN_AND_HELPERS = """
    pub async fn run(
        mut self,
        tx: mpsc::Sender<UiEvent>,
        mut bridge_rx: mpsc::Receiver<BridgeCommand>,
        mut shutdown: watch::Receiver<bool>,
        mut proxy_rx: mpsc::Receiver<ProxyEvent>,
        proxy_tx: mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
    ) -> Result<()> {
        let mut metrics_tick = interval(Duration::from_millis(500));
        let mut keepalive_tick = tokio::time::interval(Duration::from_secs(self.keepalive_interval_sec.max(1)));
        let mut retransmit_tick = tokio::time::interval(Duration::from_millis(10));

        let init_msg = if self.mode == "tun" {
            "Bridge initialized (TUN mode)".to_string()
        } else {
            "Bridge initialized (proxy mode)".to_string()
        };
        tx.send(UiEvent::Log(init_msg)).await.ok();

        let mut sessions_opt: Option<Vec<SessionState>> = None;
        let mut udp_rx_opt: Option<mpsc::Receiver<(usize, Bytes)>> = None;
        let mut proxy_guard: Option<crate::sysproxy::SystemProxyGuard> = None;
        let mut stream_map: std::collections::HashMap<u16, usize> = std::collections::HashMap::new();

        loop {
            tokio::select! {
                biased;

                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        self.running = false;
                        self.metrics.connection_state.store(0, Ordering::Relaxed);
                        proxy_guard = None;
                        sessions_opt = None;
                        udp_rx_opt = None;
                        stream_map.clear();
                        self.reset_proxy_streams(&tx, &proxy_tx, "manual stop");
                        break;
                    }
                }

                udp_msg = async {
                    match udp_rx_opt.as_mut() {
                        Some(rx) => rx.recv().await,
                        None => std::future::pending().await,
                    }
                }, if self.running => {
                    self.handle_inbound_udp(udp_msg, &mut sessions_opt, &mut udp_rx_opt, &mut proxy_guard, &mut stream_map, &tx, &proxy_tx).await;
                }

                cmd = bridge_rx.recv() => {
                    if !self.handle_bridge_cmd(cmd, &mut sessions_opt, &mut udp_rx_opt, &mut proxy_guard, &mut stream_map, &tx, &proxy_tx).await {
                        break;
                    }
                }

                _ = metrics_tick.tick() => {
                    if self.running {
                        self.emit_metrics(&tx).await;
                    }
                }

                _ = keepalive_tick.tick() => {
                    if self.running {
                        self.handle_keepalive(&mut sessions_opt, &mut udp_rx_opt, &mut proxy_guard, &mut stream_map, &tx, &proxy_tx, &mut proxy_rx).await;
                    }
                }

                _ = retransmit_tick.tick() => {
                    if self.running {
                        self.handle_retransmit(&mut sessions_opt, &mut udp_rx_opt, &mut proxy_guard, &mut stream_map, &tx, &proxy_tx).await;
                    }
                }

                proxy_ev = proxy_rx.recv(), if self.running && sessions_opt.as_ref().map(|s| {
                    s.iter().any(|ses| ses.machine.in_flight_count() < ses.machine.cwnd_packets().clamp(16, 16384))
                }).unwrap_or(true) => {
                    self.handle_proxy_event(proxy_ev, &mut sessions_opt, &mut stream_map, &tx, &proxy_tx).await;
                }
            }
        }

        tx.send(UiEvent::Log("Bridge stopped".to_string())).await.ok();
        Ok(())
    }

    async fn handle_inbound_udp(
        &mut self,
        udp_msg: Option<(usize, Bytes)>,
        sessions_opt: &mut Option<Vec<SessionState>>,
        udp_rx_opt: &mut Option<mpsc::Receiver<(usize, Bytes)>>,
        proxy_guard: &mut Option<crate::sysproxy::SystemProxyGuard>,
        stream_map: &mut std::collections::HashMap<u16, usize>,
        tx: &mpsc::Sender<UiEvent>,
        proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
    ) {
        match udp_msg {
            Some((session_index, inbound)) => {
                self.metrics.bytes_recv.fetch_add(inbound.len() as u64, Ordering::Relaxed);
                self.last_valid_recv = Instant::now();

                if let Some(sessions) = sessions_opt.as_mut() {
                    if session_index < sessions.len() {
                        let session = &mut sessions[session_index];
                        let initial_action = match session.machine.on_event(OstpEvent::Inbound(inbound)) {
                            Ok(a) => a,
                            Err(e) => {
                                let _ = tx.send(UiEvent::Log(format!("Protocol decrypt error: {e}"))).await;
                                tracing::warn!("Inbound protocol error (session {}): {}", session_index, e);
                                return;
                            }
                        };

                        let mut actions_queue = std::collections::VecDeque::new();
                        actions_queue.push_back(initial_action);

                        while let Some(current_action) = actions_queue.pop_front() {
                            match current_action {
                                ProtocolAction::Multiple(nested) => {
                                    for a in nested {
                                        actions_queue.push_back(a);
                                    }
                                }
                                ProtocolAction::DeliverApp(stream_id, dec_payload) => {
                                    match RelayMessage::decode(&dec_payload) {
                                        Ok(relay_msg) => {
                                            match relay_msg {
                                                RelayMessage::ConnectOk => {
                                                    let _ = tx.send(UiEvent::Log(format!("Relay CONNECT OK stream_id={stream_id}"))).await;
                                                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::ConnectOk));
                                                }
                                                RelayMessage::Data(data) => {
                                                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Data(Bytes::from(data))));
                                                }
                                                RelayMessage::Close => {
                                                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Close));
                                                }
                                                RelayMessage::Error(msg) => {
                                                    let _ = tx.send(UiEvent::Log(format!("Relay error for stream {stream_id}: {msg}"))).await;
                                                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error(msg)));
                                                }
                                                RelayMessage::Pong(ts) => {
                                                    let now = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64;
                                                    self.last_rtt_ms = now.saturating_sub(ts) as f64;
                                                    self.metrics.rtt_ms.store(self.last_rtt_ms as u32, Ordering::Relaxed);
                                                }
                                                RelayMessage::UdpAssociate => {}
                                                RelayMessage::UdpData(target, data) => {
                                                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::UdpData(target, Bytes::from(data))));
                                                }
                                                RelayMessage::KeepAlive | RelayMessage::Ping(_) | RelayMessage::Connect(_) => {}
                                            }
                                        }
                                        Err(err) => {
                                            let _ = tx.send(UiEvent::Log(format!("Relay decode error for stream {stream_id}: {err}"))).await;
                                            let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("relay decode failed".to_string())));
                                        }
                                    }
                                }
                                ProtocolAction::SendDatagram(frame) => {
                                    let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                                    self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                                }
                                _ => {}
                            }
                        }
                    }
                }
            }
            None => {
                let _ = tx.send(UiEvent::Log("UDP channel closed, resetting connection".to_string())).await;
                self.running = false;
                crate::sysproxy::disable_system_proxy();
                *sessions_opt = None;
                *udp_rx_opt = None;
                stream_map.clear();
                self.reset_proxy_streams(&tx, &proxy_tx, "udp reader closed");
                let _ = tx.send(UiEvent::TunnelStopped).await;
            }
        }
    }

    async fn handle_bridge_cmd(
        &mut self,
        cmd: Option<BridgeCommand>,
        sessions_opt: &mut Option<Vec<SessionState>>,
        udp_rx_opt: &mut Option<mpsc::Receiver<(usize, Bytes)>>,
        proxy_guard: &mut Option<crate::sysproxy::SystemProxyGuard>,
        stream_map: &mut std::collections::HashMap<u16, usize>,
        tx: &mpsc::Sender<UiEvent>,
        proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
    ) -> bool {
        match cmd {
            Some(BridgeCommand::ToggleTunnel) => {
                if self.running {
                    self.running = false;
                    self.metrics.connection_state.store(0, Ordering::Relaxed);
                    *proxy_guard = None;
                    *sessions_opt = None;
                    *udp_rx_opt = None;
                    stream_map.clear();
                    self.reset_proxy_streams(&tx, &proxy_tx, "manual stop");
                    tx.send(UiEvent::TunnelStopped).await.ok();
                    let stop_msg = if self.mode == "tun" { "TUN tunnel stopped" } else { "Bridge stopped" };
                    tx.send(UiEvent::Log(stop_msg.to_string())).await.ok();
                } else {
                    tx.send(UiEvent::Log("Connecting to remote server...".to_string())).await.ok();
                    tx.send(UiEvent::Metrics { status: ConnectionStatus::Handshaking, rtt_ms: 0.0, throughput_bps: 0 }).await.ok();
                    self.metrics.connection_state.store(1, Ordering::Relaxed);

                    let session_count = if self.mux_enabled { self.mux_sessions.max(1) } else { 1 };
                    let (udp_tx, udp_rx) = mpsc::channel(100000);
                    let mut sessions = Vec::with_capacity(session_count);
                    let mut rtt_sum = 0.0;
                    let mut successful_sessions = 0;

                    for idx in 0..session_count {
                        let session_id: u32 = rand::thread_rng().gen();
                        match self.perform_handshake_with_id(&tx, session_id).await {
                            Ok((sock, mach, rtt)) => {
                                let session_index = sessions.len();
                                let socket_clone = sock.clone();
                                let udp_tx_clone = udp_tx.clone();
                                tokio::spawn(async move {
                                    let mut buf = vec![0_u8; 65535];
                                    loop {
                                        match socket_clone.recv(&mut buf).await {
                                            Ok(n) => {
                                                let inbound = Bytes::copy_from_slice(&buf[..n]);
                                                if udp_tx_clone.send((session_index, inbound)).await.is_err() {
                                                    break;
                                                }
                                            }
                                            Err(e) => {
                                                tracing::warn!("UDP socket recv error (session {}): {}", session_index, e);
                                                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                                            }
                                        }
                                    }
                                });
                                sessions.push(SessionState { socket: sock, machine: mach });
                                rtt_sum += rtt;
                                successful_sessions += 1;
                            }
                            Err(err) => {
                                tx.send(UiEvent::Log(format!("Multiplex session {}/{} handshake failed: {}. Continuing with remaining sessions...", idx + 1, session_count, err))).await.ok();
                            }
                        }
                    }

                    if sessions.is_empty() {
                        *proxy_guard = None;
                        tx.send(UiEvent::Log("All multiplexed handshake attempts failed. Connection aborted.".to_string())).await.ok();
                        tx.send(UiEvent::TunnelStopped).await.ok();
                        self.metrics.connection_state.store(0, Ordering::Relaxed);
                        return true;
                    }

                    *udp_rx_opt = Some(udp_rx);
                    *sessions_opt = Some(sessions);
                    self.last_rtt_ms = rtt_sum / successful_sessions as f64;
                    self.running = true;
                    self.last_sample_at = Instant::now();
                    self.last_valid_recv = Instant::now();

                    let sys_proxy_addr = self.proxy_addr.replace("0.0.0.0:", "127.0.0.1:");
                    *proxy_guard = Some(crate::sysproxy::SystemProxyGuard::enable(&sys_proxy_addr));

                    tx.send(UiEvent::Metrics {
                        status: ConnectionStatus::Established,
                        rtt_ms: self.last_rtt_ms,
                        throughput_bps: 0,
                    }).await.ok();
                    self.metrics.connection_state.store(2, Ordering::Relaxed);

                    let start_msg = if self.mode == "tun" { "TUN tunnel established" } else { "Connection established" };
                    tx.send(UiEvent::Log(start_msg.to_string())).await.ok();

                    for session in sessions_opt.as_mut().unwrap().iter_mut() {
                        let ts = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64;
                        let ping_payload = Bytes::from(RelayMessage::Ping(ts).encode());
                        if let Ok(ProtocolAction::SendDatagram(frame)) = session.machine.on_event(OstpEvent::Outbound(0, ping_payload)) {
                            let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                            self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                        }
                    }
                }
            }
            Some(BridgeCommand::NextProfile) => {
                self.profile = next_profile(self.profile);
                tx.send(UiEvent::ProfileChanged(self.profile)).await.ok();
                tx.send(UiEvent::Log(format!("Obfuscation profile switched to {:?}", self.profile))).await.ok();
            }
            Some(BridgeCommand::NetworkChanged) => {
                if self.running {
                    let _ = tx.send(UiEvent::Log("Network changed — starting immediate reconnect".to_string())).await;
                    self.metrics.connection_state.store(1, Ordering::Relaxed);
                    self.last_valid_recv = Instant::now() - Duration::from_secs(100);

                    let session_count = if self.mux_enabled { self.mux_sessions.max(1) } else { 1 };
                    let (udp_tx, udp_rx) = mpsc::channel(100000);
                    let mut new_sessions = Vec::with_capacity(session_count);
                    let mut successful_sessions = 0;
                    let mut rtt_sum = 0.0;

                    for idx in 0..session_count {
                        let session_id: u32 = rand::thread_rng().gen();
                        match self.perform_handshake_with_id(&tx, session_id).await {
                            Ok((sock, mach, rtt)) => {
                                let session_index = new_sessions.len();
                                let socket_clone = sock.clone();
                                let udp_tx_clone = udp_tx.clone();
                                tokio::spawn(async move {
                                    let mut buf = vec![0_u8; 65535];
                                    loop {
                                        match socket_clone.recv(&mut buf).await {
                                            Ok(n) => {
                                                let inbound = Bytes::copy_from_slice(&buf[..n]);
                                                if udp_tx_clone.send((session_index, inbound)).await.is_err() {
                                                    break;
                                                }
                                            }
                                            Err(e) => {
                                                tracing::warn!("UDP recv error (network-change session {}): {}", session_index, e);
                                                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                                            }
                                        }
                                    }
                                });
                                new_sessions.push(SessionState { socket: sock, machine: mach });
                                rtt_sum += rtt;
                                successful_sessions += 1;
                            }
                            Err(err) => {
                                let _ = tx.send(UiEvent::Log(format!("NetworkChanged reconnect session {}/{} failed: {}", idx + 1, session_count, err))).await;
                            }
                        }
                    }

                    if !new_sessions.is_empty() {
                        *sessions_opt = Some(new_sessions);
                        *udp_rx_opt = Some(udp_rx);
                        self.last_rtt_ms = rtt_sum / successful_sessions as f64;
                        self.last_valid_recv = Instant::now();
                        stream_map.clear();
                        self.reset_proxy_streams(&tx, &proxy_tx, "network changed");
                        self.metrics.connection_state.store(2, Ordering::Relaxed);
                        let _ = tx.send(UiEvent::Log("NetworkChanged reconnect successful!".to_string())).await;
                    } else {
                        let _ = tx.send(UiEvent::Log("NetworkChanged reconnect failed — will retry on keepalive tick".to_string())).await;
                    }
                }
            }
            Some(BridgeCommand::ReloadConfig) => {
                match ClientConfig::reload_from_json_near_binary() {
                    Ok(cfg) => {
                        self.apply_runtime_config(&cfg);
                        tx.send(UiEvent::Log("Runtime config reloaded".to_string())).await.ok();
                        if self.running {
                            self.running = false;
                            self.metrics.connection_state.store(0, Ordering::Relaxed);
                            *proxy_guard = None;
                            *sessions_opt = None;
                            stream_map.clear();
                            self.reset_proxy_streams(&tx, &proxy_tx, "config reload");
                            let _ = tx.send(UiEvent::TunnelStopped).await;
                        }
                    }
                    Err(err) => {
                        let _ = tx.send(UiEvent::Log(format!("Config reload failed: {err}"))).await;
                    }
                }
            }
            Some(BridgeCommand::Shutdown) | None => {
                self.running = false;
                *proxy_guard = None;
                return false;
            }
        }
        true
    }

    async fn handle_keepalive(
        &mut self,
        sessions_opt: &mut Option<Vec<SessionState>>,
        udp_rx_opt: &mut Option<mpsc::Receiver<(usize, Bytes)>>,
        proxy_guard: &mut Option<crate::sysproxy::SystemProxyGuard>,
        stream_map: &mut std::collections::HashMap<u16, usize>,
        tx: &mpsc::Sender<UiEvent>,
        proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
        proxy_rx: &mut mpsc::Receiver<ProxyEvent>,
    ) {
        if self.last_valid_recv.elapsed().as_secs() > 25 {
            let elapsed = self.last_valid_recv.elapsed().as_secs();
            if elapsed > 180 {
                let _ = tx.send(UiEvent::Log("Connection permanently lost (3-minute hard timeout). Stopping tunnel.".into())).await;
                self.running = false;
                *proxy_guard = None;
                *sessions_opt = None;
                stream_map.clear();
                self.reset_proxy_streams(&tx, &proxy_tx, "keepalive hard timeout");
                let _ = tx.send(UiEvent::TunnelStopped).await;
                self.metrics.connection_state.store(0, Ordering::Relaxed);
                return;
            }

            let _ = tx.send(UiEvent::Log(format!("Connection stall detected ({}s silence). Attempting background reconnect...", elapsed))).await;
            self.metrics.connection_state.store(1, Ordering::Relaxed);

            let session_count = if self.mux_enabled { self.mux_sessions.max(1) } else { 1 };
            let (udp_tx, udp_rx) = mpsc::channel(100000);
            let mut new_sessions = Vec::with_capacity(session_count);
            let mut successful_sessions = 0;
            let mut rtt_sum = 0.0;

            for idx in 0..session_count {
                let session_id: u32 = rand::thread_rng().gen();
                match self.perform_handshake_with_id(&tx, session_id).await {
                    Ok((sock, mach, rtt)) => {
                        let session_index = new_sessions.len();
                        let socket_clone = sock.clone();
                        let udp_tx_clone = udp_tx.clone();
                        tokio::spawn(async move {
                            let mut buf = vec![0_u8; 65535];
                            loop {
                                match socket_clone.recv(&mut buf).await {
                                    Ok(n) => {
                                        let inbound = Bytes::copy_from_slice(&buf[..n]);
                                        if udp_tx_clone.send((session_index, inbound)).await.is_err() {
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        tracing::warn!("UDP socket recv error (reconnect session {}): {}", session_index, e);
                                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                                    }
                                }
                            }
                        });
                        new_sessions.push(SessionState { socket: sock, machine: mach });
                        rtt_sum += rtt;
                        successful_sessions += 1;
                    }
                    Err(err) => {
                        let _ = tx.send(UiEvent::Log(format!("Background reconnect session {}/{} failed: {}", idx + 1, session_count, err))).await;
                    }
                }
            }

            if !new_sessions.is_empty() {
                *sessions_opt = Some(new_sessions);
                *udp_rx_opt = Some(udp_rx);
                self.last_rtt_ms = rtt_sum / successful_sessions as f64;
                self.last_valid_recv = Instant::now();
                self.metrics.connection_state.store(2, Ordering::Relaxed);
                let _ = tx.send(UiEvent::Log("Background reconnect successful! Connection restored.".into())).await;

                for session in sessions_opt.as_mut().unwrap().iter_mut() {
                    let ts = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64;
                    let ping_payload = Bytes::from(RelayMessage::Ping(ts).encode());
                    if let Ok(ProtocolAction::SendDatagram(frame)) = session.machine.on_event(OstpEvent::Outbound(0, ping_payload)) {
                        let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                        self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                    }
                }

                stream_map.clear();
                self.reset_proxy_streams(&tx, &proxy_tx, "background reconnect");

                let mut flushed = 0;
                while let Ok(stale) = proxy_rx.try_recv() {
                    if let ProxyEvent::NewStream { stream_id, .. } = stale {
                        let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("connection reset".into())));
                    }
                    flushed += 1;
                }
                if flushed > 0 {
                    let _ = tx.send(UiEvent::Log(format!("Flushed {} stale proxy messages to prevent UDP burst", flushed))).await;
                }
            } else {
                let _ = tx.send(UiEvent::Log("Background reconnect failed. Will retry on next tick...".into())).await;
            }
        }

        if let Some(sessions) = sessions_opt.as_mut() {
            for session in sessions.iter_mut() {
                let ts = SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64;
                let ping_payload = Bytes::from(RelayMessage::Ping(ts).encode());
                if let Ok(ProtocolAction::SendDatagram(frame)) = session.machine.on_event(OstpEvent::Outbound(0, ping_payload)) {
                    let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                    self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                }

                let ka_payload = Bytes::from(RelayMessage::KeepAlive.encode());
                if let Ok(ProtocolAction::SendDatagram(frame)) = session.machine.on_event(OstpEvent::Outbound(0, ka_payload)) {
                    let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                    self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                }
            }
        }
    }

    async fn handle_retransmit(
        &mut self,
        sessions_opt: &mut Option<Vec<SessionState>>,
        udp_rx_opt: &mut Option<mpsc::Receiver<(usize, Bytes)>>,
        proxy_guard: &mut Option<crate::sysproxy::SystemProxyGuard>,
        stream_map: &mut std::collections::HashMap<u16, usize>,
        tx: &mpsc::Sender<UiEvent>,
        proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
    ) {
        let mut fatal_err = None;
        if let Some(sessions) = sessions_opt.as_mut() {
            for session in sessions.iter_mut() {
                match session.machine.on_event(OstpEvent::Tick) {
                    Ok(action) => {
                        let mut queue = vec![action];
                        while let Some(current_action) = queue.pop() {
                            match current_action {
                                ProtocolAction::Multiple(nested) => {
                                    for a in nested {
                                        queue.push(a);
                                    }
                                }
                                ProtocolAction::SendDatagram(frame) => {
                                    let _ = send_datagram(&session.socket, &frame, self.transport_mode == "udp").await;
                                    self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                                }
                                _ => {}
                            }
                        }
                    }
                    Err(e) => {
                        fatal_err = Some(e);
                        break;
                    }
                }
            }
        }

        if let Some(e) = fatal_err {
            let _ = tx.send(UiEvent::Log(format!("Protocol tick fatal error: {e}"))).await;
            self.running = false;
            *proxy_guard = None;
            *sessions_opt = None;
            *udp_rx_opt = None;
            stream_map.clear();
            self.reset_proxy_streams(&tx, &proxy_tx, "protocol fatal error");
            let _ = tx.send(UiEvent::TunnelStopped).await;
            self.metrics.connection_state.store(0, Ordering::Relaxed);
        }
    }

    async fn handle_proxy_event(
        &mut self,
        proxy_ev: Option<ProxyEvent>,
        sessions_opt: &mut Option<Vec<SessionState>>,
        stream_map: &mut std::collections::HashMap<u16, usize>,
        tx: &mpsc::Sender<UiEvent>,
        proxy_tx: &mpsc::UnboundedSender<(u16, ProxyToClientMsg)>,
    ) {
        if let Some(ev) = proxy_ev {
            if let Some(sessions) = sessions_opt.as_mut() {
                if sessions.is_empty() {
                    if let ProxyEvent::NewStream { stream_id, .. } = ev {
                        let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into())));
                    }
                    return;
                }

                let (stream_id, relay_msg, is_close) = match ev {
                    ProxyEvent::NewStream { stream_id, target } => {
                        let _ = tx.send(UiEvent::Log(format!("Proxy CONNECT stream_id={stream_id} target={target}"))).await;
                        (stream_id, RelayMessage::Connect(target), false)
                    }
                    ProxyEvent::UdpAssociate { stream_id } => {
                        let _ = tx.send(UiEvent::Log(format!("Proxy UDP ASSOCIATE stream_id={stream_id}"))).await;
                        (stream_id, RelayMessage::UdpAssociate, false)
                    }
                    ProxyEvent::UdpData { stream_id, target, payload } => {
                        (stream_id, RelayMessage::UdpData(target, payload.to_vec()), false)
                    }
                    ProxyEvent::Data { stream_id, payload } => (stream_id, RelayMessage::Data(payload.to_vec()), false),
                    ProxyEvent::Close { stream_id } => {
                        let _ = tx.send(UiEvent::Log(format!("Proxy CLOSE stream_id={stream_id}"))).await;
                        (stream_id, RelayMessage::Close, true)
                    }
                };

                let len = sessions.len();
                let session_index = *stream_map.entry(stream_id).or_insert_with(|| {
                    rand::thread_rng().gen_range(0..len)
                });

                if is_close {
                    stream_map.remove(&stream_id);
                }

                let session = &mut sessions[session_index];
                let out_payload = Bytes::from(relay_msg.encode());

                match session.machine.on_event(OstpEvent::Outbound(stream_id, out_payload)) {
                    Ok(ProtocolAction::SendDatagram(frame)) => {
                        if send_datagram(&session.socket, &frame, self.transport_mode == "udp").await.is_ok() {
                            self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                            tracing::trace!("Outbound datagram sent stream_id={stream_id} bytes={}", frame.len());
                        }
                    }
                    Ok(ProtocolAction::Multiple(list)) => {
                        let mut sent = 0usize;
                        for item in list {
                            if let ProtocolAction::SendDatagram(frame) = item {
                                if send_datagram(&session.socket, &frame, self.transport_mode == "udp").await.is_ok() {
                                    self.metrics.bytes_sent.fetch_add(frame.len() as u64, Ordering::Relaxed);
                                    sent += 1;
                                }
                            }
                        }
                        tracing::trace!("Outbound datagram batch stream_id={stream_id} sent={sent}");
                    }
                    Ok(ProtocolAction::Noop) => {
                        tracing::trace!("Outbound datagram noop stream_id={stream_id}");
                    }
                    Ok(_) => {
                        tracing::trace!("Outbound datagram unexpected action stream_id={stream_id}");
                    }
                    Err(e) => {
                        tracing::warn!("Protocol error packing outbound stream_id={}: {}", stream_id, e);
                        let _ = tx.send(UiEvent::Log(format!("Protocol error packing TCP: {e}"))).await;
                    }
                }
            } else {
                if let ProxyEvent::NewStream { stream_id, .. } = ev {
                    let _ = proxy_tx.send((stream_id, ProxyToClientMsg::Error("tunnel stopped".into())));
                }
            }
        }
    }
""".strip("\n")


def find_function_span(code, marker):
    """Locate the function whose signature contains `marker`.

    Returns `(start, end)` such that `code[start:end]` runs from the
    indentation in front of `marker` through the brace that closes the
    function body.

    Braces are counted textually, so a stray unbalanced `{` or `}` inside a
    string literal or comment of the scanned function would throw the match
    off.

    Raises:
        ValueError: if `marker` is absent or the body's closing brace is
            never reached.
    """
    marker_idx = code.find(marker)
    if marker_idx == -1:
        raise ValueError(f"marker {marker!r} not found")

    # Swallow the indentation before the marker so the replacement, which
    # carries its own indentation, starts at the beginning of the line.
    start = marker_idx
    while start > 0 and code[start - 1] in " \t":
        start -= 1

    depth = 0
    for i in range(marker_idx, len(code)):
        ch = code[i]
        if ch == "{":
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                return start, i + 1

    raise ValueError(f"no balanced closing brace found after {marker!r}")


def replace_function(code, marker, replacement):
    """Return `code` with the function identified by `marker` swapped for `replacement`."""
    start, end = find_function_span(code, marker)
    return code[:start] + replacement + code[end:]


def patch_source(code):
    """Return `code` with `run` replaced by the new `run` plus its helpers.

    Raises:
        ValueError: if the helpers are already present (a second application
            would define them twice) or if `run` cannot be located.
    """
    if HELPER_MARKER in code:
        raise ValueError("helper methods already present; patch was applied before")
    return replace_function(code, RUN_MARKER, NEW_RUN_AND_HELPERS)


def main(path=BRIDGE_PATH):
    """Patch the file at `path` in place and print "Done" on success."""
    with open(path, "r", encoding="utf-8") as f:
        code = f.read()

    try:
        patched = patch_source(code)
    except ValueError as err:
        # Bail out before opening the file for writing so a failed match can
        # never truncate or mangle the source.
        sys.exit(f"bridge.rs not patched: {err}")

    with open(path, "w", encoding="utf-8") as f:
        f.write(patched)

    print("Done")


if __name__ == "__main__":
    main()