Patch netstack-smoltcp locally to fix catastrophic UDP tunnel stream crash on invalid packets

This commit is contained in:
ospab 2026-05-30 21:34:31 +03:00
parent 9095f0dacd
commit 95a36e2bdf
28 changed files with 2874 additions and 2 deletions

2
Cargo.lock generated
View File

@ -1289,8 +1289,6 @@ dependencies = [
[[package]]
name = "netstack-smoltcp"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "398691cef792b89eb5d29e6ea6b3999def706b908d355e29815ba8101cf5c4c8"
dependencies = [
"etherparse",
"futures",

View File

@ -26,3 +26,6 @@ tracing = "0.1"
sha2 = "0.10"
hmac = "0.12"
portable-atomic = "1.10"
[patch.crates-io]
netstack-smoltcp = { path = "netstack-smoltcp" }

View File

@ -0,0 +1 @@
{"v":1}

View File

@ -0,0 +1,6 @@
{
"git": {
"sha1": "702f6dfe124c5e4d343cfd3ca5a3efe0446cf6f0"
},
"path_in_vcs": ""
}

View File

@ -0,0 +1,37 @@
name: Setup Android NDK and Rust compiler ENV
description: Setup an Android_NDK_HOME environment by downloading and Rust compiler environment.
inputs:
rust-target:
description: Rust target to build
required: true
sdk-version:
description: Exact SDK version to use
default: "33"
ndk-version:
description: Exact NDK version to use
default: "25"
ndk-platform:
description: Which host platform to use
default: "linux"
runs:
using: "composite"
steps:
- name: Download Android NDK
run: curl --http1.1 -O https://dl.google.com/android/repository/android-ndk-r${{ inputs.ndk-version }}-${{ inputs.ndk-platform }}.zip
shell: bash
- name: Extract Android NDK
run: unzip -q android-ndk-r${{ inputs.ndk-version }}-${{ inputs.ndk-platform }}.zip
shell: bash
- name: Set Rust compiler ENV
run: |
ndk_home=${{ github.workspace }}/android-ndk-r${{ inputs.ndk-version }}
platform=$(ls ${ndk_home}/toolchains/llvm/prebuilt/ | head -1)
ndk_tool=${ndk_home}/toolchains/llvm/prebuilt/${platform}/bin
envvar_suffix=$(echo ${{ inputs.rust-target }} | sed "s/-/_/g")
upper_suffix=$(echo ${envvar_suffix} | tr '[:lower:]' '[:upper:]')
tool_prefix=${{ inputs.rust-target }}${{ inputs.sdk-version }}
echo "ANDROID_NDK_HOME=${ndk_home}" >> $GITHUB_ENV
echo "CC_${envvar_suffix}=${ndk_tool}/${tool_prefix}-clang" >> $GITHUB_ENV
echo "AR_${envvar_suffix}=${ndk_tool}/llvm-ar" >> $GITHUB_ENV
echo "CARGO_TARGET_${upper_suffix}_LINKER=${ndk_tool}/${tool_prefix}-clang" >> $GITHUB_ENV
shell: bash

View File

@ -0,0 +1,80 @@
name: CI
on:
push:
branches:
- '**'
pull_request:
branches:
- '**'
env:
CARGO_INCREMENTAL: 0
CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse
jobs:
test:
name: Test
runs-on: ${{ matrix.os }}
strategy:
matrix:
include:
- build: linux-amd64
os: ubuntu-latest
target: x86_64-unknown-linux-gnu
- build: android-arm64
os: ubuntu-latest
target: aarch64-linux-android
no_run: --no-run
- build: android-amd64
os: ubuntu-latest
target: x86_64-linux-android
no_run: --no-run
- build: macos-amd64
os: macos-latest
target: x86_64-apple-darwin
- build: macos-arm64
os: macos-14
target: aarch64-apple-darwin
- build: ios-arm64
os: macos-latest
target: aarch64-apple-ios
no_run: --no-run
- build: windows-amd64
os: windows-latest
target: x86_64-pc-windows-msvc
- build: windows-arm64
os: windows-latest
target: aarch64-pc-windows-msvc
no_run: --no-run
steps:
- uses: actions/checkout@v4
- name: Install Rust (rustup)
run: |
set -euxo pipefail
rustup toolchain install stable --no-self-update --profile minimal --target ${{ matrix.target }}
rustup default stable
shell: bash
- uses: Swatinem/rust-cache@v2
- name: Setup android environment
if: contains(matrix.build, 'android')
uses: ./.github/actions/ndk-dev-rs
with:
rust-target: ${{ matrix.target }}
- run: cargo test ${{ matrix.no_run }} --workspace --target ${{ matrix.target }}
- run: cargo test ${{ matrix.no_run }} --workspace --target ${{ matrix.target }} --release
msrv_n_clippy:
name: MSRV & Clippy & Rustfmt
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo fmt -- --check
- run: cargo clippy --all-features -- -D warnings
- run: cargo check --lib -p netstack-smoltcp
- run: cargo check --lib -p netstack-smoltcp --all-features

View File

@ -0,0 +1,15 @@
on:
push:
tags:
- '*'
jobs:
publish:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Publish to crates.io
run: |
cargo publish
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}

9
netstack-smoltcp/.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
/target
/Cargo.lock
.idea
.VSCodeCounter/
.vscode
.DS_Store
*.iml
**/*.log

136
netstack-smoltcp/Cargo.toml Normal file
View File

@ -0,0 +1,136 @@
# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
#
# When uploading crates to the registry Cargo will automatically
# "normalize" Cargo.toml files for maximal compatibility
# with all versions of Cargo and also rewrite `path` dependencies
# to registry (e.g., crates.io) dependencies.
#
# If you are reading this file be aware that the original Cargo.toml
# will likely look very different (and much more reasonable).
# See Cargo.toml.orig for the original contents.
[package]
edition = "2021"
rust-version = "1.75.0"
name = "netstack-smoltcp"
version = "0.2.2"
authors = ["cavivie <cavivie@gmail.com>"]
build = false
autolib = false
autobins = false
autoexamples = false
autotests = false
autobenches = false
description = """
A netstack for the special purpose of turning packets from/to a TUN interface
into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack.
"""
homepage = "https://github.com/cavivie/netstack-smoltcp"
documentation = "https://docs.rs/netstack-smoltcp"
readme = "README.md"
keywords = [
"netstack",
"smoltcp",
"network",
"ip",
"tun",
]
categories = ["network-programming"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/cavivie/netstack-smoltcp"
[lib]
name = "netstack_smoltcp"
path = "src/lib.rs"
[[example]]
name = "forward"
path = "examples/forward.rs"
[[example]]
name = "forward-offload-linux"
path = "examples/forward-offload-linux.rs"
[[test]]
name = "regression"
path = "tests/regression.rs"
[dependencies.etherparse]
version = "0.16"
[dependencies.futures]
version = "0.3"
[dependencies.rand]
version = "0.8"
[dependencies.smoltcp]
version = "0.12"
features = [
"std",
"log",
"medium-ip",
"proto-ipv4",
"proto-ipv6",
"socket-icmp",
"socket-udp",
"socket-tcp",
]
default-features = false
[dependencies.spin]
version = "0.9"
[dependencies.tokio]
version = "1"
features = [
"sync",
"time",
"rt",
"macros",
]
[dependencies.tokio-util]
version = "0.7.10"
[dependencies.tracing]
version = "0.1"
features = ["std"]
default-features = false
[dev-dependencies.socket2]
version = "0.5.6"
[dev-dependencies.socket2-ext]
version = "0.1"
[dev-dependencies.structopt]
version = "0.3"
[dev-dependencies.tokio]
version = "1"
features = [
"rt",
"macros",
"rt-multi-thread",
"io-util",
]
[dev-dependencies.tracing]
version = "0.1"
features = ["std"]
default-features = false
[dev-dependencies.tracing-subscriber]
version = "0.3.18"
[dev-dependencies.tun-rs]
version = "2"
features = [
"async",
"async_framed",
]
[dev-dependencies.tun2]
version = "3"
features = ["async"]

View File

@ -0,0 +1,51 @@
[package]
name = "netstack-smoltcp"
version = "0.2.2"
edition = "2021"
authors = ["cavivie <cavivie@gmail.com>"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/cavivie/netstack-smoltcp"
homepage = "https://github.com/cavivie/netstack-smoltcp"
documentation = "https://docs.rs/netstack-smoltcp"
keywords = ["netstack", "smoltcp", "network", "ip", "tun"]
categories = ["network-programming"]
description = """
A netstack for the special purpose of turning packets from/to a TUN interface
into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack.
"""
rust-version = "1.75.0"
[dependencies]
tracing = { version = "0.1", default-features = false, features = ["std"] }
tokio = { version = "1", features = ["sync", "time", "rt", "macros"] }
tokio-util = "0.7.10"
etherparse = "0.16"
futures = "0.3"
rand = "0.8"
spin = "0.9"
smoltcp = { version = "0.12", default-features = false, features = [
"std",
"log",
"medium-ip",
"proto-ipv4",
"proto-ipv6",
"socket-icmp",
"socket-udp",
"socket-tcp",
] }
[dev-dependencies]
tun2 = { version = "3", features = ["async"] }
# has better performance on linux than tun2
tun-rs = { version = "2", features = ["async", "async_framed"] }
tokio = { version = "1", features = [
"rt",
"macros",
"rt-multi-thread",
"io-util",
] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
tracing-subscriber = "0.3.18"
structopt = "0.3"
socket2 = "0.5.6"
socket2-ext = { version = "0.1" }

View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,25 @@
Copyright (c) 2024 cavivie and netstack-smoltcp Contributors
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

136
netstack-smoltcp/README.md Normal file
View File

@ -0,0 +1,136 @@
# Netstack Smoltcp
A netstack for the special purpose of turning packets from/to a TUN interface into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack.
[![Crates.io][crates-badge]][crates-url]
[![MIT licensed][mit-badge]][mit-url]
[![Apache licensed, Version 2.0][apache-badge]][apache-url]
[![Build Status][actions-badge]][actions-url]
[crates-badge]: https://img.shields.io/crates/v/netstack-smoltcp.svg
[crates-url]: https://crates.io/crates/netstack-smoltcp
[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg
[mit-url]: https://github.com/automesh-network/netstack-smoltcp/blob/master/LICENSE-MIT
[apache-badge]: https://img.shields.io/badge/license-APACHE2.0-blue.svg
[apache-url]: https://github.com/automesh-network/netstack-smoltcp/blob/master/LICENSE-APACHE
[actions-badge]: https://github.com/automesh-network/netstack-smoltcp/workflows/CI/badge.svg
[actions-url]: https://github.com/automesh-network/netstack-smoltcp/actions?query=workflow%3ACI+branch%3Amain
## Features
- Supports Future Send and non-Send, mostly pepole use Send.
- Supports ICMP protocol drive by TCP runner to use ICMP ping.
- Supports filtering packets by source and destination IP addresses.
- Can read IP packets from netstack, write IP packets to netstack.
- Can receive TcpStream from TcpListener exposed from netstack.
- Can receive UDP datagram from UdpSocket exposed from netstack.
- Implements popular future streaming traits and asynchronous IO traits:
* TcpListener implements futures Stream/Sink trait
* TcpStream implements tokio AsyncRead/AsyncWrite trait
* UdpSocket(ReadHalf/WriteHalf) implements futures Stream/Sink trait.
## Platforms
This crate provides lightweight netstack support for Linux, iOS, macOS, Android and Windows.
Currently, it works on most targets, but mainly tested the popular platforms which includes:
- linux-amd64: x86_64-unknown-linux-gnu
- android-arm64: aarch64-linux-android
- android-amd64: x86_64-linux-android
- macos-amd64: x86_64-apple-darwin
- macos-arm64: aarch64-apple-darwin
- ios-arm64: aarch64-apple-ios
- windows-amd64: x86_64-pc-windows-msvc
- windows-arm64: aarch64-pc-windows-msvc
## Example
```rust
// let device = tun2::create_as_async(&cfg)?;
// let framed = device.into_framed();
let (stack, runner, udp_socket, tcp_listener) = netstack_smoltcp::StackBuilder::default()
.stack_buffer_size(512)
.tcp_buffer_size(4096)
.enable_udp(true)
.enable_tcp(true)
.enable_icmp(true)
.mtu(9000) // virtual device usually benefits from larger MTU
.build()
.unwrap();
let mut udp_socket = udp_socket.unwrap(); // udp enabled
let mut tcp_listener = tcp_listener.unwrap(); // tcp/icmp enabled
if let Some(runner) = runner {
tokio::spawn(runner);
}
let (mut stack_sink, mut stack_stream) = stack.split();
let (mut tun_sink, mut tun_stream) = framed.split();
// Reads packet from stack and sends to TUN.
tokio::spawn(async move {
while let Some(pkt) = stack_stream.next().await {
if let Ok(pkt) = pkt {
tun_sink.send(pkt).await.unwrap();
}
}
});
// Reads packet from TUN and sends to stack.
tokio::spawn(async move {
while let Some(pkt) = tun_stream.next().await {
if let Ok(pkt) = pkt {
stack_sink.send(pkt).await.unwrap();
}
}
});
// Extracts TCP connections from stack and sends them to the dispatcher.
tokio::spawn(async move {
handle_inbound_stream(tcp_listener).await;
});
// Receive and send UDP packets between netstack and NAT manager. The NAT
// manager would maintain UDP sessions and send them to the dispatcher.
tokio::spawn(async move {
handle_inbound_datagram(udp_socket).await;
});
```
## Performance
Typically, `netstack-smoltcp` will be used with an tun device, so a careful choice of TUN crate matters.
[tun-rs](https://github.com/tun-rs/tun-rs) have better performance on **Linux** than [rust-tun](https://github.com/meh/rust-tun/) due to GSO/GRO which allow you to process the packets in batches.
`bash scripts/bench-offload.sh` could tell that `tun-rs` boosts the performance by 4x. Try it out on your Linux machine!
The example for using `tun-rs` with `netstack-smoltcp` could be found at [forward-offload-linux.rs](examples/forward-offload-linux.rs)
For further tuning, refer to `tun-rs`'s detailed [README](https://github.com/tun-rs/tun-rs/blob/main/README.md)
## License
This project is licensed under either of
* Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
https://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or
https://opensource.org/licenses/MIT)
at your option.
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in netstack-smoltcp by you, as defined in the Apache-2.0 license,
shall be dual licensed as above, without any additional terms or conditions.
## Inspired By
Special thanks to these amazing projects that inspired netstack-smoltcp (in no particular order):
- [shadowsocks-rust](https://github.com/shadowsocks/shadowsocks-rust/)
- [netstack-lwip](https://github.com/eycorsican/netstack-lwip/)
- [rust-tun-active](https://github.com/tun2proxy/rust-tun)
- [rust-tun](https://github.com/meh/rust-tun/)
- [tun-rs](https://github.com/tun-rs/tun-rs)
- [smoltcp](https://github.com/smoltcp-rs/smoltcp)

View File

@ -0,0 +1,239 @@
#[cfg(target_os = "linux")]
mod inner {
use futures::{SinkExt, StreamExt};
use netstack_smoltcp::{StackBuilder, TcpListener, UdpSocket};
use std::{net::SocketAddr, sync::Arc};
use structopt::StructOpt;
use tokio::net::{TcpSocket, TcpStream};
use tracing::{error, info, warn};
use tun_rs::{DeviceBuilder, IDEAL_BATCH_SIZE, VIRTIO_NET_HDR_LEN};
// Patched forward example: tun2 → tun-rs with Linux GRO/GSO offload.
// For further reading, check out https://blog.cloudflare.com/virtual-networking-101-understanding-tap
//
// Key changes vs forward.rs:
// 1. Use tun-rs DeviceBuilder with .offload(true) on Linux (enables
// IFF_VNET_HDR + TUN_F_CSUM/TSO4/TSO6/USO4/USO6).
// 2. TX (stack → TUN): prepend 10-byte zero virtio_net_hdr (GSO_NONE)
// so the kernel accepts the write when IFF_VNET_HDR is set.
// 3. RX (TUN → stack): use recv_multiple() for batch GSO splitting;
// buffers sized to 1600 to fit smoltcp's 1504-byte MTU segments.
#[derive(Debug, StructOpt)]
#[structopt(name = "forward", about = "Simply forward tun tcp/udp traffic.")]
struct Opt {
/// Outbound interface to bind forwarded connections to.
#[structopt(short = "i", long = "interface")]
interface: String,
/// Name of the TUN device.
#[structopt(short = "n", long = "name", default_value = "utun8")]
name: String,
/// Tracing log level.
#[structopt(long = "log-level", default_value = "debug")]
log_level: tracing::Level,
/// Use current-thread Tokio runtime (default: multi-thread).
#[structopt(long = "current-thread")]
current_thread: bool,
/// Use spawn_local instead of spawn.
#[structopt(long = "local-task")]
local_task: bool,
}
pub(super) fn main() {
let opt = Opt::from_args();
let rt = if opt.current_thread {
tokio::runtime::Builder::new_current_thread()
} else {
tokio::runtime::Builder::new_multi_thread()
}
.enable_all()
.build()
.unwrap();
rt.block_on(main_exec(opt));
}
async fn main_exec(opt: Opt) {
macro_rules! tokio_spawn {
($fut:expr) => {
if opt.local_task {
tokio::task::spawn_local($fut)
} else {
tokio::task::spawn($fut)
}
};
}
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(opt.log_level)
.finish(),
)
.unwrap();
// Build TUN device with GRO/GSO offload on Linux.
let builder = DeviceBuilder::new()
.name(opt.name)
.ipv4("10.10.10.2", 24, Some("10.10.10.1"))
.mtu(9000);
let builder = builder.offload(true);
let dev = Arc::new(builder.build_async().unwrap());
let (stack, runner, udp_socket, tcp_listener) = StackBuilder::default()
.enable_tcp(true)
.enable_udp(true)
.enable_icmp(true)
.build()
.unwrap();
let udp_socket = udp_socket.unwrap();
let tcp_listener = tcp_listener.unwrap();
if let Some(runner) = runner {
tokio_spawn!(runner);
}
let (mut stack_sink, mut stack_stream) = stack.split();
let mut futs = vec![];
// stack → TUN
// With IFF_VNET_HDR every write must start with a virtio_net_hdr.
// We use all-zero (gso_type = GSO_NONE, flags = 0): plain packet,
// checksum already valid (smoltcp always computes checksums itself).
let dev1 = dev.clone();
futs.push(tokio_spawn!(async move {
while let Some(pkt) = stack_stream.next().await {
if let Ok(pkt) = pkt {
let result = {
let mut buf = vec![0u8; VIRTIO_NET_HDR_LEN + pkt.len()];
buf[VIRTIO_NET_HDR_LEN..].copy_from_slice(&pkt);
dev1.send(&buf).await
};
if let Err(e) = result {
warn!("failed to send packet to TUN: {:?}", e);
}
}
}
}));
// TUN → stack
// recv_multiple() does one read() syscall and returns N individual IP
// packets after splitting any incoming GRO super-packet.
// Buffer size 1600 > smoltcp MTU (1504) to avoid an out-of-bounds panic
// when the kernel segments at MSS=1464 with 40-byte IP+TCP headers.
futs.push(tokio_spawn!(async move {
let mut orig = vec![0u8; VIRTIO_NET_HDR_LEN + 65535];
let mut bufs = vec![vec![0u8; 1600]; IDEAL_BATCH_SIZE];
let mut sizes = vec![0usize; IDEAL_BATCH_SIZE];
while let Ok(n) = dev.recv_multiple(&mut orig, &mut bufs, &mut sizes, 0).await {
for i in 0..n {
let pkt = &bufs[i][..sizes[i]];
if let Err(e) = stack_sink.send(pkt.to_vec()).await {
warn!("failed to send packet to stack: {:?}", e);
}
}
}
}));
futs.push(tokio_spawn!({
let iface = opt.interface.clone();
async move {
handle_inbound_stream(tcp_listener, iface).await;
}
}));
futs.push(tokio_spawn!(async move {
handle_inbound_datagram(udp_socket, opt.interface).await;
}));
futures::future::join_all(futs).await.iter().for_each(|r| {
if let Err(e) = r {
error!("{:?}", e);
}
});
}
async fn handle_inbound_stream(mut tcp_listener: TcpListener, interface: String) {
while let Some((mut stream, local, remote)) = tcp_listener.next().await {
let interface = interface.clone();
tokio::spawn(async move {
info!("tcp: {:?} => {:?}", local, remote);
match new_tcp_stream(remote, &interface).await {
Ok(mut r) => {
if let Err(e) = tokio::io::copy_bidirectional(&mut stream, &mut r).await {
warn!(
"failed to copy tcp stream {:?}=>{:?}: {:?}",
local, remote, e
);
}
}
Err(e) => warn!(
"failed to open tcp stream {:?}=>{:?}: {:?}",
local, remote, e
),
}
});
}
}
async fn handle_inbound_datagram(udp_socket: UdpSocket, interface: String) {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (mut read_half, mut write_half) = udp_socket.split();
tokio::spawn(async move {
while let Some((data, local, remote)) = rx.recv().await {
let _ = write_half.send((data, remote, local)).await;
}
});
while let Some((data, local, remote)) = read_half.next().await {
let tx = tx.clone();
let interface = interface.clone();
tokio::spawn(async move {
match new_udp_packet(remote, &interface).await {
Ok(sock) => {
let _ = sock.send(&data).await;
loop {
let mut buf = vec![0; 1024];
match sock.recv_from(&mut buf).await {
Ok((n, _)) => {
let _ = tx.send((buf[..n].to_vec(), local, remote));
}
Err(e) => {
warn!("udp recv {:?}: {:?}", remote, e);
break;
}
}
}
}
Err(e) => warn!("failed to open udp socket {:?}: {:?}", remote, e),
}
});
}
}
async fn new_tcp_stream(addr: SocketAddr, iface: &str) -> std::io::Result<TcpStream> {
use socket2_ext::{AddressBinding, BindDeviceOption};
let s = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?;
s.bind_to_device(BindDeviceOption::v4(iface))?;
s.set_keepalive(true)?;
s.set_nodelay(true)?;
s.set_nonblocking(true)?;
Ok(TcpSocket::from_std_stream(s.into()).connect(addr).await?)
}
async fn new_udp_packet(
addr: SocketAddr,
iface: &str,
) -> std::io::Result<tokio::net::UdpSocket> {
use socket2_ext::{AddressBinding, BindDeviceOption};
let s = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)?;
s.bind_to_device(BindDeviceOption::v4(iface))?;
s.set_nonblocking(true)?;
let sock = tokio::net::UdpSocket::from_std(s.into())?;
sock.connect(addr).await?;
Ok(sock)
}
}
#[cfg(not(target_os = "linux"))]
mod inner {
pub(super) fn main() {}
}
fn main() {
inner::main();
}

View File

@ -0,0 +1,326 @@
use std::net::{IpAddr, SocketAddr};
use futures::{SinkExt, StreamExt};
use netstack_smoltcp::{StackBuilder, TcpListener, UdpSocket};
use structopt::StructOpt;
use tokio::net::{TcpSocket, TcpStream};
use tracing::{error, info, warn};
// to run this example, you should set the policy routing **after the start of the main program**
//
// linux:
// with bind device:
// `curl 1.1.1.1 --interface utun8`
// with default route:
// `bash scripts/route-linux.sh add`
// `curl 1.1.1.1`
// with single route:
// `ip rule add to 1.1.1.1 table 200`
// `ip route add default dev utun8 table 200`
// `curl 1.1.1.1`
//
// macos:
// with default route:
// `bash scripts/route-macos.sh add`
// `curl 1.1.1.1`
//
// windows:
// with default route:
// tun2 set default route automatically, won't set agian
// # `powershell.exe scripts/route-windows.ps1 add`
// `curl 1.1.1.1`
//
// currently, the example only supports the TCP stream, and the UDP packet will be dropped.
#[derive(Debug, StructOpt)]
#[structopt(name = "forward", about = "Simply forward tun tcp/udp traffic.")]
struct Opt {
/// Default binding interface, default by guessed.
/// Specify but doesn't exist, no device is bound.
#[structopt(short = "i", long = "interface")]
interface: String,
/// name of the tun device, default to rtun8.
#[structopt(short = "n", long = "name", default_value = "utun8")]
name: String,
/// Tracing subscriber log level.
#[structopt(long = "log-level", default_value = "debug")]
log_level: tracing::Level,
/// Tokio current-thread runtime, default to multi-thread.
#[structopt(long = "current-thread")]
current_thread: bool,
/// Tokio task spawn_local, default to spwan.
#[structopt(long = "local-task")]
local_task: bool,
}
fn main() {
let opt = Opt::from_args();
let rt = if opt.current_thread {
tokio::runtime::Builder::new_current_thread()
} else {
tokio::runtime::Builder::new_multi_thread()
}
.enable_all()
.build()
.unwrap();
rt.block_on(main_exec(opt));
}
async fn main_exec(opt: Opt) {
macro_rules! tokio_spawn {
($fut: expr) => {
if opt.local_task {
tokio::task::spawn_local($fut)
} else {
tokio::task::spawn($fut)
}
};
}
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(opt.log_level)
.finish(),
)
.unwrap();
let mut cfg = tun2::Configuration::default();
cfg.layer(tun2::Layer::L3);
let fd = -1;
if fd >= 0 {
cfg.raw_fd(fd);
} else {
cfg.tun_name(&opt.name)
.address("10.10.10.2")
.destination("10.10.10.1")
.mtu(tun2::DEFAULT_MTU);
#[cfg(not(any(target_arch = "mips", target_arch = "mips64",)))]
{
cfg.netmask("255.255.255.0");
}
cfg.up();
}
let device = tun2::create_as_async(&cfg).unwrap();
let mut builder = StackBuilder::default()
.enable_tcp(true)
.enable_udp(true)
.enable_icmp(true)
.mtu(9000);
if let Some(device_broadcast) = get_device_broadcast(&device) {
builder = builder
// .add_ip_filter(Box::new(move |src, dst| *src != device_broadcast && *dst != device_broadcast));
.add_ip_filter_fn(move |src, dst| *src != device_broadcast && *dst != device_broadcast);
}
let (stack, runner, udp_socket, tcp_listener) = builder.build().unwrap();
let udp_socket = udp_socket.unwrap(); // udp enabled
let tcp_listener = tcp_listener.unwrap(); // tcp enabled or icmp enabled
if let Some(runner) = runner {
tokio_spawn!(runner);
}
let framed = device.into_framed();
let (mut tun_sink, mut tun_stream) = framed.split();
let (mut stack_sink, mut stack_stream) = stack.split();
let mut futs = vec![];
// Reads packet from stack and sends to TUN.
futs.push(tokio_spawn!(async move {
while let Some(pkt) = stack_stream.next().await {
if let Ok(pkt) = pkt {
match tun_sink.send(pkt).await {
Ok(_) => {}
Err(e) => warn!("failed to send packet to TUN, err: {:?}", e),
}
}
}
}));
// Reads packet from TUN and sends to stack.
futs.push(tokio_spawn!(async move {
while let Some(pkt) = tun_stream.next().await {
if let Ok(pkt) = pkt {
match stack_sink.send(pkt).await {
Ok(_) => {}
Err(e) => warn!("failed to send packet to stack, err: {:?}", e),
};
}
}
}));
// Extracts TCP connections from stack and sends them to the dispatcher.
futs.push(tokio_spawn!({
let interface = opt.interface.clone();
async move {
handle_inbound_stream(tcp_listener, interface).await;
}
}));
// Receive and send UDP packets between netstack and NAT manager. The NAT
// manager would maintain UDP sessions and send them to the dispatcher.
futs.push(tokio_spawn!(async move {
handle_inbound_datagram(udp_socket, opt.interface).await;
}));
futures::future::join_all(futs)
.await
.iter()
.for_each(|res| {
if let Err(e) = res {
error!("error: {:?}", e);
}
});
}
/// simply forward tcp stream
async fn handle_inbound_stream(mut tcp_listener: TcpListener, interface: String) {
while let Some((mut stream, local, remote)) = tcp_listener.next().await {
let interface = interface.clone();
tokio::spawn(async move {
info!("new tcp connection: {:?} => {:?}", local, remote);
match new_tcp_stream(remote, &interface).await {
Ok(mut remote_stream) => {
// pipe between two tcp stream
match tokio::io::copy_bidirectional(&mut stream, &mut remote_stream).await {
Ok(_) => {}
Err(e) => warn!(
"failed to copy tcp stream {:?}=>{:?}, err: {:?}",
local, remote, e
),
}
}
Err(e) => warn!(
"failed to new tcp stream {:?}=>{:?}, err: {:?}",
local, remote, e
),
}
});
}
}
/// simply forward udp datagram
async fn handle_inbound_datagram(udp_socket: UdpSocket, interface: String) {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (mut read_half, mut write_half) = udp_socket.split();
tokio::spawn(async move {
while let Some((data, local, remote)) = rx.recv().await {
let _ = write_half.send((data, remote, local)).await;
}
});
while let Some((data, local, remote)) = read_half.next().await {
let tx = tx.clone();
let interface = interface.clone();
tokio::spawn(async move {
info!("new udp datagram: {:?} => {:?}", local, remote);
match new_udp_packet(remote, &interface).await {
Ok(remote_socket) => {
// pipe between two udp sockets
let _ = remote_socket.send(&data).await;
loop {
let mut buf = vec![0; 1024];
match remote_socket.recv_from(&mut buf).await {
Ok((len, _)) => {
let _ = tx.send((buf[..len].to_vec(), local, remote));
}
Err(e) => {
warn!(
"failed to recv udp datagram {:?}<->{:?}: {:?}",
local, remote, e
);
break;
}
}
}
}
Err(e) => warn!(
"failed to new udp socket {:?}=>{:?}, err: {:?}",
local, remote, e
),
}
});
}
}
async fn new_tcp_stream<'a>(addr: SocketAddr, iface: &str) -> std::io::Result<TcpStream> {
use socket2_ext::{AddressBinding, BindDeviceOption};
let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?;
socket.bind_to_device(BindDeviceOption::v4(iface))?;
socket.set_keepalive(true)?;
socket.set_nodelay(true)?;
socket.set_nonblocking(true)?;
let stream = TcpSocket::from_std_stream(socket.into())
.connect(addr)
.await?;
Ok(stream)
}
async fn new_udp_packet(addr: SocketAddr, iface: &str) -> std::io::Result<tokio::net::UdpSocket> {
use socket2_ext::{AddressBinding, BindDeviceOption};
let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)?;
socket.bind_to_device(BindDeviceOption::v4(iface))?;
socket.set_nonblocking(true)?;
let socket = tokio::net::UdpSocket::from_std(socket.into());
if let Ok(ref socket) = socket {
socket.connect(addr).await?;
}
socket
}
fn get_device_broadcast(device: &tun2::AsyncDevice) -> Option<std::net::Ipv4Addr> {
use tun2::AbstractDevice;
let mtu = device.mtu().unwrap_or(tun2::DEFAULT_MTU);
let address = match device.address() {
Ok(a) => match a {
IpAddr::V4(v4) => v4,
IpAddr::V6(_) => return None,
},
Err(_) => return None,
};
let netmask = match device.netmask() {
Ok(n) => match n {
IpAddr::V4(v4) => v4,
IpAddr::V6(_) => return None,
},
Err(_) => return None,
};
match smoltcp::wire::Ipv4Cidr::from_netmask(address, netmask) {
Ok(address_net) => match address_net.broadcast() {
Some(broadcast) => {
info!(
"tun device network: {} (address: {}, netmask: {}, broadcast: {}, mtu: {})",
address_net, address, netmask, broadcast, mtu,
);
Some(broadcast)
}
None => {
error!("invalid tun address {}, netmask {}", address, netmask);
None
}
},
Err(err) => {
error!(
"invalid tun address {}, netmask {}, error: {}",
address, netmask, err
);
None
}
}
}

View File

@ -0,0 +1,174 @@
#!/usr/bin/env bash
# bench-offload.sh
#
# Benchmarks netstack-smoltcp's forward examples with 2-stream iperf3.
# Compares:
# - examples/forward (tun2, no GRO/GSO offload)
# - examples/forward-offload-linux (tun-rs, Linux GRO/GSO offload via IFF_VNET_HDR)
#
# Setup: creates a veth pair + network namespace; iperf3 server runs inside
# the namespace, the forward proxy bridges traffic through a TUN device.
#
# Requirements: cargo, iperf3, ip (iproute2), root/CAP_NET_ADMIN
#
# Usage:
# sudo bash scripts/bench-offload.sh
#
# Run from the root of the netstack-smoltcp repository.
set -euo pipefail
# ── config ────────────────────────────────────────────────────────────────────
REPO_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
NS=bench
VETH_HOST=veth-host
VETH_NS=veth-bench
HOST_IP=172.19.0.1
NS_IP=172.19.0.2
PREFIX=24
TUN_NAME=utun8
TUN_IP=10.10.10.2
IPERF_PORT=5201
DURATION=15
STREAMS=2
# ── helpers ───────────────────────────────────────────────────────────────────
die() { echo "ERROR: $*" >&2; exit 1; }
require() { command -v "$1" &>/dev/null || die "'$1' not found"; }
cleanup() {
pkill -f "forward-" 2>/dev/null || true
ip netns exec "$NS" pkill iperf3 2>/dev/null || true
ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true
ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true
ip link del "$VETH_HOST" 2>/dev/null || true
ip netns del "$NS" 2>/dev/null || true
}
trap cleanup EXIT
# ── preflight ─────────────────────────────────────────────────────────────────
require cargo
require iperf3
require ip
[[ $EUID -eq 0 ]] || die "run as root (needs CAP_NET_ADMIN for TUN + netns)"
[[ -f "$REPO_DIR/Cargo.toml" ]] || die "run from the netstack-smoltcp repo root"
grep -q 'name = "netstack-smoltcp"' "$REPO_DIR/Cargo.toml" \
|| die "Cargo.toml does not look like netstack-smoltcp"
# ── network setup ─────────────────────────────────────────────────────────────
echo "[net] setting up namespace '$NS' and veth pair..."
cleanup 2>/dev/null || true
sleep 0.5
ip netns add "$NS"
ip link add "$VETH_HOST" type veth peer name "$VETH_NS"
ip link set "$VETH_NS" netns "$NS"
ip addr add "${HOST_IP}/${PREFIX}" dev "$VETH_HOST"
ip link set "$VETH_HOST" up
ip netns exec "$NS" ip addr add "${NS_IP}/${PREFIX}" dev "$VETH_NS"
ip netns exec "$NS" ip link set "$VETH_NS" up
ip netns exec "$NS" ip link set lo up
echo "[net] ${HOST_IP} <──veth──> ${NS_IP} (ns:${NS})"
# ── build: forward (tun2, no offload) ────────────────────────────────────────
echo ""
echo "[build] examples/forward (tun2, no GRO/GSO offload)..."
(
cd "$REPO_DIR"
cargo build --example forward --release --quiet
cp target/release/examples/forward /tmp/forward-tun2
)
echo "[build] done → /tmp/forward-tun2"
# ── build: forward-offload-linux (tun-rs, GRO/GSO offload) ───────────────────
echo ""
echo "[build] examples/forward-offload-linux (tun-rs, GRO/GSO offload)..."
(
cd "$REPO_DIR"
cargo build --example forward-offload-linux --release --quiet
cp target/release/examples/forward-offload-linux /tmp/forward-tun-rs
)
echo "[build] done → /tmp/forward-tun-rs"
# ── benchmark runner ──────────────────────────────────────────────────────────
run_bench() {
local label="$1" binary="$2"
# clean any leftover state
pkill -f "forward-" 2>/dev/null || true
ip netns exec "$NS" pkill iperf3 2>/dev/null || true
ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true
ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true
sleep 0.8
# start iperf3 server inside namespace
ip netns exec "$NS" iperf3 -s -p "$IPERF_PORT" -D \
--logfile /tmp/iperf3-bench-server.log
# start proxy
"$binary" -i "$VETH_HOST" -n "$TUN_NAME" --log-level warn &
sleep 2
ip link show "$TUN_NAME" &>/dev/null \
|| { echo " [!] TUN not up, skipping"; return 1; }
# route iperf3 traffic through TUN (more-specific /32 overrides /24 via veth)
ip route add "${NS_IP}/32" dev "$TUN_NAME"
echo " running iperf3: ${STREAMS} streams × ${DURATION}s …"
local out
out=$(iperf3 -c "$NS_IP" -p "$IPERF_PORT" \
-t "$DURATION" -P "$STREAMS" 2>&1)
local sender receiver
sender=$(echo "$out" | grep "SUM.*sender" | awk '{print $6, $7}')
receiver=$(echo "$out" | grep "SUM.*receiver" | awk '{print $6, $7}')
if [[ -z "$sender" ]]; then
echo " result: FAILED"
echo "$out" | tail -5 | sed 's/^/ /'
else
printf " sender: %s\n" "$sender"
printf " receiver: %s\n" "$receiver"
fi
pkill -f "forward-" 2>/dev/null || true
ip netns exec "$NS" pkill iperf3 2>/dev/null || true
ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true
ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true
sleep 0.8
}
# ── direct baseline ───────────────────────────────────────────────────────────
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo " BASELINE: direct veth (no TUN, no proxy)"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
ip netns exec "$NS" pkill iperf3 2>/dev/null || true; sleep 0.3
ip netns exec "$NS" iperf3 -s -p "$IPERF_PORT" -D \
--logfile /tmp/iperf3-bench-server.log; sleep 0.3
echo " running iperf3: ${STREAMS} streams × ${DURATION}s …"
baseline_out=$(iperf3 -c "$NS_IP" -p "$IPERF_PORT" \
-t "$DURATION" -P "$STREAMS" 2>&1)
echo "$baseline_out" | grep "SUM.*sender" | awk '{printf " sender: %s %s\n", $6, $7}'
echo "$baseline_out" | grep "SUM.*receiver" | awk '{printf " receiver: %s %s\n", $6, $7}'
ip netns exec "$NS" pkill iperf3 2>/dev/null || true; sleep 0.5
# ── tun2 ─────────────────────────────────────────────────────────────────────
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo " tun2 (main branch — no GRO/GSO offload)"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
run_bench "tun2" /tmp/forward-tun2
# ── tun-rs + offload ──────────────────────────────────────────────────────────
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo " tun-rs (patched — GRO/GSO offload via IFF_VNET_HDR)"
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
run_bench "tun-rs+offload" /tmp/forward-tun-rs
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo " done."
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"

View File

@ -0,0 +1,26 @@
#!/bin/bash
#__author__: cavivie
DEFAULT_TUN_NAME="utun8"
function do_route() {
local route_op="${1}"
local tun_name="${2:-$DEFAULT_TUN_NAME}"
ip route ${route_op} 0.0.0.0/1 dev ${tun_name}
ip route ${route_op} 128.0.0.0/1 dev ${tun_name}
}
function usage(){
echo "Usage:
route add add tun routes to system route table
route del delete routes from system route table
route help display all usages of the shell script"
}
# START MAIN-OPTIONS
case $1 in
add) do_route add $2;;
del) do_route delete $2;;
*) usage ;;
esac
# END MAIN-OPTIONS

View File

@ -0,0 +1,36 @@
#!/bin/bash
#__author__: cavivie
DEFAULT_TUN_ADDR="10.10.10.2/24"
DEFAULT_TUN_DEST="10.10.10.1"
function do_route() {
local route_op="${1}"
local tun_addr="${2:-$DEFAULT_TUN_ADDR}"
local tun_dest="${3:-$DEFAULT_TUN_DEST}"
sudo route ${route_op} -net 1.0.0.0/8 ${tun_dest}
sudo route ${route_op} -net 2.0.0.0/7 ${tun_dest}
sudo route ${route_op} -net 4.0.0.0/6 ${tun_dest}
sudo route ${route_op} -net 8.0.0.0/5 ${tun_dest}
sudo route ${route_op} -net 16.0.0.0/4 ${tun_dest}
sudo route ${route_op} -net 32.0.0.0/3 ${tun_dest}
sudo route ${route_op} -net 64.0.0.0/2 ${tun_dest}
sudo route ${route_op} -net 128.0.0.0/1 ${tun_dest}
# tun2 do like this automatically
sudo route ${route_op} -net ${tun_addr} ${tun_dest}
}
function usage(){
echo "Usage:
route add add tun routes to system route table
route del delete routes from system route table
route help display all usages of the shell script"
}
# START MAIN-OPTIONS
case $1 in
add) do_route add $2 $3;;
del) do_route delete $2 $3;;
*) usage ;;
esac
# END MAIN-OPTIONS

View File

@ -0,0 +1,30 @@
#__author__: cavivie
param(
[string]$Cmd = "help",
[string]$TunName = "utun8",
[string]$TunGateway = "10.10.10.1"
)
$ErrorActionPreference = "Stop"
# START MAIN-OPTIONS
switch ($Cmd) {
"add" {
# tun2 do like this automatically
New-NetRoute -DestinationPrefix "0.0.0.0/1" -InterfaceAlias $TunName -NextHop "$TunGateway"
New-NetRoute -DestinationPrefix "128.0.0.0/1" -InterfaceAlias $TunName -NextHop "$TunGateway"
}
"del" {
# tun2 do like this automatically
Get-NetRoute -DestinationPrefix "0.0.0.0/1" -InterfaceAlias $TunName | Remove-NetRoute
Get-NetRoute -DestinationPrefix "128.0.0.0/1" -InterfaceAlias $TunName | Remove-NetRoute
}
default {
Write-Host "Usage:
route add add tun routes to system route table
route del delete routes from system route table
route help display all usages of the shell script"
}
}
# END MAIN-OPTIONS

View File

@ -0,0 +1,101 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use smoltcp::{
phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken},
time::Instant,
};
use tokio::sync::mpsc::{unbounded_channel, Permit, Sender, UnboundedReceiver, UnboundedSender};
use crate::packet::AnyIpPktFrame;
pub(super) struct VirtualDevice {
in_buf_avail: Arc<AtomicBool>,
in_buf: UnboundedReceiver<Vec<u8>>,
out_buf: Sender<AnyIpPktFrame>,
mtu: usize,
}
impl VirtualDevice {
pub(super) fn new(
iface_egress_tx: Sender<AnyIpPktFrame>,
mtu: usize,
) -> (Self, UnboundedSender<Vec<u8>>, Arc<AtomicBool>) {
let iface_ingress_tx_avail = Arc::new(AtomicBool::new(false));
let (iface_ingress_tx, iface_ingress_rx) = unbounded_channel();
(
Self {
in_buf_avail: iface_ingress_tx_avail.clone(),
in_buf: iface_ingress_rx,
out_buf: iface_egress_tx,
mtu,
},
iface_ingress_tx,
iface_ingress_tx_avail,
)
}
}
impl Device for VirtualDevice {
type RxToken<'a> = VirtualRxToken;
type TxToken<'a> = VirtualTxToken<'a>;
fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> {
let Ok(buffer) = self.in_buf.try_recv() else {
self.in_buf_avail.store(false, Ordering::Release);
return None;
};
let Ok(permit) = self.out_buf.try_reserve() else {
self.in_buf_avail.store(false, Ordering::Release);
return None;
};
Some((Self::RxToken { buffer }, Self::TxToken { permit }))
}
fn transmit(&mut self, _timestamp: Instant) -> Option<Self::TxToken<'_>> {
match self.out_buf.try_reserve() {
Ok(permit) => Some(Self::TxToken { permit }),
Err(_) => None,
}
}
fn capabilities(&self) -> DeviceCapabilities {
let mut capabilities = DeviceCapabilities::default();
capabilities.medium = Medium::Ip;
capabilities.max_transmission_unit = self.mtu;
capabilities
}
}
pub(super) struct VirtualRxToken {
buffer: Vec<u8>,
}
impl RxToken for VirtualRxToken {
fn consume<R, F>(self, f: F) -> R
where
F: FnOnce(&[u8]) -> R,
{
f(&self.buffer[..])
}
}
pub(super) struct VirtualTxToken<'a> {
permit: Permit<'a, Vec<u8>>,
}
impl<'a> TxToken for VirtualTxToken<'a> {
fn consume<R, F>(self, len: usize, f: F) -> R
where
F: FnOnce(&mut [u8]) -> R,
{
let mut buffer = vec![0u8; len];
let result = f(&mut buffer);
self.permit.send(buffer);
result
}
}

View File

@ -0,0 +1,56 @@
use std::net::IpAddr;
pub type IpFilter<'a> = Box<dyn Fn(&IpAddr, &IpAddr) -> bool + Send + Sync + 'a>;
pub struct IpFilters<'a> {
filters: Vec<IpFilter<'a>>,
}
impl<'a> Default for IpFilters<'a> {
fn default() -> Self {
Self::new()
}
}
impl<'a> IpFilters<'a> {
pub fn new() -> Self {
Self {
filters: Default::default(),
}
}
pub fn with_non_broadcast() -> Self {
macro_rules! non_broadcast {
($addr:ident) => {
match $addr {
IpAddr::V4(a) => !(a.is_broadcast() || a.is_multicast() || a.is_unspecified()),
IpAddr::V6(a) => !(a.is_multicast() || a.is_unspecified()),
}
};
}
Self {
filters: vec![Box::new(|src, dst| {
non_broadcast!(src) && non_broadcast!(dst)
})],
}
}
pub fn add(&mut self, filter: IpFilter<'a>) {
self.filters.push(filter);
}
pub fn add_fn<F>(&mut self, filter: F)
where
F: Fn(&IpAddr, &IpAddr) -> bool + Send + Sync + 'a,
{
self.filters.push(Box::new(filter));
}
pub fn add_all<I: IntoIterator<Item = IpFilter<'a>>>(&mut self, filters: I) {
self.filters.extend(filters);
}
pub fn is_allowed(&self, src: &IpAddr, dst: &IpAddr) -> bool {
self.filters.iter().all(|filter| filter(src, dst))
}
}

View File

@ -0,0 +1,22 @@
mod device;
mod runner;
pub use runner::Runner;
mod packet;
pub use packet::AnyIpPktFrame;
mod filter;
pub use filter::{IpFilter, IpFilters};
pub mod udp;
pub use udp::UdpSocket;
pub mod tcp;
pub use tcp::{TcpListener, TcpStream};
pub mod stack;
pub use stack::{Stack, StackBuilder};
/// Re-export
pub use smoltcp;

View File

@ -0,0 +1,53 @@
use std::net::IpAddr;
use smoltcp::wire::{IpProtocol, IpVersion, Ipv4Packet, Ipv6Packet};
pub type AnyIpPktFrame = Vec<u8>;
#[derive(Debug)]
pub(super) enum IpPacket<T: AsRef<[u8]>> {
Ipv4(Ipv4Packet<T>),
Ipv6(Ipv6Packet<T>),
}
impl<T: AsRef<[u8]> + Copy> IpPacket<T> {
pub fn new_checked(packet: T) -> smoltcp::wire::Result<IpPacket<T>> {
let buffer = packet.as_ref();
match IpVersion::of_packet(buffer)? {
IpVersion::Ipv4 => Ok(IpPacket::Ipv4(Ipv4Packet::new_checked(packet)?)),
IpVersion::Ipv6 => Ok(IpPacket::Ipv6(Ipv6Packet::new_checked(packet)?)),
}
}
pub fn src_addr(&self) -> IpAddr {
match *self {
IpPacket::Ipv4(ref packet) => IpAddr::from(packet.src_addr()),
IpPacket::Ipv6(ref packet) => IpAddr::from(packet.src_addr()),
}
}
pub fn dst_addr(&self) -> IpAddr {
match *self {
IpPacket::Ipv4(ref packet) => IpAddr::from(packet.dst_addr()),
IpPacket::Ipv6(ref packet) => IpAddr::from(packet.dst_addr()),
}
}
pub fn protocol(&self) -> IpProtocol {
match *self {
IpPacket::Ipv4(ref packet) => packet.next_header(),
IpPacket::Ipv6(ref packet) => packet.next_header(),
}
}
}
impl<'a, T: AsRef<[u8]> + ?Sized> IpPacket<&'a T> {
/// Return a pointer to the payload.
#[inline]
pub fn payload(&self) -> &'a [u8] {
match *self {
IpPacket::Ipv4(ref packet) => packet.payload(),
IpPacket::Ipv6(ref packet) => packet.payload(),
}
}
}

View File

@ -0,0 +1,41 @@
use std::{
future::{Future, IntoFuture},
pin::Pin,
task::{Context, Poll},
};
/// BoxFuture acts the same as the [BoxFuture in crate futures utils],
/// which is an owned dynamically typed Future for use in cases where you
/// cant statically type your result or need to add some indirection.
/// But the difference of this structure is that it will conditionally
/// implement Send according to the properties of type T, which does not
/// require two sets of API interfaces in single-threaded and multi-threaded.
///
/// [BoxFuture in crate futures utils]: https://docs.rs/futures-util/latest/futures_util/future/type.BoxFuture.html
pub struct BoxFuture<'a, T>(Pin<Box<dyn Future<Output = T> + 'a>>);
impl<'a, T> BoxFuture<'a, T> {
pub fn new<F>(f: F) -> BoxFuture<'a, T>
where
F: IntoFuture<Output = T> + 'a,
{
BoxFuture(Box::pin(f.into_future()))
}
#[allow(unused)]
pub fn wrap(f: Pin<Box<dyn Future<Output = T> + 'a>>) -> BoxFuture<'a, T> {
BoxFuture(f)
}
}
unsafe impl<T: Send> Send for BoxFuture<'_, T> {}
impl<T> Future for BoxFuture<'_, T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.0.as_mut().poll(context)
}
}
pub type Runner = BoxFuture<'static, std::io::Result<()>>;

View File

@ -0,0 +1,279 @@
use std::{
net::IpAddr,
pin::Pin,
task::{ready, Context, Poll},
};
use futures::{Sink, Stream};
use smoltcp::wire::IpProtocol;
use tokio::sync::mpsc::{channel, Receiver};
use tokio_util::sync::PollSender;
use tracing::{debug, trace};
use crate::{
filter::{IpFilter, IpFilters},
packet::{AnyIpPktFrame, IpPacket},
runner::Runner,
tcp::TcpListener,
udp::UdpSocket,
};
pub struct StackBuilder {
enable_udp: bool,
enable_tcp: bool,
enable_icmp: bool,
stack_buffer_size: usize,
udp_buffer_size: usize,
tcp_buffer_size: usize,
mtu: usize,
ip_filters: IpFilters<'static>,
}
impl Default for StackBuilder {
fn default() -> Self {
Self {
enable_udp: false,
enable_tcp: false,
enable_icmp: false,
stack_buffer_size: 1024,
udp_buffer_size: 512,
tcp_buffer_size: 512,
mtu: 1504, // 1500 for Ethernet + 4 for VLAN
ip_filters: IpFilters::with_non_broadcast(),
}
}
}
#[allow(unused)]
impl StackBuilder {
pub fn enable_udp(mut self, enable: bool) -> Self {
self.enable_udp = enable;
self
}
pub fn enable_tcp(mut self, enable: bool) -> Self {
self.enable_tcp = enable;
self
}
pub fn enable_icmp(mut self, enable: bool) -> Self {
self.enable_icmp = enable;
self
}
pub fn stack_buffer_size(mut self, size: usize) -> Self {
self.stack_buffer_size = size;
self
}
pub fn udp_buffer_size(mut self, size: usize) -> Self {
self.udp_buffer_size = size;
self
}
pub fn tcp_buffer_size(mut self, size: usize) -> Self {
self.tcp_buffer_size = size;
self
}
pub fn set_ip_filters(mut self, filters: IpFilters<'static>) -> Self {
self.ip_filters = filters;
self
}
pub fn add_ip_filter(mut self, filter: IpFilter<'static>) -> Self {
self.ip_filters.add(filter);
self
}
pub fn add_ip_filter_fn<F>(mut self, filter: F) -> Self
where
F: Fn(&IpAddr, &IpAddr) -> bool + Send + Sync + 'static,
{
self.ip_filters.add_fn(filter);
self
}
pub fn mtu(mut self, mtu: usize) -> Self {
self.mtu = mtu;
self
}
#[allow(clippy::type_complexity)]
pub fn build(
self,
) -> std::io::Result<(
Stack,
Option<Runner>,
Option<UdpSocket>,
Option<TcpListener>,
)> {
let (stack_tx, stack_rx) = channel(self.stack_buffer_size);
let (udp_tx, udp_rx) = if self.enable_udp {
let (udp_tx, udp_rx) = channel(self.udp_buffer_size);
(Some(PollSender::new(udp_tx)), Some(udp_rx))
} else {
(None, None)
};
let (tcp_tx, tcp_rx) = if self.enable_tcp {
let (tcp_tx, tcp_rx) = channel(self.tcp_buffer_size);
(Some(PollSender::new(tcp_tx)), Some(tcp_rx))
} else {
(None, None)
};
// ICMP is handled by TCP's Interface.
// smoltcp's interface will always send replies to EchoRequest
if self.enable_icmp && !self.enable_tcp {
use std::io::{Error, ErrorKind::InvalidInput};
return Err(Error::new(InvalidInput, "ICMP requires TCP"));
}
let icmp_tx = if self.enable_icmp {
tcp_tx.clone()
} else {
None
};
let udp_socket = udp_rx.map(|udp_rx| UdpSocket::new(udp_rx, stack_tx.clone()));
let (tcp_runner, tcp_listener) = if let Some(tcp_rx) = tcp_rx {
let (tcp_runner, tcp_listener) = TcpListener::new(tcp_rx, stack_tx, self.mtu)?;
(Some(tcp_runner), Some(tcp_listener))
} else {
(None, None)
};
let stack = Stack {
ip_filters: self.ip_filters,
stack_rx,
sink_buf: None,
udp_tx,
tcp_tx,
icmp_tx,
};
Ok((stack, tcp_runner, udp_socket, tcp_listener))
}
}
pub struct Stack {
ip_filters: IpFilters<'static>,
sink_buf: Option<(AnyIpPktFrame, IpProtocol)>,
udp_tx: Option<PollSender<AnyIpPktFrame>>,
tcp_tx: Option<PollSender<AnyIpPktFrame>>,
icmp_tx: Option<PollSender<AnyIpPktFrame>>,
stack_rx: Receiver<AnyIpPktFrame>,
}
impl Stack {
fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
let (item, proto) = match self.sink_buf.take() {
Some(val) => val,
None => return Poll::Ready(Ok(())),
};
let tx = match proto {
IpProtocol::Tcp => self.tcp_tx.as_mut(),
IpProtocol::Udp => self.udp_tx.as_mut(),
IpProtocol::Icmp | IpProtocol::Icmpv6 => self.icmp_tx.as_mut(),
_ => unreachable!(),
};
let Some(tx) = tx else {
return Poll::Ready(Ok(()));
};
match tx.poll_reserve(cx) {
Poll::Pending => {
self.sink_buf = Some((item, proto));
Poll::Pending
}
Poll::Ready(Err(_)) => Poll::Ready(Err(channel_closed_err("channel is closed"))),
Poll::Ready(Ok(_)) => match tx.send_item(item) {
Ok(()) => Poll::Ready(Ok(())),
Err(_) => Poll::Ready(Err(channel_closed_err("channel is closed"))),
},
}
}
}
// Recv from stack.
impl Stream for Stack {
type Item = std::io::Result<AnyIpPktFrame>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.stack_rx.poll_recv(cx) {
Poll::Ready(Some(pkt)) => Poll::Ready(Some(Ok(pkt))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
// Send to stack.
impl Sink<AnyIpPktFrame> for Stack {
type Error = std::io::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// If a buffered item exists, try to flush it first. This also properly
// registers the waker via poll_reserve so we get woken when the channel
// has capacity. Without this, returning Pending here with _cx unused
// means the task never gets rescheduled.
if self.sink_buf.is_some() {
ready!(self.poll_send(cx))?;
}
Poll::Ready(Ok(()))
}
fn start_send(mut self: Pin<&mut Self>, item: AnyIpPktFrame) -> Result<(), Self::Error> {
if item.is_empty() {
return Ok(());
}
use std::io::{Error, ErrorKind::InvalidInput};
let packet = IpPacket::new_checked(item.as_slice())
.map_err(|err| Error::new(InvalidInput, format!("invalid IP packet: {err}")))?;
let src_ip = packet.src_addr();
let dst_ip = packet.dst_addr();
let addr_allowed = self.ip_filters.is_allowed(&src_ip, &dst_ip);
if !addr_allowed {
trace!("IP packet {src_ip} -> {dst_ip} (allowed? {addr_allowed}) throwing away",);
return Ok(());
}
let protocol = packet.protocol();
if matches!(
protocol,
IpProtocol::Tcp | IpProtocol::Udp | IpProtocol::Icmp | IpProtocol::Icmpv6
) {
self.sink_buf.replace((item, protocol));
} else {
debug!("tun IP packet ignored (protocol: {:?})", protocol);
}
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_send(cx)
}
fn poll_close(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.stack_rx.close();
Poll::Ready(Ok(()))
}
}
fn channel_closed_err<E>(err: E) -> std::io::Error
where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
std::io::Error::new(std::io::ErrorKind::BrokenPipe, err)
}

561
netstack-smoltcp/src/tcp.rs Normal file
View File

@ -0,0 +1,561 @@
use std::{
collections::HashMap,
net::SocketAddr,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context, Poll, Waker},
};
use futures::Stream;
use smoltcp::{
iface::{Config as InterfaceConfig, Interface, SocketHandle, SocketSet},
phy::Device,
socket::tcp::{Socket as TcpSocket, SocketBuffer as TcpSocketBuffer, State as TcpState},
storage::RingBuffer,
time::{Duration, Instant},
wire::{HardwareAddress, IpAddress, IpCidr, IpProtocol, Ipv4Address, Ipv6Address, TcpPacket},
};
use spin::Mutex as SpinMutex;
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
sync::{
mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
Notify,
},
};
use tracing::{error, trace};
use crate::{
device::VirtualDevice,
packet::{AnyIpPktFrame, IpPacket},
Runner,
};
// NOTE: Default buffer could contain 20 AEAD packets
const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 0x3FFF * 20;
const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 0x3FFF * 20;
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum TcpSocketState {
Normal,
Close,
Closing,
Closed,
}
struct TcpSocketControl {
send_buffer: RingBuffer<'static, u8>,
send_waker: Option<Waker>,
recv_buffer: RingBuffer<'static, u8>,
recv_waker: Option<Waker>,
recv_state: TcpSocketState,
send_state: TcpSocketState,
}
struct TcpSocketCreation {
control: SharedControl,
socket: TcpSocket<'static>,
}
type SharedNotify = Arc<Notify>;
type SharedControl = Arc<SpinMutex<TcpSocketControl>>;
struct TcpListenerRunner;
impl TcpListenerRunner {
fn create(
device: VirtualDevice,
iface: Interface,
iface_ingress_tx: UnboundedSender<Vec<u8>>,
iface_ingress_tx_avail: Arc<AtomicBool>,
tcp_rx: Receiver<AnyIpPktFrame>,
stream_tx: UnboundedSender<TcpStream>,
sockets: HashMap<SocketHandle, SharedControl>,
) -> Runner {
Runner::new(async move {
let notify = Arc::new(Notify::new());
let (socket_tx, socket_rx) = unbounded_channel::<TcpSocketCreation>();
let res = tokio::select! {
v = Self::handle_packet(notify.clone(), iface_ingress_tx, iface_ingress_tx_avail.clone(), tcp_rx, stream_tx, socket_tx) => v,
v = Self::handle_socket(notify, device, iface, iface_ingress_tx_avail, sockets, socket_rx) => v,
};
res?;
trace!("VirtDevice::poll thread exited");
Ok(())
})
}
async fn handle_packet(
notify: SharedNotify,
iface_ingress_tx: UnboundedSender<Vec<u8>>,
iface_ingress_tx_avail: Arc<AtomicBool>,
mut tcp_rx: Receiver<AnyIpPktFrame>,
stream_tx: UnboundedSender<TcpStream>,
socket_tx: UnboundedSender<TcpSocketCreation>,
) -> std::io::Result<()> {
while let Some(frame) = tcp_rx.recv().await {
let packet = match IpPacket::new_checked(frame.as_slice()) {
Ok(p) => p,
Err(err) => {
error!("invalid TCP IP packet: {:?}", err,);
continue;
}
};
// Specially handle icmp packet by TCP interface.
if matches!(packet.protocol(), IpProtocol::Icmp | IpProtocol::Icmpv6) {
iface_ingress_tx
.send(frame)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
iface_ingress_tx_avail.store(true, Ordering::Release);
notify.notify_one();
continue;
}
let src_ip = packet.src_addr();
let dst_ip = packet.dst_addr();
let payload = packet.payload();
let packet = match TcpPacket::new_checked(payload) {
Ok(p) => p,
Err(err) => {
error!("invalid TCP err: {err}, src_ip: {src_ip}, dst_ip: {dst_ip}, payload: {payload:?}");
continue;
}
};
let src_port = packet.src_port();
let dst_port = packet.dst_port();
let src_addr = SocketAddr::new(src_ip, src_port);
let dst_addr = SocketAddr::new(dst_ip, dst_port);
// TCP first handshake packet, create a new Connection
if packet.syn() && !packet.ack() {
let mut socket = TcpSocket::new(
TcpSocketBuffer::new(vec![0u8; DEFAULT_TCP_RECV_BUFFER_SIZE as usize]),
TcpSocketBuffer::new(vec![0u8; DEFAULT_TCP_SEND_BUFFER_SIZE as usize]),
);
socket.set_keep_alive(Some(Duration::from_secs(28)));
// FIXME: It should follow system's setting. 7200 is Linux's default.
socket.set_timeout(Some(Duration::from_secs(7200)));
// NO ACK delay
// socket.set_ack_delay(None);
if let Err(err) = socket.listen(dst_addr) {
error!("listen error: {:?}", err);
continue;
}
trace!("created TCP connection for {} <-> {}", src_addr, dst_addr);
let control = Arc::new(SpinMutex::new(TcpSocketControl {
send_buffer: RingBuffer::new(vec![0u8; DEFAULT_TCP_SEND_BUFFER_SIZE as usize]),
send_waker: None,
recv_buffer: RingBuffer::new(vec![0u8; DEFAULT_TCP_RECV_BUFFER_SIZE as usize]),
recv_waker: None,
recv_state: TcpSocketState::Normal,
send_state: TcpSocketState::Normal,
}));
stream_tx
.send(TcpStream {
src_addr,
dst_addr,
notify: notify.clone(),
control: control.clone(),
})
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
socket_tx
.send(TcpSocketCreation { control, socket })
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
}
// Pipeline tcp stream packet
iface_ingress_tx
.send(frame)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?;
iface_ingress_tx_avail.store(true, Ordering::Release);
notify.notify_one();
}
Ok(())
}
async fn handle_socket(
notify: SharedNotify,
mut device: VirtualDevice,
mut iface: Interface,
iface_ingress_tx_avail: Arc<AtomicBool>,
mut sockets: HashMap<SocketHandle, SharedControl>,
mut socket_rx: UnboundedReceiver<TcpSocketCreation>,
) -> std::io::Result<()> {
let mut socket_set = SocketSet::new(vec![]);
loop {
while let Ok(TcpSocketCreation { control, socket }) = socket_rx.try_recv() {
let handle = socket_set.add(socket);
sockets.insert(handle, control);
}
let before_poll = Instant::now();
let updated_sockets = iface.poll(before_poll, &mut device, &mut socket_set);
if matches!(
updated_sockets,
smoltcp::iface::PollResult::SocketStateChanged
) {
trace!("VirtDevice::poll costed {}", Instant::now() - before_poll);
}
// Check all the sockets' status
let mut sockets_to_remove = Vec::new();
for (socket_handle, control) in sockets.iter() {
let socket_handle = *socket_handle;
let socket = socket_set.get_mut::<TcpSocket>(socket_handle);
let mut control = control.lock();
// Remove the socket only when it is in the closed state.
if socket.state() == TcpState::Closed {
sockets_to_remove.push(socket_handle);
control.send_state = TcpSocketState::Closed;
control.recv_state = TcpSocketState::Closed;
if let Some(waker) = control.send_waker.take() {
waker.wake();
}
if let Some(waker) = control.recv_waker.take() {
waker.wake();
}
trace!("closed TCP connection");
continue;
}
// SHUT_WR — only close once the send_buffer has been fully
// drained into the smoltcp socket. Closing earlier transitions
// the socket to FIN_WAIT_1, making can_send() return false, so
// the send loop below never runs and the remaining data is lost.
if matches!(control.send_state, TcpSocketState::Close)
&& control.send_buffer.is_empty()
{
trace!("closing TCP Write Half, {:?}", socket.state());
socket.close();
control.send_state = TcpSocketState::Closing;
}
// Check if readable
let mut wake_receiver = false;
while socket.can_recv() && !control.recv_buffer.is_full() {
let result = socket.recv(|buffer| {
let n = control.recv_buffer.enqueue_slice(buffer);
(n, ())
});
match result {
Ok(..) => wake_receiver = true,
Err(err) => {
error!("socket recv error: {:?}, {:?}", err, socket.state());
// Don't know why. Abort the connection.
socket.abort();
if matches!(control.recv_state, TcpSocketState::Normal) {
control.recv_state = TcpSocketState::Closed;
}
wake_receiver = true;
// The socket will be recycled in the next poll.
break;
}
}
}
// If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2,
// the local client have closed our receiver.
let states = [
TcpState::Listen,
TcpState::SynReceived,
TcpState::Established,
TcpState::FinWait1,
TcpState::FinWait2,
];
if matches!(control.recv_state, TcpSocketState::Normal)
&& !socket.may_recv()
&& !states.contains(&socket.state())
{
trace!("closed TCP Read Half, {:?}", socket.state());
// Let TcpStream::poll_read returns EOF.
control.recv_state = TcpSocketState::Closed;
wake_receiver = true;
}
if wake_receiver && control.recv_waker.is_some() {
if let Some(waker) = control.recv_waker.take() {
waker.wake();
}
}
// Check if writable
let mut wake_sender = false;
while socket.can_send() && !control.send_buffer.is_empty() {
let result = socket.send(|buffer| {
let n = control.send_buffer.dequeue_slice(buffer);
(n, ())
});
match result {
Ok(..) => wake_sender = true,
Err(err) => {
error!("socket send error: {:?}, {:?}", err, socket.state());
// Don't know why. Abort the connection.
socket.abort();
if matches!(control.send_state, TcpSocketState::Normal) {
control.send_state = TcpSocketState::Closed;
}
wake_sender = true;
// The socket will be recycled in the next poll.
break;
}
}
}
if wake_sender && control.send_waker.is_some() {
if let Some(waker) = control.send_waker.take() {
waker.wake();
}
}
}
for socket_handle in sockets_to_remove {
sockets.remove(&socket_handle);
socket_set.remove(socket_handle);
}
if !iface_ingress_tx_avail.load(Ordering::Acquire) {
let next_duration = iface
.poll_delay(before_poll, &socket_set)
.unwrap_or(Duration::from_millis(5));
if next_duration != Duration::ZERO {
let _ = tokio::time::timeout(
tokio::time::Duration::from(next_duration),
notify.notified(),
)
.await;
}
}
}
}
}
pub struct TcpListener {
stream_rx: UnboundedReceiver<TcpStream>,
}
impl TcpListener {
pub(super) fn new(
tcp_rx: Receiver<AnyIpPktFrame>,
stack_tx: Sender<AnyIpPktFrame>,
mtu: usize,
) -> std::io::Result<(Runner, Self)> {
let (mut device, iface_ingress_tx, iface_ingress_tx_avail) =
VirtualDevice::new(stack_tx, mtu);
let iface = Self::create_interface(&mut device)?;
let (stream_tx, stream_rx) = unbounded_channel();
let runner = TcpListenerRunner::create(
device,
iface,
iface_ingress_tx,
iface_ingress_tx_avail,
tcp_rx,
stream_tx,
HashMap::new(),
);
Ok((runner, Self { stream_rx }))
}
fn create_interface<D>(device: &mut D) -> std::io::Result<Interface>
where
D: Device + ?Sized,
{
let mut iface_config = InterfaceConfig::new(HardwareAddress::Ip);
iface_config.random_seed = rand::random();
let mut iface = Interface::new(iface_config, device, Instant::now());
iface.update_ip_addrs(|ip_addrs| {
ip_addrs
.push(IpCidr::new(IpAddress::v4(0, 0, 0, 1), 0))
.expect("iface IPv4");
ip_addrs
.push(IpCidr::new(IpAddress::v6(0, 0, 0, 0, 0, 0, 0, 1), 0))
.expect("iface IPv6");
});
iface
.routes_mut()
.add_default_ipv4_route(Ipv4Address::new(0, 0, 0, 1))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, e))?;
iface
.routes_mut()
.add_default_ipv6_route(Ipv6Address::new(0, 0, 0, 0, 0, 0, 0, 1))
.map_err(|e| std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, e))?;
iface.set_any_ip(true);
Ok(iface)
}
}
impl Stream for TcpListener {
type Item = (TcpStream, SocketAddr, SocketAddr);
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.stream_rx.poll_recv(cx).map(|stream| {
stream.map(|stream| {
let local_addr = *stream.local_addr();
let remote_addr: SocketAddr = *stream.remote_addr();
(stream, local_addr, remote_addr)
})
})
}
}
pub struct TcpStream {
src_addr: SocketAddr,
dst_addr: SocketAddr,
notify: SharedNotify,
control: SharedControl,
}
impl Drop for TcpStream {
fn drop(&mut self) {
let mut control = self.control.lock();
if matches!(control.recv_state, TcpSocketState::Normal) {
control.recv_state = TcpSocketState::Close;
}
if matches!(control.send_state, TcpSocketState::Normal) {
control.send_state = TcpSocketState::Close;
}
self.notify.notify_one();
}
}
impl TcpStream {
pub fn local_addr(&self) -> &SocketAddr {
&self.src_addr
}
pub fn remote_addr(&self) -> &SocketAddr {
&self.dst_addr
}
}
impl AsyncRead for TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut control = self.control.lock();
// Read from buffer
if control.recv_buffer.is_empty() {
// If socket is already closed / half closed, just return EOF directly.
if matches!(control.recv_state, TcpSocketState::Closed) {
return Ok(()).into();
}
// Nothing could be read. Wait for notify.
if let Some(old_waker) = control.recv_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
return Poll::Pending;
}
let recv_buf = buf.initialize_unfilled();
let n = control.recv_buffer.dequeue_slice(recv_buf);
buf.advance(n);
if n > 0 {
self.notify.notify_one();
}
Ok(()).into()
}
}
impl AsyncWrite for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let mut control = self.control.lock();
// If state == Close | Closing | Closed, the TCP stream WR half is closed.
if !matches!(control.send_state, TcpSocketState::Normal) {
return Err(std::io::ErrorKind::BrokenPipe.into()).into();
}
// Write to buffer
if control.send_buffer.is_full() {
if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
return Poll::Pending;
}
let n = control.send_buffer.enqueue_slice(buf);
if n > 0 {
self.notify.notify_one();
}
Ok(n).into()
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Ok(()).into()
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
let mut control = self.control.lock();
if matches!(control.send_state, TcpSocketState::Closed) {
return Ok(()).into();
}
// SHUT_WR
if matches!(control.send_state, TcpSocketState::Normal) {
control.send_state = TcpSocketState::Close;
}
if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) {
if !old_waker.will_wake(cx.waker()) {
old_waker.wake();
}
}
self.notify.notify_one();
Poll::Pending
}
}

155
netstack-smoltcp/src/udp.rs Normal file
View File

@ -0,0 +1,155 @@
use std::{
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use etherparse::PacketBuilder;
use futures::{ready, Sink, SinkExt, Stream};
use smoltcp::wire::UdpPacket;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::sync::PollSender;
use tracing::{error, trace};
use crate::packet::{AnyIpPktFrame, IpPacket};
pub type UdpMsg = (
Vec<u8>, /* payload */
SocketAddr, /* local */
SocketAddr, /* remote */
);
pub struct UdpSocket {
udp_rx: Receiver<AnyIpPktFrame>,
stack_tx: PollSender<AnyIpPktFrame>,
}
impl UdpSocket {
pub(super) fn new(udp_rx: Receiver<AnyIpPktFrame>, stack_tx: Sender<AnyIpPktFrame>) -> Self {
Self {
udp_rx,
stack_tx: PollSender::new(stack_tx),
}
}
pub fn split(self) -> (ReadHalf, WriteHalf) {
(
ReadHalf {
udp_rx: self.udp_rx,
},
WriteHalf {
stack_tx: self.stack_tx,
},
)
}
}
pub struct ReadHalf {
udp_rx: Receiver<AnyIpPktFrame>,
}
pub struct WriteHalf {
stack_tx: PollSender<AnyIpPktFrame>,
}
impl Stream for ReadHalf {
type Item = UdpMsg;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
match ready!(self.udp_rx.poll_recv(cx)) {
Some(frame) => {
let packet = match IpPacket::new_checked(frame.as_slice()) {
Ok(p) => p,
Err(err) => {
error!("invalid IP packet: {}", err);
continue;
}
};
let src_ip = packet.src_addr();
let dst_ip = packet.dst_addr();
let payload = packet.payload();
let packet = match UdpPacket::new_checked(payload) {
Ok(p) => p,
Err(err) => {
error!("invalid err: {err}, src_ip: {src_ip}, dst_ip: {dst_ip}, payload: {payload:?}");
continue;
}
};
let src_port = packet.src_port();
let dst_port = packet.dst_port();
let src_addr = SocketAddr::new(src_ip, src_port);
let dst_addr = SocketAddr::new(dst_ip, dst_port);
trace!("created UDP socket for {} <-> {}", src_addr, dst_addr);
return Poll::Ready(Some((packet.payload().to_vec(), src_addr, dst_addr)));
}
None => return Poll::Ready(None),
}
}
}
}
impl Sink<UdpMsg> for WriteHalf {
type Error = std::io::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match ready!(self.stack_tx.poll_ready_unpin(cx)) {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(std::io::Error::other(err))),
}
}
fn start_send(mut self: Pin<&mut Self>, item: UdpMsg) -> Result<(), Self::Error> {
use std::io::{Error, ErrorKind::InvalidData};
let (data, src_addr, dst_addr) = item;
if data.is_empty() {
return Ok(());
}
let builder = match (src_addr, dst_addr) {
(SocketAddr::V4(src), SocketAddr::V4(dst)) => {
PacketBuilder::ipv4(src.ip().octets(), dst.ip().octets(), 20)
.udp(src_addr.port(), dst_addr.port())
}
(SocketAddr::V6(src), SocketAddr::V6(dst)) => {
PacketBuilder::ipv6(src.ip().octets(), dst.ip().octets(), 20)
.udp(src_addr.port(), dst_addr.port())
}
_ => {
return Err(Error::new(InvalidData, "src or destination type unmatch"));
}
};
let mut ip_packet_writer = Vec::with_capacity(builder.size(data.len()));
builder
.write(&mut ip_packet_writer, &data)
.map_err(|err| Error::other(format!("PacketBuilder::write: {err}")))?;
match self.stack_tx.start_send_unpin(ip_packet_writer) {
Ok(()) => Ok(()),
Err(err) => Err(Error::other(format!("send error: {err}"))),
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use std::io::Error;
match ready!(self.stack_tx.poll_flush_unpin(cx)) {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(Error::other(format!("flush error: {err}")))),
}
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use std::io::Error;
match ready!(self.stack_tx.poll_close_unpin(cx)) {
Ok(()) => Poll::Ready(Ok(())),
Err(err) => Poll::Ready(Err(Error::other(format!("close error: {err}")))),
}
}
}

