From e90bfc7510c0449629e000fc803fe006d2687e3d Mon Sep 17 00:00:00 2001 From: ospab Date: Sat, 30 May 2026 21:34:31 +0300 Subject: [PATCH] Patch netstack-smoltcp locally to fix catastrophic UDP tunnel stream crash on invalid packets --- Cargo.lock | 2 - Cargo.toml | 3 + netstack-smoltcp/.cargo-ok | 1 + netstack-smoltcp/.cargo_vcs_info.json | 6 + .../.github/actions/ndk-dev-rs/action.yml | 37 ++ netstack-smoltcp/.github/workflows/main.yml | 80 +++ .../.github/workflows/publish.yml | 15 + netstack-smoltcp/.gitignore | 9 + netstack-smoltcp/Cargo.toml | 136 +++++ netstack-smoltcp/Cargo.toml.orig | 51 ++ netstack-smoltcp/LICENSE-APACHE | 201 +++++++ netstack-smoltcp/LICENSE-MIT | 25 + netstack-smoltcp/README.md | 136 +++++ .../examples/forward-offload-linux.rs | 239 ++++++++ netstack-smoltcp/examples/forward.rs | 326 ++++++++++ netstack-smoltcp/scripts/bench-offload.sh | 174 ++++++ netstack-smoltcp/scripts/route-linux.sh | 26 + netstack-smoltcp/scripts/route-macos.sh | 36 ++ netstack-smoltcp/scripts/route-windows.ps1 | 30 + netstack-smoltcp/src/device.rs | 101 ++++ netstack-smoltcp/src/filter.rs | 56 ++ netstack-smoltcp/src/lib.rs | 22 + netstack-smoltcp/src/packet.rs | 53 ++ netstack-smoltcp/src/runner.rs | 41 ++ netstack-smoltcp/src/stack.rs | 279 +++++++++ netstack-smoltcp/src/tcp.rs | 561 ++++++++++++++++++ netstack-smoltcp/src/udp.rs | 155 +++++ netstack-smoltcp/tests/regression.rs | 75 +++ 28 files changed, 2874 insertions(+), 2 deletions(-) create mode 100644 netstack-smoltcp/.cargo-ok create mode 100644 netstack-smoltcp/.cargo_vcs_info.json create mode 100644 netstack-smoltcp/.github/actions/ndk-dev-rs/action.yml create mode 100644 netstack-smoltcp/.github/workflows/main.yml create mode 100644 netstack-smoltcp/.github/workflows/publish.yml create mode 100644 netstack-smoltcp/.gitignore create mode 100644 netstack-smoltcp/Cargo.toml create mode 100644 netstack-smoltcp/Cargo.toml.orig create mode 100644 netstack-smoltcp/LICENSE-APACHE create mode 100644 
netstack-smoltcp/LICENSE-MIT create mode 100644 netstack-smoltcp/README.md create mode 100644 netstack-smoltcp/examples/forward-offload-linux.rs create mode 100644 netstack-smoltcp/examples/forward.rs create mode 100644 netstack-smoltcp/scripts/bench-offload.sh create mode 100644 netstack-smoltcp/scripts/route-linux.sh create mode 100644 netstack-smoltcp/scripts/route-macos.sh create mode 100644 netstack-smoltcp/scripts/route-windows.ps1 create mode 100644 netstack-smoltcp/src/device.rs create mode 100644 netstack-smoltcp/src/filter.rs create mode 100644 netstack-smoltcp/src/lib.rs create mode 100644 netstack-smoltcp/src/packet.rs create mode 100644 netstack-smoltcp/src/runner.rs create mode 100644 netstack-smoltcp/src/stack.rs create mode 100644 netstack-smoltcp/src/tcp.rs create mode 100644 netstack-smoltcp/src/udp.rs create mode 100644 netstack-smoltcp/tests/regression.rs diff --git a/Cargo.lock b/Cargo.lock index 5012e3c..17a07c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1289,8 +1289,6 @@ dependencies = [ [[package]] name = "netstack-smoltcp" version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "398691cef792b89eb5d29e6ea6b3999def706b908d355e29815ba8101cf5c4c8" dependencies = [ "etherparse", "futures", diff --git a/Cargo.toml b/Cargo.toml index 8a618c3..ed7ec3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,3 +26,6 @@ tracing = "0.1" sha2 = "0.10" hmac = "0.12" portable-atomic = "1.10" + +[patch.crates-io] +netstack-smoltcp = { path = "netstack-smoltcp" } diff --git a/netstack-smoltcp/.cargo-ok b/netstack-smoltcp/.cargo-ok new file mode 100644 index 0000000..5f8b795 --- /dev/null +++ b/netstack-smoltcp/.cargo-ok @@ -0,0 +1 @@ +{"v":1} \ No newline at end of file diff --git a/netstack-smoltcp/.cargo_vcs_info.json b/netstack-smoltcp/.cargo_vcs_info.json new file mode 100644 index 0000000..444d072 --- /dev/null +++ b/netstack-smoltcp/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": 
"702f6dfe124c5e4d343cfd3ca5a3efe0446cf6f0" + }, + "path_in_vcs": "" +} \ No newline at end of file diff --git a/netstack-smoltcp/.github/actions/ndk-dev-rs/action.yml b/netstack-smoltcp/.github/actions/ndk-dev-rs/action.yml new file mode 100644 index 0000000..93b1eea --- /dev/null +++ b/netstack-smoltcp/.github/actions/ndk-dev-rs/action.yml @@ -0,0 +1,37 @@ +name: Setup Android NDK and Rust compiler ENV +description: Setup an Android_NDK_HOME environment by downloading and Rust compiler environment. +inputs: + rust-target: + description: Rust target to build + required: true + sdk-version: + description: Exact SDK version to use + default: "33" + ndk-version: + description: Exact NDK version to use + default: "25" + ndk-platform: + description: Which host platform to use + default: "linux" +runs: + using: "composite" + steps: + - name: Download Android NDK + run: curl --http1.1 -O https://dl.google.com/android/repository/android-ndk-r${{ inputs.ndk-version }}-${{ inputs.ndk-platform }}.zip + shell: bash + - name: Extract Android NDK + run: unzip -q android-ndk-r${{ inputs.ndk-version }}-${{ inputs.ndk-platform }}.zip + shell: bash + - name: Set Rust compiler ENV + run: | + ndk_home=${{ github.workspace }}/android-ndk-r${{ inputs.ndk-version }} + platform=$(ls ${ndk_home}/toolchains/llvm/prebuilt/ | head -1) + ndk_tool=${ndk_home}/toolchains/llvm/prebuilt/${platform}/bin + envvar_suffix=$(echo ${{ inputs.rust-target }} | sed "s/-/_/g") + upper_suffix=$(echo ${envvar_suffix} | tr '[:lower:]' '[:upper:]') + tool_prefix=${{ inputs.rust-target }}${{ inputs.sdk-version }} + echo "ANDROID_NDK_HOME=${ndk_home}" >> $GITHUB_ENV + echo "CC_${envvar_suffix}=${ndk_tool}/${tool_prefix}-clang" >> $GITHUB_ENV + echo "AR_${envvar_suffix}=${ndk_tool}/llvm-ar" >> $GITHUB_ENV + echo "CARGO_TARGET_${upper_suffix}_LINKER=${ndk_tool}/${tool_prefix}-clang" >> $GITHUB_ENV + shell: bash diff --git a/netstack-smoltcp/.github/workflows/main.yml b/netstack-smoltcp/.github/workflows/main.yml 
new file mode 100644 index 0000000..02a9d3c --- /dev/null +++ b/netstack-smoltcp/.github/workflows/main.yml @@ -0,0 +1,80 @@ +name: CI + +on: + push: + branches: + - '**' + pull_request: + branches: + - '**' + +env: + CARGO_INCREMENTAL: 0 + CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse + +jobs: + test: + name: Test + runs-on: ${{ matrix.os }} + strategy: + matrix: + include: + - build: linux-amd64 + os: ubuntu-latest + target: x86_64-unknown-linux-gnu + - build: android-arm64 + os: ubuntu-latest + target: aarch64-linux-android + no_run: --no-run + - build: android-amd64 + os: ubuntu-latest + target: x86_64-linux-android + no_run: --no-run + - build: macos-amd64 + os: macos-latest + target: x86_64-apple-darwin + - build: macos-arm64 + os: macos-14 + target: aarch64-apple-darwin + - build: ios-arm64 + os: macos-latest + target: aarch64-apple-ios + no_run: --no-run + - build: windows-amd64 + os: windows-latest + target: x86_64-pc-windows-msvc + - build: windows-arm64 + os: windows-latest + target: aarch64-pc-windows-msvc + no_run: --no-run + steps: + - uses: actions/checkout@v4 + - name: Install Rust (rustup) + run: | + set -euxo pipefail + rustup toolchain install stable --no-self-update --profile minimal --target ${{ matrix.target }} + rustup default stable + shell: bash + - uses: Swatinem/rust-cache@v2 + - name: Setup android environment + if: contains(matrix.build, 'android') + uses: ./.github/actions/ndk-dev-rs + with: + rust-target: ${{ matrix.target }} + - run: cargo test ${{ matrix.no_run }} --workspace --target ${{ matrix.target }} + - run: cargo test ${{ matrix.no_run }} --workspace --target ${{ matrix.target }} --release + + msrv_n_clippy: + name: MSRV & Clippy & Rustfmt + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + steps: + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@stable + - run: cargo fmt -- --check + - run: cargo clippy --all-features -- -D warnings + - run: 
cargo check --lib -p netstack-smoltcp + - run: cargo check --lib -p netstack-smoltcp --all-features diff --git a/netstack-smoltcp/.github/workflows/publish.yml b/netstack-smoltcp/.github/workflows/publish.yml new file mode 100644 index 0000000..9097444 --- /dev/null +++ b/netstack-smoltcp/.github/workflows/publish.yml @@ -0,0 +1,15 @@ +on: + push: + tags: + - '*' + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Publish to crates.io + run: | + cargo publish + env: + CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }} diff --git a/netstack-smoltcp/.gitignore b/netstack-smoltcp/.gitignore new file mode 100644 index 0000000..154e981 --- /dev/null +++ b/netstack-smoltcp/.gitignore @@ -0,0 +1,9 @@ +/target +/Cargo.lock + +.idea +.VSCodeCounter/ +.vscode +.DS_Store +*.iml +**/*.log diff --git a/netstack-smoltcp/Cargo.toml b/netstack-smoltcp/Cargo.toml new file mode 100644 index 0000000..587bb65 --- /dev/null +++ b/netstack-smoltcp/Cargo.toml @@ -0,0 +1,136 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2021" +rust-version = "1.75.0" +name = "netstack-smoltcp" +version = "0.2.2" +authors = ["cavivie "] +build = false +autolib = false +autobins = false +autoexamples = false +autotests = false +autobenches = false +description = """ +A netstack for the special purpose of turning packets from/to a TUN interface +into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack. 
+""" +homepage = "https://github.com/cavivie/netstack-smoltcp" +documentation = "https://docs.rs/netstack-smoltcp" +readme = "README.md" +keywords = [ + "netstack", + "smoltcp", + "network", + "ip", + "tun", +] +categories = ["network-programming"] +license = "MIT OR Apache-2.0" +repository = "https://github.com/cavivie/netstack-smoltcp" + +[lib] +name = "netstack_smoltcp" +path = "src/lib.rs" + +[[example]] +name = "forward" +path = "examples/forward.rs" + +[[example]] +name = "forward-offload-linux" +path = "examples/forward-offload-linux.rs" + +[[test]] +name = "regression" +path = "tests/regression.rs" + +[dependencies.etherparse] +version = "0.16" + +[dependencies.futures] +version = "0.3" + +[dependencies.rand] +version = "0.8" + +[dependencies.smoltcp] +version = "0.12" +features = [ + "std", + "log", + "medium-ip", + "proto-ipv4", + "proto-ipv6", + "socket-icmp", + "socket-udp", + "socket-tcp", +] +default-features = false + +[dependencies.spin] +version = "0.9" + +[dependencies.tokio] +version = "1" +features = [ + "sync", + "time", + "rt", + "macros", +] + +[dependencies.tokio-util] +version = "0.7.10" + +[dependencies.tracing] +version = "0.1" +features = ["std"] +default-features = false + +[dev-dependencies.socket2] +version = "0.5.6" + +[dev-dependencies.socket2-ext] +version = "0.1" + +[dev-dependencies.structopt] +version = "0.3" + +[dev-dependencies.tokio] +version = "1" +features = [ + "rt", + "macros", + "rt-multi-thread", + "io-util", +] + +[dev-dependencies.tracing] +version = "0.1" +features = ["std"] +default-features = false + +[dev-dependencies.tracing-subscriber] +version = "0.3.18" + +[dev-dependencies.tun-rs] +version = "2" +features = [ + "async", + "async_framed", +] + +[dev-dependencies.tun2] +version = "3" +features = ["async"] diff --git a/netstack-smoltcp/Cargo.toml.orig b/netstack-smoltcp/Cargo.toml.orig new file mode 100644 index 0000000..627b82f --- /dev/null +++ b/netstack-smoltcp/Cargo.toml.orig @@ -0,0 +1,51 @@ +[package] 
+name = "netstack-smoltcp" +version = "0.2.2" +edition = "2021" +authors = ["cavivie "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/cavivie/netstack-smoltcp" +homepage = "https://github.com/cavivie/netstack-smoltcp" +documentation = "https://docs.rs/netstack-smoltcp" +keywords = ["netstack", "smoltcp", "network", "ip", "tun"] +categories = ["network-programming"] +description = """ +A netstack for the special purpose of turning packets from/to a TUN interface +into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack. +""" +rust-version = "1.75.0" + +[dependencies] +tracing = { version = "0.1", default-features = false, features = ["std"] } +tokio = { version = "1", features = ["sync", "time", "rt", "macros"] } +tokio-util = "0.7.10" +etherparse = "0.16" +futures = "0.3" +rand = "0.8" +spin = "0.9" +smoltcp = { version = "0.12", default-features = false, features = [ + "std", + "log", + "medium-ip", + "proto-ipv4", + "proto-ipv6", + "socket-icmp", + "socket-udp", + "socket-tcp", +] } + +[dev-dependencies] +tun2 = { version = "3", features = ["async"] } +# has better performance on linux than tun2 +tun-rs = { version = "2", features = ["async", "async_framed"] } +tokio = { version = "1", features = [ + "rt", + "macros", + "rt-multi-thread", + "io-util", +] } +tracing = { version = "0.1", default-features = false, features = ["std"] } +tracing-subscriber = "0.3.18" +structopt = "0.3" +socket2 = "0.5.6" +socket2-ext = { version = "0.1" } diff --git a/netstack-smoltcp/LICENSE-APACHE b/netstack-smoltcp/LICENSE-APACHE new file mode 100644 index 0000000..f49a4e1 --- /dev/null +++ b/netstack-smoltcp/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
\ No newline at end of file diff --git a/netstack-smoltcp/LICENSE-MIT b/netstack-smoltcp/LICENSE-MIT new file mode 100644 index 0000000..ede7ee9 --- /dev/null +++ b/netstack-smoltcp/LICENSE-MIT @@ -0,0 +1,25 @@ +Copyright (c) 2024 cavivie and netstack-smoltcp Contributors + +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/netstack-smoltcp/README.md b/netstack-smoltcp/README.md new file mode 100644 index 0000000..178e1be --- /dev/null +++ b/netstack-smoltcp/README.md @@ -0,0 +1,136 @@ +# Netstack Smoltcp + +A netstack for the special purpose of turning packets from/to a TUN interface into TCP streams and UDP packets. It uses smoltcp-rs as the backend netstack. 
+ +[![Crates.io][crates-badge]][crates-url] +[![MIT licensed][mit-badge]][mit-url] +[![Apache licensed, Version 2.0][apache-badge]][apache-url] +[![Build Status][actions-badge]][actions-url] + +[crates-badge]: https://img.shields.io/crates/v/netstack-smoltcp.svg +[crates-url]: https://crates.io/crates/netstack-smoltcp +[mit-badge]: https://img.shields.io/badge/license-MIT-blue.svg +[mit-url]: https://github.com/automesh-network/netstack-smoltcp/blob/master/LICENSE-MIT +[apache-badge]: https://img.shields.io/badge/license-APACHE2.0-blue.svg +[apache-url]: https://github.com/automesh-network/netstack-smoltcp/blob/master/LICENSE-APACHE +[actions-badge]: https://github.com/automesh-network/netstack-smoltcp/workflows/CI/badge.svg +[actions-url]: https://github.com/automesh-network/netstack-smoltcp/actions?query=workflow%3ACI+branch%3Amain + +## Features + +- Supports both Send and non-Send futures; most people use Send. +- Supports the ICMP protocol, driven by the TCP runner, to enable ICMP ping. +- Supports filtering packets by source and destination IP addresses. +- Can read IP packets from netstack, write IP packets to netstack. +- Can receive TcpStream from TcpListener exposed from netstack. +- Can receive UDP datagram from UdpSocket exposed from netstack. +- Implements popular future streaming traits and asynchronous IO traits: + * TcpListener implements futures Stream/Sink trait + * TcpStream implements tokio AsyncRead/AsyncWrite trait + * UdpSocket(ReadHalf/WriteHalf) implements futures Stream/Sink trait. + +## Platforms + +This crate provides lightweight netstack support for Linux, iOS, macOS, Android and Windows. 
+Currently, it works on most targets, but mainly tested the popular platforms which includes: +- linux-amd64: x86_64-unknown-linux-gnu +- android-arm64: aarch64-linux-android +- android-amd64: x86_64-linux-android +- macos-amd64: x86_64-apple-darwin +- macos-arm64: aarch64-apple-darwin +- ios-arm64: aarch64-apple-ios +- windows-amd64: x86_64-pc-windows-msvc +- windows-arm64: aarch64-pc-windows-msvc + +## Example + +```rust +// let device = tun2::create_as_async(&cfg)?; +// let framed = device.into_framed(); + +let (stack, runner, udp_socket, tcp_listener) = netstack_smoltcp::StackBuilder::default() + .stack_buffer_size(512) + .tcp_buffer_size(4096) + .enable_udp(true) + .enable_tcp(true) + .enable_icmp(true) + .mtu(9000) // virtual device usually benefits from larger MTU + .build() + .unwrap(); +let mut udp_socket = udp_socket.unwrap(); // udp enabled +let mut tcp_listener = tcp_listener.unwrap(); // tcp/icmp enabled +if let Some(runner) = runner { + tokio::spawn(runner); +} + +let (mut stack_sink, mut stack_stream) = stack.split(); +let (mut tun_sink, mut tun_stream) = framed.split(); + +// Reads packet from stack and sends to TUN. +tokio::spawn(async move { + while let Some(pkt) = stack_stream.next().await { + if let Ok(pkt) = pkt { + tun_sink.send(pkt).await.unwrap(); + } + } +}); + +// Reads packet from TUN and sends to stack. +tokio::spawn(async move { + while let Some(pkt) = tun_stream.next().await { + if let Ok(pkt) = pkt { + stack_sink.send(pkt).await.unwrap(); + } + } +}); + +// Extracts TCP connections from stack and sends them to the dispatcher. +tokio::spawn(async move { + handle_inbound_stream(tcp_listener).await; +}); + +// Receive and send UDP packets between netstack and NAT manager. The NAT +// manager would maintain UDP sessions and send them to the dispatcher. 
+tokio::spawn(async move { + handle_inbound_datagram(udp_socket).await; +}); +``` + +## Performance + +Typically, `netstack-smoltcp` will be used with a TUN device, so a careful choice of TUN crate matters. + +[tun-rs](https://github.com/tun-rs/tun-rs) has better performance on **Linux** than [rust-tun](https://github.com/meh/rust-tun/) due to GSO/GRO, which allow you to process the packets in batches. + +Running `bash scripts/bench-offload.sh` shows that `tun-rs` boosts the performance by 4x. Try it out on your Linux machine! + +The example for using `tun-rs` with `netstack-smoltcp` can be found at [forward-offload-linux.rs](examples/forward-offload-linux.rs) + +For further tuning, refer to `tun-rs`'s detailed [README](https://github.com/tun-rs/tun-rs/blob/main/README.md) + +## License + +This project is licensed under either of + + * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or + https://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or + https://opensource.org/licenses/MIT) + +at your option. + +### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in netstack-smoltcp by you, as defined in the Apache-2.0 license, +shall be dual licensed as above, without any additional terms or conditions. 
+ +## Inspired By + +Special thanks to these amazing projects that inspired netstack-smoltcp (in no particular order): +- [shadowsocks-rust](https://github.com/shadowsocks/shadowsocks-rust/) +- [netstack-lwip](https://github.com/eycorsican/netstack-lwip/) +- [rust-tun-active](https://github.com/tun2proxy/rust-tun) +- [rust-tun](https://github.com/meh/rust-tun/) +- [tun-rs](https://github.com/tun-rs/tun-rs) +- [smoltcp](https://github.com/smoltcp-rs/smoltcp) \ No newline at end of file diff --git a/netstack-smoltcp/examples/forward-offload-linux.rs b/netstack-smoltcp/examples/forward-offload-linux.rs new file mode 100644 index 0000000..573ba53 --- /dev/null +++ b/netstack-smoltcp/examples/forward-offload-linux.rs @@ -0,0 +1,239 @@ +#[cfg(target_os = "linux")] +mod inner { + use futures::{SinkExt, StreamExt}; + use netstack_smoltcp::{StackBuilder, TcpListener, UdpSocket}; + use std::{net::SocketAddr, sync::Arc}; + use structopt::StructOpt; + use tokio::net::{TcpSocket, TcpStream}; + use tracing::{error, info, warn}; + use tun_rs::{DeviceBuilder, IDEAL_BATCH_SIZE, VIRTIO_NET_HDR_LEN}; + + // Patched forward example: tun2 → tun-rs with Linux GRO/GSO offload. + // For further reading, check out https://blog.cloudflare.com/virtual-networking-101-understanding-tap + // + // Key changes vs forward.rs: + // 1. Use tun-rs DeviceBuilder with .offload(true) on Linux (enables + // IFF_VNET_HDR + TUN_F_CSUM/TSO4/TSO6/USO4/USO6). + // 2. TX (stack → TUN): prepend 10-byte zero virtio_net_hdr (GSO_NONE) + // so the kernel accepts the write when IFF_VNET_HDR is set. + // 3. RX (TUN → stack): use recv_multiple() for batch GSO splitting; + // buffers sized to 1600 to fit smoltcp's 1504-byte MTU segments. + #[derive(Debug, StructOpt)] + #[structopt(name = "forward", about = "Simply forward tun tcp/udp traffic.")] + struct Opt { + /// Outbound interface to bind forwarded connections to. + #[structopt(short = "i", long = "interface")] + interface: String, + /// Name of the TUN device. 
+ #[structopt(short = "n", long = "name", default_value = "utun8")] + name: String, + /// Tracing log level. + #[structopt(long = "log-level", default_value = "debug")] + log_level: tracing::Level, + /// Use current-thread Tokio runtime (default: multi-thread). + #[structopt(long = "current-thread")] + current_thread: bool, + /// Use spawn_local instead of spawn. + #[structopt(long = "local-task")] + local_task: bool, + } + + pub(super) fn main() { + let opt = Opt::from_args(); + let rt = if opt.current_thread { + tokio::runtime::Builder::new_current_thread() + } else { + tokio::runtime::Builder::new_multi_thread() + } + .enable_all() + .build() + .unwrap(); + rt.block_on(main_exec(opt)); + } + + async fn main_exec(opt: Opt) { + macro_rules! tokio_spawn { + ($fut:expr) => { + if opt.local_task { + tokio::task::spawn_local($fut) + } else { + tokio::task::spawn($fut) + } + }; + } + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(opt.log_level) + .finish(), + ) + .unwrap(); + + // Build TUN device with GRO/GSO offload on Linux. + let builder = DeviceBuilder::new() + .name(opt.name) + .ipv4("10.10.10.2", 24, Some("10.10.10.1")) + .mtu(9000); + let builder = builder.offload(true); + let dev = Arc::new(builder.build_async().unwrap()); + + let (stack, runner, udp_socket, tcp_listener) = StackBuilder::default() + .enable_tcp(true) + .enable_udp(true) + .enable_icmp(true) + .build() + .unwrap(); + let udp_socket = udp_socket.unwrap(); + let tcp_listener = tcp_listener.unwrap(); + if let Some(runner) = runner { + tokio_spawn!(runner); + } + let (mut stack_sink, mut stack_stream) = stack.split(); + + let mut futs = vec![]; + + // stack → TUN + // With IFF_VNET_HDR every write must start with a virtio_net_hdr. + // We use all-zero (gso_type = GSO_NONE, flags = 0): plain packet, + // checksum already valid (smoltcp always computes checksums itself). 
+ let dev1 = dev.clone(); + futs.push(tokio_spawn!(async move { + while let Some(pkt) = stack_stream.next().await { + if let Ok(pkt) = pkt { + let result = { + let mut buf = vec![0u8; VIRTIO_NET_HDR_LEN + pkt.len()]; + buf[VIRTIO_NET_HDR_LEN..].copy_from_slice(&pkt); + dev1.send(&buf).await + }; + if let Err(e) = result { + warn!("failed to send packet to TUN: {:?}", e); + } + } + } + })); + + // TUN → stack + // recv_multiple() does one read() syscall and returns N individual IP + // packets after splitting any incoming GRO super-packet. + // Buffer size 1600 > smoltcp MTU (1504) to avoid an out-of-bounds panic + // when the kernel segments at MSS=1464 with 40-byte IP+TCP headers. + futs.push(tokio_spawn!(async move { + let mut orig = vec![0u8; VIRTIO_NET_HDR_LEN + 65535]; + let mut bufs = vec![vec![0u8; 1600]; IDEAL_BATCH_SIZE]; + let mut sizes = vec![0usize; IDEAL_BATCH_SIZE]; + while let Ok(n) = dev.recv_multiple(&mut orig, &mut bufs, &mut sizes, 0).await { + for i in 0..n { + let pkt = &bufs[i][..sizes[i]]; + if let Err(e) = stack_sink.send(pkt.to_vec()).await { + warn!("failed to send packet to stack: {:?}", e); + } + } + } + })); + + futs.push(tokio_spawn!({ + let iface = opt.interface.clone(); + async move { + handle_inbound_stream(tcp_listener, iface).await; + } + })); + + futs.push(tokio_spawn!(async move { + handle_inbound_datagram(udp_socket, opt.interface).await; + })); + + futures::future::join_all(futs).await.iter().for_each(|r| { + if let Err(e) = r { + error!("{:?}", e); + } + }); + } + + async fn handle_inbound_stream(mut tcp_listener: TcpListener, interface: String) { + while let Some((mut stream, local, remote)) = tcp_listener.next().await { + let interface = interface.clone(); + tokio::spawn(async move { + info!("tcp: {:?} => {:?}", local, remote); + match new_tcp_stream(remote, &interface).await { + Ok(mut r) => { + if let Err(e) = tokio::io::copy_bidirectional(&mut stream, &mut r).await { + warn!( + "failed to copy tcp stream {:?}=>{:?}: 
{:?}", + local, remote, e + ); + } + } + Err(e) => warn!( + "failed to open tcp stream {:?}=>{:?}: {:?}", + local, remote, e + ), + } + }); + } + } + + async fn handle_inbound_datagram(udp_socket: UdpSocket, interface: String) { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (mut read_half, mut write_half) = udp_socket.split(); + tokio::spawn(async move { + while let Some((data, local, remote)) = rx.recv().await { + let _ = write_half.send((data, remote, local)).await; + } + }); + while let Some((data, local, remote)) = read_half.next().await { + let tx = tx.clone(); + let interface = interface.clone(); + tokio::spawn(async move { + match new_udp_packet(remote, &interface).await { + Ok(sock) => { + let _ = sock.send(&data).await; + loop { + let mut buf = vec![0; 1024]; + match sock.recv_from(&mut buf).await { + Ok((n, _)) => { + let _ = tx.send((buf[..n].to_vec(), local, remote)); + } + Err(e) => { + warn!("udp recv {:?}: {:?}", remote, e); + break; + } + } + } + } + Err(e) => warn!("failed to open udp socket {:?}: {:?}", remote, e), + } + }); + } + } + + async fn new_tcp_stream(addr: SocketAddr, iface: &str) -> std::io::Result { + use socket2_ext::{AddressBinding, BindDeviceOption}; + let s = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; + s.bind_to_device(BindDeviceOption::v4(iface))?; + s.set_keepalive(true)?; + s.set_nodelay(true)?; + s.set_nonblocking(true)?; + Ok(TcpSocket::from_std_stream(s.into()).connect(addr).await?) 
+ } + + async fn new_udp_packet( + addr: SocketAddr, + iface: &str, + ) -> std::io::Result { + use socket2_ext::{AddressBinding, BindDeviceOption}; + let s = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)?; + s.bind_to_device(BindDeviceOption::v4(iface))?; + s.set_nonblocking(true)?; + let sock = tokio::net::UdpSocket::from_std(s.into())?; + sock.connect(addr).await?; + Ok(sock) + } +} + +#[cfg(not(target_os = "linux"))] +mod inner { + pub(super) fn main() {} +} + +fn main() { + inner::main(); +} diff --git a/netstack-smoltcp/examples/forward.rs b/netstack-smoltcp/examples/forward.rs new file mode 100644 index 0000000..d9a79d4 --- /dev/null +++ b/netstack-smoltcp/examples/forward.rs @@ -0,0 +1,326 @@ +use std::net::{IpAddr, SocketAddr}; + +use futures::{SinkExt, StreamExt}; +use netstack_smoltcp::{StackBuilder, TcpListener, UdpSocket}; +use structopt::StructOpt; +use tokio::net::{TcpSocket, TcpStream}; +use tracing::{error, info, warn}; + +// to run this example, you should set the policy routing **after the start of the main program** +// +// linux: +// with bind device: +// `curl 1.1.1.1 --interface utun8` +// with default route: +// `bash scripts/route-linux.sh add` +// `curl 1.1.1.1` +// with single route: +// `ip rule add to 1.1.1.1 table 200` +// `ip route add default dev utun8 table 200` +// `curl 1.1.1.1` +// +// macos: +// with default route: +// `bash scripts/route-macos.sh add` +// `curl 1.1.1.1` +// +// windows: +// with default route: +// tun2 set default route automatically, won't set agian +// # `powershell.exe scripts/route-windows.ps1 add` +// `curl 1.1.1.1` +// +// currently, the example only supports the TCP stream, and the UDP packet will be dropped. + +#[derive(Debug, StructOpt)] +#[structopt(name = "forward", about = "Simply forward tun tcp/udp traffic.")] +struct Opt { + /// Default binding interface, default by guessed. + /// Specify but doesn't exist, no device is bound. 
+ #[structopt(short = "i", long = "interface")] + interface: String, + + /// name of the tun device, default to rtun8. + #[structopt(short = "n", long = "name", default_value = "utun8")] + name: String, + + /// Tracing subscriber log level. + #[structopt(long = "log-level", default_value = "debug")] + log_level: tracing::Level, + + /// Tokio current-thread runtime, default to multi-thread. + #[structopt(long = "current-thread")] + current_thread: bool, + + /// Tokio task spawn_local, default to spwan. + #[structopt(long = "local-task")] + local_task: bool, +} + +fn main() { + let opt = Opt::from_args(); + + let rt = if opt.current_thread { + tokio::runtime::Builder::new_current_thread() + } else { + tokio::runtime::Builder::new_multi_thread() + } + .enable_all() + .build() + .unwrap(); + + rt.block_on(main_exec(opt)); +} + +async fn main_exec(opt: Opt) { + macro_rules! tokio_spawn { + ($fut: expr) => { + if opt.local_task { + tokio::task::spawn_local($fut) + } else { + tokio::task::spawn($fut) + } + }; + } + + tracing::subscriber::set_global_default( + tracing_subscriber::FmtSubscriber::builder() + .with_max_level(opt.log_level) + .finish(), + ) + .unwrap(); + + let mut cfg = tun2::Configuration::default(); + cfg.layer(tun2::Layer::L3); + let fd = -1; + if fd >= 0 { + cfg.raw_fd(fd); + } else { + cfg.tun_name(&opt.name) + .address("10.10.10.2") + .destination("10.10.10.1") + .mtu(tun2::DEFAULT_MTU); + #[cfg(not(any(target_arch = "mips", target_arch = "mips64",)))] + { + cfg.netmask("255.255.255.0"); + } + cfg.up(); + } + + let device = tun2::create_as_async(&cfg).unwrap(); + let mut builder = StackBuilder::default() + .enable_tcp(true) + .enable_udp(true) + .enable_icmp(true) + .mtu(9000); + if let Some(device_broadcast) = get_device_broadcast(&device) { + builder = builder + // .add_ip_filter(Box::new(move |src, dst| *src != device_broadcast && *dst != device_broadcast)); + .add_ip_filter_fn(move |src, dst| *src != device_broadcast && *dst != device_broadcast); + 
} + + let (stack, runner, udp_socket, tcp_listener) = builder.build().unwrap(); + let udp_socket = udp_socket.unwrap(); // udp enabled + let tcp_listener = tcp_listener.unwrap(); // tcp enabled or icmp enabled + + if let Some(runner) = runner { + tokio_spawn!(runner); + } + + let framed = device.into_framed(); + let (mut tun_sink, mut tun_stream) = framed.split(); + let (mut stack_sink, mut stack_stream) = stack.split(); + + let mut futs = vec![]; + + // Reads packet from stack and sends to TUN. + futs.push(tokio_spawn!(async move { + while let Some(pkt) = stack_stream.next().await { + if let Ok(pkt) = pkt { + match tun_sink.send(pkt).await { + Ok(_) => {} + Err(e) => warn!("failed to send packet to TUN, err: {:?}", e), + } + } + } + })); + + // Reads packet from TUN and sends to stack. + futs.push(tokio_spawn!(async move { + while let Some(pkt) = tun_stream.next().await { + if let Ok(pkt) = pkt { + match stack_sink.send(pkt).await { + Ok(_) => {} + Err(e) => warn!("failed to send packet to stack, err: {:?}", e), + }; + } + } + })); + + // Extracts TCP connections from stack and sends them to the dispatcher. + futs.push(tokio_spawn!({ + let interface = opt.interface.clone(); + async move { + handle_inbound_stream(tcp_listener, interface).await; + } + })); + + // Receive and send UDP packets between netstack and NAT manager. The NAT + // manager would maintain UDP sessions and send them to the dispatcher. 
+ futs.push(tokio_spawn!(async move { + handle_inbound_datagram(udp_socket, opt.interface).await; + })); + + futures::future::join_all(futs) + .await + .iter() + .for_each(|res| { + if let Err(e) = res { + error!("error: {:?}", e); + } + }); +} + +/// simply forward tcp stream +async fn handle_inbound_stream(mut tcp_listener: TcpListener, interface: String) { + while let Some((mut stream, local, remote)) = tcp_listener.next().await { + let interface = interface.clone(); + tokio::spawn(async move { + info!("new tcp connection: {:?} => {:?}", local, remote); + match new_tcp_stream(remote, &interface).await { + Ok(mut remote_stream) => { + // pipe between two tcp stream + match tokio::io::copy_bidirectional(&mut stream, &mut remote_stream).await { + Ok(_) => {} + Err(e) => warn!( + "failed to copy tcp stream {:?}=>{:?}, err: {:?}", + local, remote, e + ), + } + } + Err(e) => warn!( + "failed to new tcp stream {:?}=>{:?}, err: {:?}", + local, remote, e + ), + } + }); + } +} + +/// simply forward udp datagram +async fn handle_inbound_datagram(udp_socket: UdpSocket, interface: String) { + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (mut read_half, mut write_half) = udp_socket.split(); + tokio::spawn(async move { + while let Some((data, local, remote)) = rx.recv().await { + let _ = write_half.send((data, remote, local)).await; + } + }); + + while let Some((data, local, remote)) = read_half.next().await { + let tx = tx.clone(); + let interface = interface.clone(); + tokio::spawn(async move { + info!("new udp datagram: {:?} => {:?}", local, remote); + match new_udp_packet(remote, &interface).await { + Ok(remote_socket) => { + // pipe between two udp sockets + let _ = remote_socket.send(&data).await; + loop { + let mut buf = vec![0; 1024]; + match remote_socket.recv_from(&mut buf).await { + Ok((len, _)) => { + let _ = tx.send((buf[..len].to_vec(), local, remote)); + } + Err(e) => { + warn!( + "failed to recv udp datagram {:?}<->{:?}: {:?}", + local, 
remote, e + ); + break; + } + } + } + } + Err(e) => warn!( + "failed to new udp socket {:?}=>{:?}, err: {:?}", + local, remote, e + ), + } + }); + } +} + +async fn new_tcp_stream<'a>(addr: SocketAddr, iface: &str) -> std::io::Result { + use socket2_ext::{AddressBinding, BindDeviceOption}; + let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; + socket.bind_to_device(BindDeviceOption::v4(iface))?; + socket.set_keepalive(true)?; + socket.set_nodelay(true)?; + socket.set_nonblocking(true)?; + + let stream = TcpSocket::from_std_stream(socket.into()) + .connect(addr) + .await?; + + Ok(stream) +} + +async fn new_udp_packet(addr: SocketAddr, iface: &str) -> std::io::Result { + use socket2_ext::{AddressBinding, BindDeviceOption}; + let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::DGRAM, None)?; + socket.bind_to_device(BindDeviceOption::v4(iface))?; + socket.set_nonblocking(true)?; + + let socket = tokio::net::UdpSocket::from_std(socket.into()); + if let Ok(ref socket) = socket { + socket.connect(addr).await?; + } + socket +} + +fn get_device_broadcast(device: &tun2::AsyncDevice) -> Option { + use tun2::AbstractDevice; + + let mtu = device.mtu().unwrap_or(tun2::DEFAULT_MTU); + + let address = match device.address() { + Ok(a) => match a { + IpAddr::V4(v4) => v4, + IpAddr::V6(_) => return None, + }, + Err(_) => return None, + }; + + let netmask = match device.netmask() { + Ok(n) => match n { + IpAddr::V4(v4) => v4, + IpAddr::V6(_) => return None, + }, + Err(_) => return None, + }; + + match smoltcp::wire::Ipv4Cidr::from_netmask(address, netmask) { + Ok(address_net) => match address_net.broadcast() { + Some(broadcast) => { + info!( + "tun device network: {} (address: {}, netmask: {}, broadcast: {}, mtu: {})", + address_net, address, netmask, broadcast, mtu, + ); + + Some(broadcast) + } + None => { + error!("invalid tun address {}, netmask {}", address, netmask); + None + } + }, + Err(err) => { + error!( + "invalid tun 
address {}, netmask {}, error: {}", + address, netmask, err + ); + None + } + } +} diff --git a/netstack-smoltcp/scripts/bench-offload.sh b/netstack-smoltcp/scripts/bench-offload.sh new file mode 100644 index 0000000..09bd837 --- /dev/null +++ b/netstack-smoltcp/scripts/bench-offload.sh @@ -0,0 +1,174 @@ +#!/usr/bin/env bash +# bench-offload.sh +# +# Benchmarks netstack-smoltcp's forward examples with 2-stream iperf3. +# Compares: +# - examples/forward (tun2, no GRO/GSO offload) +# - examples/forward-offload-linux (tun-rs, Linux GRO/GSO offload via IFF_VNET_HDR) +# +# Setup: creates a veth pair + network namespace; iperf3 server runs inside +# the namespace, the forward proxy bridges traffic through a TUN device. +# +# Requirements: cargo, iperf3, ip (iproute2), root/CAP_NET_ADMIN +# +# Usage: +# sudo bash scripts/bench-offload.sh +# +# Run from the root of the netstack-smoltcp repository. + +set -euo pipefail + +# ── config ──────────────────────────────────────────────────────────────────── +REPO_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" +NS=bench +VETH_HOST=veth-host +VETH_NS=veth-bench +HOST_IP=172.19.0.1 +NS_IP=172.19.0.2 +PREFIX=24 +TUN_NAME=utun8 +TUN_IP=10.10.10.2 +IPERF_PORT=5201 +DURATION=15 +STREAMS=2 + +# ── helpers ─────────────────────────────────────────────────────────────────── +die() { echo "ERROR: $*" >&2; exit 1; } +require() { command -v "$1" &>/dev/null || die "'$1' not found"; } + +cleanup() { + pkill -f "forward-" 2>/dev/null || true + ip netns exec "$NS" pkill iperf3 2>/dev/null || true + ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true + ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true + ip link del "$VETH_HOST" 2>/dev/null || true + ip netns del "$NS" 2>/dev/null || true +} +trap cleanup EXIT + +# ── preflight ───────────────────────────────────────────────────────────────── +require cargo +require iperf3 +require ip +[[ $EUID -eq 0 ]] || die "run as root (needs CAP_NET_ADMIN for TUN + netns)" +[[ -f "$REPO_DIR/Cargo.toml" ]] || die "run from the netstack-smoltcp repo root" +grep -q 'name = "netstack-smoltcp"' "$REPO_DIR/Cargo.toml" \ + || die "Cargo.toml does not look like netstack-smoltcp" + +# ── network setup ───────────────────────────────────────────────────────────── +echo "[net] setting up namespace '$NS' and veth pair..." +cleanup 2>/dev/null || true +sleep 0.5 + +ip netns add "$NS" +ip link add "$VETH_HOST" type veth peer name "$VETH_NS" +ip link set "$VETH_NS" netns "$NS" +ip addr add "${HOST_IP}/${PREFIX}" dev "$VETH_HOST" +ip link set "$VETH_HOST" up +ip netns exec "$NS" ip addr add "${NS_IP}/${PREFIX}" dev "$VETH_NS" +ip netns exec "$NS" ip link set "$VETH_NS" up +ip netns exec "$NS" ip link set lo up +echo "[net] ${HOST_IP} <──veth──> ${NS_IP} (ns:${NS})" + +# ── build: forward (tun2, no offload) ──────────────────────────────────────── +echo "" +echo "[build] examples/forward (tun2, no GRO/GSO offload)..." 
+( + cd "$REPO_DIR" + cargo build --example forward --release --quiet + cp target/release/examples/forward /tmp/forward-tun2 +) +echo "[build] done → /tmp/forward-tun2" + +# ── build: forward-offload-linux (tun-rs, GRO/GSO offload) ─────────────────── +echo "" +echo "[build] examples/forward-offload-linux (tun-rs, GRO/GSO offload)..." +( + cd "$REPO_DIR" + cargo build --example forward-offload-linux --release --quiet + cp target/release/examples/forward-offload-linux /tmp/forward-tun-rs +) +echo "[build] done → /tmp/forward-tun-rs" + +# ── benchmark runner ────────────────────────────────────────────────────────── +run_bench() { + local label="$1" binary="$2" + + # clean any leftover state + pkill -f "forward-" 2>/dev/null || true + ip netns exec "$NS" pkill iperf3 2>/dev/null || true + ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true + ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true + sleep 0.8 + + # start iperf3 server inside namespace + ip netns exec "$NS" iperf3 -s -p "$IPERF_PORT" -D \ + --logfile /tmp/iperf3-bench-server.log + + # start proxy + "$binary" -i "$VETH_HOST" -n "$TUN_NAME" --log-level warn & + sleep 2 + + ip link show "$TUN_NAME" &>/dev/null \ + || { echo " [!] 
TUN not up, skipping"; return 1; } + + # route iperf3 traffic through TUN (more-specific /32 overrides /24 via veth) + ip route add "${NS_IP}/32" dev "$TUN_NAME" + + echo " running iperf3: ${STREAMS} streams × ${DURATION}s …" + local out + out=$(iperf3 -c "$NS_IP" -p "$IPERF_PORT" \ + -t "$DURATION" -P "$STREAMS" 2>&1) + + local sender receiver + sender=$(echo "$out" | grep "SUM.*sender" | awk '{print $6, $7}') + receiver=$(echo "$out" | grep "SUM.*receiver" | awk '{print $6, $7}') + + if [[ -z "$sender" ]]; then + echo " result: FAILED" + echo "$out" | tail -5 | sed 's/^/ /' + else + printf " sender: %s\n" "$sender" + printf " receiver: %s\n" "$receiver" + fi + + pkill -f "forward-" 2>/dev/null || true + ip netns exec "$NS" pkill iperf3 2>/dev/null || true + ip route del "${NS_IP}/32" dev "$TUN_NAME" 2>/dev/null || true + ip tuntap del dev "$TUN_NAME" mode tun 2>/dev/null || true + sleep 0.8 +} + +# ── direct baseline ─────────────────────────────────────────────────────────── +echo "" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo " BASELINE: direct veth (no TUN, no proxy)" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +ip netns exec "$NS" pkill iperf3 2>/dev/null || true; sleep 0.3 +ip netns exec "$NS" iperf3 -s -p "$IPERF_PORT" -D \ + --logfile /tmp/iperf3-bench-server.log; sleep 0.3 +echo " running iperf3: ${STREAMS} streams × ${DURATION}s …" +baseline_out=$(iperf3 -c "$NS_IP" -p "$IPERF_PORT" \ + -t "$DURATION" -P "$STREAMS" 2>&1) +echo "$baseline_out" | grep "SUM.*sender" | awk '{printf " sender: %s %s\n", $6, $7}' +echo "$baseline_out" | grep "SUM.*receiver" | awk '{printf " receiver: %s %s\n", $6, $7}' +ip netns exec "$NS" pkill iperf3 2>/dev/null || true; sleep 0.5 + +# ── tun2 ───────────────────────────────────────────────────────────────────── +echo "" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo " tun2 (main branch — no GRO/GSO offload)" +echo 
"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +run_bench "tun2" /tmp/forward-tun2 + +# ── tun-rs + offload ────────────────────────────────────────────────────────── +echo "" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo " tun-rs (patched — GRO/GSO offload via IFF_VNET_HDR)" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +run_bench "tun-rs+offload" /tmp/forward-tun-rs + +echo "" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" +echo " done." +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" \ No newline at end of file diff --git a/netstack-smoltcp/scripts/route-linux.sh b/netstack-smoltcp/scripts/route-linux.sh new file mode 100644 index 0000000..ca0b691 --- /dev/null +++ b/netstack-smoltcp/scripts/route-linux.sh @@ -0,0 +1,26 @@ +#!/bin/bash +#__author__: cavivie + +DEFAULT_TUN_NAME="utun8" + +function do_route() { + local route_op="${1}" + local tun_name="${2:-$DEFAULT_TUN_NAME}" + ip route ${route_op} 0.0.0.0/1 dev ${tun_name} + ip route ${route_op} 128.0.0.0/1 dev ${tun_name} +} + +function usage(){ + echo "Usage: + route add add tun routes to system route table + route del delete routes from system route table + route help display all usages of the shell script" +} + +# START MAIN-OPTIONS +case $1 in + add) do_route add $2;; + del) do_route delete $2;; + *) usage ;; +esac +# END MAIN-OPTIONS diff --git a/netstack-smoltcp/scripts/route-macos.sh b/netstack-smoltcp/scripts/route-macos.sh new file mode 100644 index 0000000..786f47d --- /dev/null +++ b/netstack-smoltcp/scripts/route-macos.sh @@ -0,0 +1,36 @@ +#!/bin/bash +#__author__: cavivie + +DEFAULT_TUN_ADDR="10.10.10.2/24" +DEFAULT_TUN_DEST="10.10.10.1" + +function do_route() { + local route_op="${1}" + local tun_addr="${2:-$DEFAULT_TUN_ADDR}" + local tun_dest="${3:-$DEFAULT_TUN_DEST}" + sudo route ${route_op} -net 1.0.0.0/8 ${tun_dest} + sudo route ${route_op} -net 2.0.0.0/7 ${tun_dest} + sudo route ${route_op} -net 4.0.0.0/6 ${tun_dest} + 
sudo route ${route_op} -net 8.0.0.0/5 ${tun_dest} + sudo route ${route_op} -net 16.0.0.0/4 ${tun_dest} + sudo route ${route_op} -net 32.0.0.0/3 ${tun_dest} + sudo route ${route_op} -net 64.0.0.0/2 ${tun_dest} + sudo route ${route_op} -net 128.0.0.0/1 ${tun_dest} + # tun2 do like this automatically + sudo route ${route_op} -net ${tun_addr} ${tun_dest} +} + +function usage(){ + echo "Usage: + route add add tun routes to system route table + route del delete routes from system route table + route help display all usages of the shell script" +} + +# START MAIN-OPTIONS +case $1 in + add) do_route add $2 $3;; + del) do_route delete $2 $3;; + *) usage ;; +esac +# END MAIN-OPTIONS diff --git a/netstack-smoltcp/scripts/route-windows.ps1 b/netstack-smoltcp/scripts/route-windows.ps1 new file mode 100644 index 0000000..818221c --- /dev/null +++ b/netstack-smoltcp/scripts/route-windows.ps1 @@ -0,0 +1,30 @@ +#__author__: cavivie + +param( + [string]$Cmd = "help", + [string]$TunName = "utun8", + [string]$TunGateway = "10.10.10.1" +) + +$ErrorActionPreference = "Stop" + +# START MAIN-OPTIONS +switch ($Cmd) { + "add" { + # tun2 do like this automatically + New-NetRoute -DestinationPrefix "0.0.0.0/1" -InterfaceAlias $TunName -NextHop "$TunGateway" + New-NetRoute -DestinationPrefix "128.0.0.0/1" -InterfaceAlias $TunName -NextHop "$TunGateway" + } + "del" { + # tun2 do like this automatically + Get-NetRoute -DestinationPrefix "0.0.0.0/1" -InterfaceAlias $TunName | Remove-NetRoute + Get-NetRoute -DestinationPrefix "128.0.0.0/1" -InterfaceAlias $TunName | Remove-NetRoute + } + default { + Write-Host "Usage: + route add add tun routes to system route table + route del delete routes from system route table + route help display all usages of the shell script" + } +} +# END MAIN-OPTIONS diff --git a/netstack-smoltcp/src/device.rs b/netstack-smoltcp/src/device.rs new file mode 100644 index 0000000..e9cabbf --- /dev/null +++ b/netstack-smoltcp/src/device.rs @@ -0,0 +1,101 @@ +use std::sync::{ 
+ atomic::{AtomicBool, Ordering}, + Arc, +}; + +use smoltcp::{ + phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken}, + time::Instant, +}; +use tokio::sync::mpsc::{unbounded_channel, Permit, Sender, UnboundedReceiver, UnboundedSender}; + +use crate::packet::AnyIpPktFrame; + +pub(super) struct VirtualDevice { + in_buf_avail: Arc, + in_buf: UnboundedReceiver>, + out_buf: Sender, + mtu: usize, +} + +impl VirtualDevice { + pub(super) fn new( + iface_egress_tx: Sender, + mtu: usize, + ) -> (Self, UnboundedSender>, Arc) { + let iface_ingress_tx_avail = Arc::new(AtomicBool::new(false)); + let (iface_ingress_tx, iface_ingress_rx) = unbounded_channel(); + ( + Self { + in_buf_avail: iface_ingress_tx_avail.clone(), + in_buf: iface_ingress_rx, + out_buf: iface_egress_tx, + mtu, + }, + iface_ingress_tx, + iface_ingress_tx_avail, + ) + } +} + +impl Device for VirtualDevice { + type RxToken<'a> = VirtualRxToken; + type TxToken<'a> = VirtualTxToken<'a>; + + fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { + let Ok(buffer) = self.in_buf.try_recv() else { + self.in_buf_avail.store(false, Ordering::Release); + return None; + }; + + let Ok(permit) = self.out_buf.try_reserve() else { + self.in_buf_avail.store(false, Ordering::Release); + return None; + }; + + Some((Self::RxToken { buffer }, Self::TxToken { permit })) + } + + fn transmit(&mut self, _timestamp: Instant) -> Option> { + match self.out_buf.try_reserve() { + Ok(permit) => Some(Self::TxToken { permit }), + Err(_) => None, + } + } + + fn capabilities(&self) -> DeviceCapabilities { + let mut capabilities = DeviceCapabilities::default(); + capabilities.medium = Medium::Ip; + capabilities.max_transmission_unit = self.mtu; + capabilities + } +} + +pub(super) struct VirtualRxToken { + buffer: Vec, +} + +impl RxToken for VirtualRxToken { + fn consume(self, f: F) -> R + where + F: FnOnce(&[u8]) -> R, + { + f(&self.buffer[..]) + } +} + +pub(super) struct VirtualTxToken<'a> { + 
permit: Permit<'a, Vec>, +} + +impl<'a> TxToken for VirtualTxToken<'a> { + fn consume(self, len: usize, f: F) -> R + where + F: FnOnce(&mut [u8]) -> R, + { + let mut buffer = vec![0u8; len]; + let result = f(&mut buffer); + self.permit.send(buffer); + result + } +} diff --git a/netstack-smoltcp/src/filter.rs b/netstack-smoltcp/src/filter.rs new file mode 100644 index 0000000..daced19 --- /dev/null +++ b/netstack-smoltcp/src/filter.rs @@ -0,0 +1,56 @@ +use std::net::IpAddr; + +pub type IpFilter<'a> = Box bool + Send + Sync + 'a>; + +pub struct IpFilters<'a> { + filters: Vec>, +} + +impl<'a> Default for IpFilters<'a> { + fn default() -> Self { + Self::new() + } +} + +impl<'a> IpFilters<'a> { + pub fn new() -> Self { + Self { + filters: Default::default(), + } + } + + pub fn with_non_broadcast() -> Self { + macro_rules! non_broadcast { + ($addr:ident) => { + match $addr { + IpAddr::V4(a) => !(a.is_broadcast() || a.is_multicast() || a.is_unspecified()), + IpAddr::V6(a) => !(a.is_multicast() || a.is_unspecified()), + } + }; + } + Self { + filters: vec![Box::new(|src, dst| { + non_broadcast!(src) && non_broadcast!(dst) + })], + } + } + + pub fn add(&mut self, filter: IpFilter<'a>) { + self.filters.push(filter); + } + + pub fn add_fn(&mut self, filter: F) + where + F: Fn(&IpAddr, &IpAddr) -> bool + Send + Sync + 'a, + { + self.filters.push(Box::new(filter)); + } + + pub fn add_all>>(&mut self, filters: I) { + self.filters.extend(filters); + } + + pub fn is_allowed(&self, src: &IpAddr, dst: &IpAddr) -> bool { + self.filters.iter().all(|filter| filter(src, dst)) + } +} diff --git a/netstack-smoltcp/src/lib.rs b/netstack-smoltcp/src/lib.rs new file mode 100644 index 0000000..ab72413 --- /dev/null +++ b/netstack-smoltcp/src/lib.rs @@ -0,0 +1,22 @@ +mod device; + +mod runner; +pub use runner::Runner; + +mod packet; +pub use packet::AnyIpPktFrame; + +mod filter; +pub use filter::{IpFilter, IpFilters}; + +pub mod udp; +pub use udp::UdpSocket; + +pub mod tcp; +pub use 
tcp::{TcpListener, TcpStream}; + +pub mod stack; +pub use stack::{Stack, StackBuilder}; + +/// Re-export +pub use smoltcp; diff --git a/netstack-smoltcp/src/packet.rs b/netstack-smoltcp/src/packet.rs new file mode 100644 index 0000000..a12521b --- /dev/null +++ b/netstack-smoltcp/src/packet.rs @@ -0,0 +1,53 @@ +use std::net::IpAddr; + +use smoltcp::wire::{IpProtocol, IpVersion, Ipv4Packet, Ipv6Packet}; + +pub type AnyIpPktFrame = Vec; + +#[derive(Debug)] +pub(super) enum IpPacket> { + Ipv4(Ipv4Packet), + Ipv6(Ipv6Packet), +} + +impl + Copy> IpPacket { + pub fn new_checked(packet: T) -> smoltcp::wire::Result> { + let buffer = packet.as_ref(); + match IpVersion::of_packet(buffer)? { + IpVersion::Ipv4 => Ok(IpPacket::Ipv4(Ipv4Packet::new_checked(packet)?)), + IpVersion::Ipv6 => Ok(IpPacket::Ipv6(Ipv6Packet::new_checked(packet)?)), + } + } + + pub fn src_addr(&self) -> IpAddr { + match *self { + IpPacket::Ipv4(ref packet) => IpAddr::from(packet.src_addr()), + IpPacket::Ipv6(ref packet) => IpAddr::from(packet.src_addr()), + } + } + + pub fn dst_addr(&self) -> IpAddr { + match *self { + IpPacket::Ipv4(ref packet) => IpAddr::from(packet.dst_addr()), + IpPacket::Ipv6(ref packet) => IpAddr::from(packet.dst_addr()), + } + } + + pub fn protocol(&self) -> IpProtocol { + match *self { + IpPacket::Ipv4(ref packet) => packet.next_header(), + IpPacket::Ipv6(ref packet) => packet.next_header(), + } + } +} + +impl<'a, T: AsRef<[u8]> + ?Sized> IpPacket<&'a T> { + /// Return a pointer to the payload. 
+ #[inline] + pub fn payload(&self) -> &'a [u8] { + match *self { + IpPacket::Ipv4(ref packet) => packet.payload(), + IpPacket::Ipv6(ref packet) => packet.payload(), + } + } +} diff --git a/netstack-smoltcp/src/runner.rs b/netstack-smoltcp/src/runner.rs new file mode 100644 index 0000000..63b9675 --- /dev/null +++ b/netstack-smoltcp/src/runner.rs @@ -0,0 +1,41 @@ +use std::{ + future::{Future, IntoFuture}, + pin::Pin, + task::{Context, Poll}, +}; + +/// BoxFuture acts the same as the [BoxFuture in crate futures utils], +/// which is an owned dynamically typed Future for use in cases where you +/// can’t statically type your result or need to add some indirection. +/// But the difference of this structure is that it will conditionally +/// implement Send according to the properties of type T, which does not +/// require two sets of API interfaces in single-threaded and multi-threaded. +/// +/// [BoxFuture in crate futures utils]: https://docs.rs/futures-util/latest/futures_util/future/type.BoxFuture.html +pub struct BoxFuture<'a, T>(Pin + 'a>>); + +impl<'a, T> BoxFuture<'a, T> { + pub fn new(f: F) -> BoxFuture<'a, T> + where + F: IntoFuture + 'a, + { + BoxFuture(Box::pin(f.into_future())) + } + + #[allow(unused)] + pub fn wrap(f: Pin + 'a>>) -> BoxFuture<'a, T> { + BoxFuture(f) + } +} + +unsafe impl Send for BoxFuture<'_, T> {} + +impl Future for BoxFuture<'_, T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { + self.0.as_mut().poll(context) + } +} + +pub type Runner = BoxFuture<'static, std::io::Result<()>>; diff --git a/netstack-smoltcp/src/stack.rs b/netstack-smoltcp/src/stack.rs new file mode 100644 index 0000000..c30470d --- /dev/null +++ b/netstack-smoltcp/src/stack.rs @@ -0,0 +1,279 @@ +use std::{ + net::IpAddr, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use futures::{Sink, Stream}; +use smoltcp::wire::IpProtocol; +use tokio::sync::mpsc::{channel, Receiver}; +use tokio_util::sync::PollSender; +use 
tracing::{debug, trace}; + +use crate::{ + filter::{IpFilter, IpFilters}, + packet::{AnyIpPktFrame, IpPacket}, + runner::Runner, + tcp::TcpListener, + udp::UdpSocket, +}; + +pub struct StackBuilder { + enable_udp: bool, + enable_tcp: bool, + enable_icmp: bool, + stack_buffer_size: usize, + udp_buffer_size: usize, + tcp_buffer_size: usize, + mtu: usize, + ip_filters: IpFilters<'static>, +} + +impl Default for StackBuilder { + fn default() -> Self { + Self { + enable_udp: false, + enable_tcp: false, + enable_icmp: false, + stack_buffer_size: 1024, + udp_buffer_size: 512, + tcp_buffer_size: 512, + mtu: 1504, // 1500 for Ethernet + 4 for VLAN + ip_filters: IpFilters::with_non_broadcast(), + } + } +} + +#[allow(unused)] +impl StackBuilder { + pub fn enable_udp(mut self, enable: bool) -> Self { + self.enable_udp = enable; + self + } + + pub fn enable_tcp(mut self, enable: bool) -> Self { + self.enable_tcp = enable; + self + } + + pub fn enable_icmp(mut self, enable: bool) -> Self { + self.enable_icmp = enable; + self + } + + pub fn stack_buffer_size(mut self, size: usize) -> Self { + self.stack_buffer_size = size; + self + } + + pub fn udp_buffer_size(mut self, size: usize) -> Self { + self.udp_buffer_size = size; + self + } + + pub fn tcp_buffer_size(mut self, size: usize) -> Self { + self.tcp_buffer_size = size; + self + } + + pub fn set_ip_filters(mut self, filters: IpFilters<'static>) -> Self { + self.ip_filters = filters; + self + } + + pub fn add_ip_filter(mut self, filter: IpFilter<'static>) -> Self { + self.ip_filters.add(filter); + self + } + + pub fn add_ip_filter_fn(mut self, filter: F) -> Self + where + F: Fn(&IpAddr, &IpAddr) -> bool + Send + Sync + 'static, + { + self.ip_filters.add_fn(filter); + self + } + + pub fn mtu(mut self, mtu: usize) -> Self { + self.mtu = mtu; + self + } + + #[allow(clippy::type_complexity)] + pub fn build( + self, + ) -> std::io::Result<( + Stack, + Option, + Option, + Option, + )> { + let (stack_tx, stack_rx) = 
channel(self.stack_buffer_size); + + let (udp_tx, udp_rx) = if self.enable_udp { + let (udp_tx, udp_rx) = channel(self.udp_buffer_size); + (Some(PollSender::new(udp_tx)), Some(udp_rx)) + } else { + (None, None) + }; + + let (tcp_tx, tcp_rx) = if self.enable_tcp { + let (tcp_tx, tcp_rx) = channel(self.tcp_buffer_size); + (Some(PollSender::new(tcp_tx)), Some(tcp_rx)) + } else { + (None, None) + }; + + // ICMP is handled by TCP's Interface. + // smoltcp's interface will always send replies to EchoRequest + if self.enable_icmp && !self.enable_tcp { + use std::io::{Error, ErrorKind::InvalidInput}; + return Err(Error::new(InvalidInput, "ICMP requires TCP")); + } + let icmp_tx = if self.enable_icmp { + tcp_tx.clone() + } else { + None + }; + + let udp_socket = udp_rx.map(|udp_rx| UdpSocket::new(udp_rx, stack_tx.clone())); + + let (tcp_runner, tcp_listener) = if let Some(tcp_rx) = tcp_rx { + let (tcp_runner, tcp_listener) = TcpListener::new(tcp_rx, stack_tx, self.mtu)?; + (Some(tcp_runner), Some(tcp_listener)) + } else { + (None, None) + }; + + let stack = Stack { + ip_filters: self.ip_filters, + stack_rx, + sink_buf: None, + udp_tx, + tcp_tx, + icmp_tx, + }; + + Ok((stack, tcp_runner, udp_socket, tcp_listener)) + } +} + +pub struct Stack { + ip_filters: IpFilters<'static>, + sink_buf: Option<(AnyIpPktFrame, IpProtocol)>, + udp_tx: Option>, + tcp_tx: Option>, + icmp_tx: Option>, + stack_rx: Receiver, +} + +impl Stack { + fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll> { + let (item, proto) = match self.sink_buf.take() { + Some(val) => val, + None => return Poll::Ready(Ok(())), + }; + + let tx = match proto { + IpProtocol::Tcp => self.tcp_tx.as_mut(), + IpProtocol::Udp => self.udp_tx.as_mut(), + IpProtocol::Icmp | IpProtocol::Icmpv6 => self.icmp_tx.as_mut(), + _ => unreachable!(), + }; + + let Some(tx) = tx else { + return Poll::Ready(Ok(())); + }; + + match tx.poll_reserve(cx) { + Poll::Pending => { + self.sink_buf = Some((item, proto)); + Poll::Pending + } + 
Poll::Ready(Err(_)) => Poll::Ready(Err(channel_closed_err("channel is closed"))), + Poll::Ready(Ok(_)) => match tx.send_item(item) { + Ok(()) => Poll::Ready(Ok(())), + Err(_) => Poll::Ready(Err(channel_closed_err("channel is closed"))), + }, + } + } +} + +// Recv from stack. +impl Stream for Stack { + type Item = std::io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.stack_rx.poll_recv(cx) { + Poll::Ready(Some(pkt)) => Poll::Ready(Some(Ok(pkt))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +// Send to stack. +impl Sink for Stack { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // If a buffered item exists, try to flush it first. This also properly + // registers the waker via poll_reserve so we get woken when the channel + // has capacity. Without this, returning Pending here with _cx unused + // means the task never gets rescheduled. + if self.sink_buf.is_some() { + ready!(self.poll_send(cx))?; + } + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: AnyIpPktFrame) -> Result<(), Self::Error> { + if item.is_empty() { + return Ok(()); + } + + use std::io::{Error, ErrorKind::InvalidInput}; + let packet = IpPacket::new_checked(item.as_slice()) + .map_err(|err| Error::new(InvalidInput, format!("invalid IP packet: {err}")))?; + + let src_ip = packet.src_addr(); + let dst_ip = packet.dst_addr(); + + let addr_allowed = self.ip_filters.is_allowed(&src_ip, &dst_ip); + if !addr_allowed { + trace!("IP packet {src_ip} -> {dst_ip} (allowed? 
{addr_allowed}) throwing away",); + return Ok(()); + } + + let protocol = packet.protocol(); + if matches!( + protocol, + IpProtocol::Tcp | IpProtocol::Udp | IpProtocol::Icmp | IpProtocol::Icmpv6 + ) { + self.sink_buf.replace((item, protocol)); + } else { + debug!("tun IP packet ignored (protocol: {:?})", protocol); + } + + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_send(cx) + } + + fn poll_close( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + self.stack_rx.close(); + Poll::Ready(Ok(())) + } +} + +fn channel_closed_err(err: E) -> std::io::Error +where + E: Into>, +{ + std::io::Error::new(std::io::ErrorKind::BrokenPipe, err) +} diff --git a/netstack-smoltcp/src/tcp.rs b/netstack-smoltcp/src/tcp.rs new file mode 100644 index 0000000..b03a306 --- /dev/null +++ b/netstack-smoltcp/src/tcp.rs @@ -0,0 +1,561 @@ +use std::{ + collections::HashMap, + net::SocketAddr, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll, Waker}, +}; + +use futures::Stream; +use smoltcp::{ + iface::{Config as InterfaceConfig, Interface, SocketHandle, SocketSet}, + phy::Device, + socket::tcp::{Socket as TcpSocket, SocketBuffer as TcpSocketBuffer, State as TcpState}, + storage::RingBuffer, + time::{Duration, Instant}, + wire::{HardwareAddress, IpAddress, IpCidr, IpProtocol, Ipv4Address, Ipv6Address, TcpPacket}, +}; +use spin::Mutex as SpinMutex; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + sync::{ + mpsc::{unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}, + Notify, + }, +}; +use tracing::{error, trace}; + +use crate::{ + device::VirtualDevice, + packet::{AnyIpPktFrame, IpPacket}, + Runner, +}; + +// NOTE: Default buffer could contain 20 AEAD packets +const DEFAULT_TCP_SEND_BUFFER_SIZE: u32 = 0x3FFF * 20; +const DEFAULT_TCP_RECV_BUFFER_SIZE: u32 = 0x3FFF * 20; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +enum TcpSocketState { + Normal, + 
Close, + Closing, + Closed, +} + +struct TcpSocketControl { + send_buffer: RingBuffer<'static, u8>, + send_waker: Option, + recv_buffer: RingBuffer<'static, u8>, + recv_waker: Option, + recv_state: TcpSocketState, + send_state: TcpSocketState, +} + +struct TcpSocketCreation { + control: SharedControl, + socket: TcpSocket<'static>, +} + +type SharedNotify = Arc; +type SharedControl = Arc>; + +struct TcpListenerRunner; + +impl TcpListenerRunner { + fn create( + device: VirtualDevice, + iface: Interface, + iface_ingress_tx: UnboundedSender>, + iface_ingress_tx_avail: Arc, + tcp_rx: Receiver, + stream_tx: UnboundedSender, + sockets: HashMap, + ) -> Runner { + Runner::new(async move { + let notify = Arc::new(Notify::new()); + let (socket_tx, socket_rx) = unbounded_channel::(); + let res = tokio::select! { + v = Self::handle_packet(notify.clone(), iface_ingress_tx, iface_ingress_tx_avail.clone(), tcp_rx, stream_tx, socket_tx) => v, + v = Self::handle_socket(notify, device, iface, iface_ingress_tx_avail, sockets, socket_rx) => v, + }; + res?; + trace!("VirtDevice::poll thread exited"); + Ok(()) + }) + } + + async fn handle_packet( + notify: SharedNotify, + iface_ingress_tx: UnboundedSender>, + iface_ingress_tx_avail: Arc, + mut tcp_rx: Receiver, + stream_tx: UnboundedSender, + socket_tx: UnboundedSender, + ) -> std::io::Result<()> { + while let Some(frame) = tcp_rx.recv().await { + let packet = match IpPacket::new_checked(frame.as_slice()) { + Ok(p) => p, + Err(err) => { + error!("invalid TCP IP packet: {:?}", err,); + continue; + } + }; + + // Specially handle icmp packet by TCP interface. 
+ if matches!(packet.protocol(), IpProtocol::Icmp | IpProtocol::Icmpv6) { + iface_ingress_tx + .send(frame) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?; + iface_ingress_tx_avail.store(true, Ordering::Release); + notify.notify_one(); + continue; + } + + let src_ip = packet.src_addr(); + let dst_ip = packet.dst_addr(); + let payload = packet.payload(); + + let packet = match TcpPacket::new_checked(payload) { + Ok(p) => p, + Err(err) => { + error!("invalid TCP err: {err}, src_ip: {src_ip}, dst_ip: {dst_ip}, payload: {payload:?}"); + continue; + } + }; + let src_port = packet.src_port(); + let dst_port = packet.dst_port(); + + let src_addr = SocketAddr::new(src_ip, src_port); + let dst_addr = SocketAddr::new(dst_ip, dst_port); + + // TCP first handshake packet, create a new Connection + if packet.syn() && !packet.ack() { + let mut socket = TcpSocket::new( + TcpSocketBuffer::new(vec![0u8; DEFAULT_TCP_RECV_BUFFER_SIZE as usize]), + TcpSocketBuffer::new(vec![0u8; DEFAULT_TCP_SEND_BUFFER_SIZE as usize]), + ); + socket.set_keep_alive(Some(Duration::from_secs(28))); + // FIXME: It should follow system's setting. 7200 is Linux's default. 
+ socket.set_timeout(Some(Duration::from_secs(7200))); + // NO ACK delay + // socket.set_ack_delay(None); + + if let Err(err) = socket.listen(dst_addr) { + error!("listen error: {:?}", err); + continue; + } + + trace!("created TCP connection for {} <-> {}", src_addr, dst_addr); + + let control = Arc::new(SpinMutex::new(TcpSocketControl { + send_buffer: RingBuffer::new(vec![0u8; DEFAULT_TCP_SEND_BUFFER_SIZE as usize]), + send_waker: None, + recv_buffer: RingBuffer::new(vec![0u8; DEFAULT_TCP_RECV_BUFFER_SIZE as usize]), + recv_waker: None, + recv_state: TcpSocketState::Normal, + send_state: TcpSocketState::Normal, + })); + + stream_tx + .send(TcpStream { + src_addr, + dst_addr, + notify: notify.clone(), + control: control.clone(), + }) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?; + socket_tx + .send(TcpSocketCreation { control, socket }) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?; + } + + // Pipeline tcp stream packet + iface_ingress_tx + .send(frame) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))?; + iface_ingress_tx_avail.store(true, Ordering::Release); + notify.notify_one(); + } + Ok(()) + } + + async fn handle_socket( + notify: SharedNotify, + mut device: VirtualDevice, + mut iface: Interface, + iface_ingress_tx_avail: Arc, + mut sockets: HashMap, + mut socket_rx: UnboundedReceiver, + ) -> std::io::Result<()> { + let mut socket_set = SocketSet::new(vec![]); + loop { + while let Ok(TcpSocketCreation { control, socket }) = socket_rx.try_recv() { + let handle = socket_set.add(socket); + sockets.insert(handle, control); + } + + let before_poll = Instant::now(); + let updated_sockets = iface.poll(before_poll, &mut device, &mut socket_set); + if matches!( + updated_sockets, + smoltcp::iface::PollResult::SocketStateChanged + ) { + trace!("VirtDevice::poll costed {}", Instant::now() - before_poll); + } + + // Check all the sockets' status + let mut sockets_to_remove = Vec::new(); + + for 
(socket_handle, control) in sockets.iter() { + let socket_handle = *socket_handle; + let socket = socket_set.get_mut::(socket_handle); + let mut control = control.lock(); + + // Remove the socket only when it is in the closed state. + if socket.state() == TcpState::Closed { + sockets_to_remove.push(socket_handle); + + control.send_state = TcpSocketState::Closed; + control.recv_state = TcpSocketState::Closed; + + if let Some(waker) = control.send_waker.take() { + waker.wake(); + } + if let Some(waker) = control.recv_waker.take() { + waker.wake(); + } + + trace!("closed TCP connection"); + continue; + } + + // SHUT_WR — only close once the send_buffer has been fully + // drained into the smoltcp socket. Closing earlier transitions + // the socket to FIN_WAIT_1, making can_send() return false, so + // the send loop below never runs and the remaining data is lost. + if matches!(control.send_state, TcpSocketState::Close) + && control.send_buffer.is_empty() + { + trace!("closing TCP Write Half, {:?}", socket.state()); + + socket.close(); + control.send_state = TcpSocketState::Closing; + } + + // Check if readable + let mut wake_receiver = false; + while socket.can_recv() && !control.recv_buffer.is_full() { + let result = socket.recv(|buffer| { + let n = control.recv_buffer.enqueue_slice(buffer); + (n, ()) + }); + + match result { + Ok(..) => wake_receiver = true, + Err(err) => { + error!("socket recv error: {:?}, {:?}", err, socket.state()); + + // Don't know why. Abort the connection. + socket.abort(); + + if matches!(control.recv_state, TcpSocketState::Normal) { + control.recv_state = TcpSocketState::Closed; + } + wake_receiver = true; + + // The socket will be recycled in the next poll. + break; + } + } + } + + // If socket is not in ESTABLISH, FIN-WAIT-1, FIN-WAIT-2, + // the local client have closed our receiver. 
+ let states = [ + TcpState::Listen, + TcpState::SynReceived, + TcpState::Established, + TcpState::FinWait1, + TcpState::FinWait2, + ]; + if matches!(control.recv_state, TcpSocketState::Normal) + && !socket.may_recv() + && !states.contains(&socket.state()) + { + trace!("closed TCP Read Half, {:?}", socket.state()); + + // Let TcpStream::poll_read returns EOF. + control.recv_state = TcpSocketState::Closed; + wake_receiver = true; + } + + if wake_receiver && control.recv_waker.is_some() { + if let Some(waker) = control.recv_waker.take() { + waker.wake(); + } + } + + // Check if writable + let mut wake_sender = false; + while socket.can_send() && !control.send_buffer.is_empty() { + let result = socket.send(|buffer| { + let n = control.send_buffer.dequeue_slice(buffer); + (n, ()) + }); + + match result { + Ok(..) => wake_sender = true, + Err(err) => { + error!("socket send error: {:?}, {:?}", err, socket.state()); + + // Don't know why. Abort the connection. + socket.abort(); + + if matches!(control.send_state, TcpSocketState::Normal) { + control.send_state = TcpSocketState::Closed; + } + wake_sender = true; + + // The socket will be recycled in the next poll. 
+ break; + } + } + } + + if wake_sender && control.send_waker.is_some() { + if let Some(waker) = control.send_waker.take() { + waker.wake(); + } + } + } + + for socket_handle in sockets_to_remove { + sockets.remove(&socket_handle); + socket_set.remove(socket_handle); + } + + if !iface_ingress_tx_avail.load(Ordering::Acquire) { + let next_duration = iface + .poll_delay(before_poll, &socket_set) + .unwrap_or(Duration::from_millis(5)); + if next_duration != Duration::ZERO { + let _ = tokio::time::timeout( + tokio::time::Duration::from(next_duration), + notify.notified(), + ) + .await; + } + } + } + } +} + +pub struct TcpListener { + stream_rx: UnboundedReceiver, +} + +impl TcpListener { + pub(super) fn new( + tcp_rx: Receiver, + stack_tx: Sender, + mtu: usize, + ) -> std::io::Result<(Runner, Self)> { + let (mut device, iface_ingress_tx, iface_ingress_tx_avail) = + VirtualDevice::new(stack_tx, mtu); + let iface = Self::create_interface(&mut device)?; + + let (stream_tx, stream_rx) = unbounded_channel(); + + let runner = TcpListenerRunner::create( + device, + iface, + iface_ingress_tx, + iface_ingress_tx_avail, + tcp_rx, + stream_tx, + HashMap::new(), + ); + + Ok((runner, Self { stream_rx })) + } + + fn create_interface(device: &mut D) -> std::io::Result + where + D: Device + ?Sized, + { + let mut iface_config = InterfaceConfig::new(HardwareAddress::Ip); + iface_config.random_seed = rand::random(); + let mut iface = Interface::new(iface_config, device, Instant::now()); + iface.update_ip_addrs(|ip_addrs| { + ip_addrs + .push(IpCidr::new(IpAddress::v4(0, 0, 0, 1), 0)) + .expect("iface IPv4"); + ip_addrs + .push(IpCidr::new(IpAddress::v6(0, 0, 0, 0, 0, 0, 0, 1), 0)) + .expect("iface IPv6"); + }); + iface + .routes_mut() + .add_default_ipv4_route(Ipv4Address::new(0, 0, 0, 1)) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, e))?; + iface + .routes_mut() + .add_default_ipv6_route(Ipv6Address::new(0, 0, 0, 0, 0, 0, 0, 1)) + .map_err(|e| 
std::io::Error::new(std::io::ErrorKind::AddrNotAvailable, e))?; + iface.set_any_ip(true); + Ok(iface) + } +} + +impl Stream for TcpListener { + type Item = (TcpStream, SocketAddr, SocketAddr); + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.stream_rx.poll_recv(cx).map(|stream| { + stream.map(|stream| { + let local_addr = *stream.local_addr(); + let remote_addr: SocketAddr = *stream.remote_addr(); + (stream, local_addr, remote_addr) + }) + }) + } +} + +pub struct TcpStream { + src_addr: SocketAddr, + dst_addr: SocketAddr, + notify: SharedNotify, + control: SharedControl, +} + +impl Drop for TcpStream { + fn drop(&mut self) { + let mut control = self.control.lock(); + + if matches!(control.recv_state, TcpSocketState::Normal) { + control.recv_state = TcpSocketState::Close; + } + + if matches!(control.send_state, TcpSocketState::Normal) { + control.send_state = TcpSocketState::Close; + } + + self.notify.notify_one(); + } +} + +impl TcpStream { + pub fn local_addr(&self) -> &SocketAddr { + &self.src_addr + } + + pub fn remote_addr(&self) -> &SocketAddr { + &self.dst_addr + } +} + +impl AsyncRead for TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut control = self.control.lock(); + + // Read from buffer + if control.recv_buffer.is_empty() { + // If socket is already closed / half closed, just return EOF directly. + if matches!(control.recv_state, TcpSocketState::Closed) { + return Ok(()).into(); + } + + // Nothing could be read. Wait for notify. 
+ if let Some(old_waker) = control.recv_waker.replace(cx.waker().clone()) { + if !old_waker.will_wake(cx.waker()) { + old_waker.wake(); + } + } + + return Poll::Pending; + } + + let recv_buf = buf.initialize_unfilled(); + let n = control.recv_buffer.dequeue_slice(recv_buf); + buf.advance(n); + + if n > 0 { + self.notify.notify_one(); + } + + Ok(()).into() + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut control = self.control.lock(); + + // If state == Close | Closing | Closed, the TCP stream WR half is closed. + if !matches!(control.send_state, TcpSocketState::Normal) { + return Err(std::io::ErrorKind::BrokenPipe.into()).into(); + } + + // Write to buffer + + if control.send_buffer.is_full() { + if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) { + if !old_waker.will_wake(cx.waker()) { + old_waker.wake(); + } + } + + return Poll::Pending; + } + + let n = control.send_buffer.enqueue_slice(buf); + + if n > 0 { + self.notify.notify_one(); + } + + Ok(n).into() + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut control = self.control.lock(); + + if matches!(control.send_state, TcpSocketState::Closed) { + return Ok(()).into(); + } + + // SHUT_WR + if matches!(control.send_state, TcpSocketState::Normal) { + control.send_state = TcpSocketState::Close; + } + + if let Some(old_waker) = control.send_waker.replace(cx.waker().clone()) { + if !old_waker.will_wake(cx.waker()) { + old_waker.wake(); + } + } + + self.notify.notify_one(); + + Poll::Pending + } +} diff --git a/netstack-smoltcp/src/udp.rs b/netstack-smoltcp/src/udp.rs new file mode 100644 index 0000000..ac896a2 --- /dev/null +++ b/netstack-smoltcp/src/udp.rs @@ -0,0 +1,155 @@ +use std::{ + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; + +use 
etherparse::PacketBuilder; +use futures::{ready, Sink, SinkExt, Stream}; +use smoltcp::wire::UdpPacket; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio_util::sync::PollSender; +use tracing::{error, trace}; + +use crate::packet::{AnyIpPktFrame, IpPacket}; + +pub type UdpMsg = ( + Vec, /* payload */ + SocketAddr, /* local */ + SocketAddr, /* remote */ +); + +pub struct UdpSocket { + udp_rx: Receiver, + stack_tx: PollSender, +} + +impl UdpSocket { + pub(super) fn new(udp_rx: Receiver, stack_tx: Sender) -> Self { + Self { + udp_rx, + stack_tx: PollSender::new(stack_tx), + } + } + + pub fn split(self) -> (ReadHalf, WriteHalf) { + ( + ReadHalf { + udp_rx: self.udp_rx, + }, + WriteHalf { + stack_tx: self.stack_tx, + }, + ) + } +} + +pub struct ReadHalf { + udp_rx: Receiver, +} + +pub struct WriteHalf { + stack_tx: PollSender, +} + +impl Stream for ReadHalf { + type Item = UdpMsg; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + loop { + match ready!(self.udp_rx.poll_recv(cx)) { + Some(frame) => { + let packet = match IpPacket::new_checked(frame.as_slice()) { + Ok(p) => p, + Err(err) => { + error!("invalid IP packet: {}", err); + continue; + } + }; + + let src_ip = packet.src_addr(); + let dst_ip = packet.dst_addr(); + let payload = packet.payload(); + + let packet = match UdpPacket::new_checked(payload) { + Ok(p) => p, + Err(err) => { + error!("invalid err: {err}, src_ip: {src_ip}, dst_ip: {dst_ip}, payload: {payload:?}"); + continue; + } + }; + let src_port = packet.src_port(); + let dst_port = packet.dst_port(); + + let src_addr = SocketAddr::new(src_ip, src_port); + let dst_addr = SocketAddr::new(dst_ip, dst_port); + + trace!("created UDP socket for {} <-> {}", src_addr, dst_addr); + + return Poll::Ready(Some((packet.payload().to_vec(), src_addr, dst_addr))); + } + None => return Poll::Ready(None), + } + } + } +} + +impl Sink for WriteHalf { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) 
-> Poll> { + match ready!(self.stack_tx.poll_ready_unpin(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(std::io::Error::other(err))), + } + } + + fn start_send(mut self: Pin<&mut Self>, item: UdpMsg) -> Result<(), Self::Error> { + use std::io::{Error, ErrorKind::InvalidData}; + let (data, src_addr, dst_addr) = item; + + if data.is_empty() { + return Ok(()); + } + + let builder = match (src_addr, dst_addr) { + (SocketAddr::V4(src), SocketAddr::V4(dst)) => { + PacketBuilder::ipv4(src.ip().octets(), dst.ip().octets(), 20) + .udp(src_addr.port(), dst_addr.port()) + } + (SocketAddr::V6(src), SocketAddr::V6(dst)) => { + PacketBuilder::ipv6(src.ip().octets(), dst.ip().octets(), 20) + .udp(src_addr.port(), dst_addr.port()) + } + _ => { + return Err(Error::new(InvalidData, "src or destination type unmatch")); + } + }; + + let mut ip_packet_writer = Vec::with_capacity(builder.size(data.len())); + builder + .write(&mut ip_packet_writer, &data) + .map_err(|err| Error::other(format!("PacketBuilder::write: {err}")))?; + + match self.stack_tx.start_send_unpin(ip_packet_writer) { + Ok(()) => Ok(()), + Err(err) => Err(Error::other(format!("send error: {err}"))), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use std::io::Error; + match ready!(self.stack_tx.poll_flush_unpin(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(Error::other(format!("flush error: {err}")))), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use std::io::Error; + match ready!(self.stack_tx.poll_close_unpin(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(err) => Poll::Ready(Err(Error::other(format!("close error: {err}")))), + } + } +} diff --git a/netstack-smoltcp/tests/regression.rs b/netstack-smoltcp/tests/regression.rs new file mode 100644 index 0000000..ddd41f6 --- /dev/null +++ b/netstack-smoltcp/tests/regression.rs @@ -0,0 +1,75 @@ +//! 
Regression tests that reproduce the bugs found in the static analysis. + +use std::time::Duration; + +use etherparse::{IpNumber, Ipv4Header, UdpHeader}; +use futures::SinkExt; +use tokio::time::timeout; + +use netstack_smoltcp::StackBuilder; + +fn make_udp_ipv4( + src_ip: [u8; 4], + src_port: u16, + dst_ip: [u8; 4], + dst_port: u16, + payload: &[u8], +) -> Vec { + let udp_hdr = UdpHeader::with_ipv4_checksum( + src_port, + dst_port, + &Ipv4Header::new( + (UdpHeader::LEN + payload.len()) as u16, + 64, + IpNumber::UDP, + src_ip, + dst_ip, + ) + .unwrap(), + payload, + ) + .unwrap(); + + let ip_hdr = Ipv4Header::new( + (UdpHeader::LEN + payload.len()) as u16, + 64, + IpNumber::UDP, + src_ip, + dst_ip, + ) + .unwrap(); + + let mut buf = Vec::with_capacity(Ipv4Header::MIN_LEN + UdpHeader::LEN + payload.len()); + ip_hdr.write(&mut buf).unwrap(); + udp_hdr.write(&mut buf).unwrap(); + buf.extend_from_slice(payload); + buf +} + +/// Affects commits up to and including a15e0b72bfc72cb032e67138070da01e325d66f8. +/// `sink_buf` is used in `Stack` as a single slot holding the packet that is waiting to be sent. +/// +/// The original assumption was that `poll_ready` -> `start_send` -> `poll_flush` +/// are always called in sequence, so the slot is free again before the next packet and never blocks. +/// +/// However, `send_all` on `Stack` does not flush (call `poll_flush`) immediately after each packet, +/// so `sink_buf` may still be `Some(pkt)` when `Stack::poll_ready` runs again and takes its `Poll::Pending` branch. +/// That branch did not register the waker, so the task was never woken and hung forever. 
+#[tokio::test(flavor = "current_thread")] +async fn bug1_poll_ready_waker_registered_via_send_all() { + let (mut stack, _runner, _udp_socket, _tcp) = StackBuilder::default() + .enable_udp(true) + .udp_buffer_size(64) + .stack_buffer_size(64) + .build() + .unwrap(); + + let pkt1 = make_udp_ipv4([1, 2, 3, 4], 1111, [5, 6, 7, 8], 9999, b"first"); + let pkt2 = make_udp_ipv4([1, 2, 3, 4], 1111, [5, 6, 7, 8], 9999, b"second"); + + let mut stream = futures::stream::iter([Ok(pkt1), Ok(pkt2)]); + + let result = timeout(Duration::from_secs(1), stack.send_all(&mut stream)).await; + // should be ok after the fix + assert!(result.is_ok()); +}