View File

@ -0,0 +1,75 @@
//! Regression tests that reproduce the bugs found in the static analysis.
use std::time::Duration;
use etherparse::{IpNumber, Ipv4Header, UdpHeader};
use futures::SinkExt;
use tokio::time::timeout;
use netstack_smoltcp::StackBuilder;
fn make_udp_ipv4(
src_ip: [u8; 4],
src_port: u16,
dst_ip: [u8; 4],
dst_port: u16,
payload: &[u8],
) -> Vec<u8> {
let udp_hdr = UdpHeader::with_ipv4_checksum(
src_port,
dst_port,
&Ipv4Header::new(
(UdpHeader::LEN + payload.len()) as u16,
64,
IpNumber::UDP,
src_ip,
dst_ip,
)
.unwrap(),
payload,
)
.unwrap();
let ip_hdr = Ipv4Header::new(
(UdpHeader::LEN + payload.len()) as u16,
64,
IpNumber::UDP,
src_ip,
dst_ip,
)
.unwrap();
let mut buf = Vec::with_capacity(Ipv4Header::MIN_LEN + UdpHeader::LEN + payload.len());
ip_hdr.write(&mut buf).unwrap();
udp_hdr.write(&mut buf).unwrap();
buf.extend_from_slice(payload);
buf
}
/// before(include) a15e0b72bfc72cb032e67138070da01e325d66f8
/// sink_buf is used in `Stack` to hold a slot for sending any pkt
///
/// the original assumption is that the `poll_ready` -> `start_send` -> `poll_flush`
/// are called sequentially so the slot could be reused and will never get blocked.
///
/// but once the user calls `send_all` on `Stack`, which will not immediate flush the pkt(call `poll_flush`),
/// then `sink_buf` is could be Some(pkt), then it will trigger `Poll::Pending` branch in `Stack::poll_ready`,
/// who did not register the waker correctly, so it will got hanged forever.
#[tokio::test(flavor = "current_thread")]
async fn bug1_poll_ready_waker_registered_via_send_all() {
let (mut stack, _runner, _udp_socket, _tcp) = StackBuilder::default()
.enable_udp(true)
.udp_buffer_size(64)
.stack_buffer_size(64)
.build()
.unwrap();
let pkt1 = make_udp_ipv4([1, 2, 3, 4], 1111, [5, 6, 7, 8], 9999, b"first");
let pkt2 = make_udp_ipv4([1, 2, 3, 4], 1111, [5, 6, 7, 8], 9999, b"second");
let mut stream = futures::stream::iter([Ok(pkt1), Ok(pkt2)]);
let result = timeout(Duration::from_secs(1), stack.send_all(&mut stream)).await;
// should be ok after the fix
assert!(result.is_ok());
}