From 7792ac148139d8aead7d205baa335dbfaac50b22 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Thu, 12 Jun 2025 21:32:28 +0800 Subject: [PATCH] udp punch and ipv6 punch --- Cargo.lock | 324 ++++++++++++++++++++++++++------- Cargo.toml | 2 + libs/hbb_common | 2 +- src/client.rs | 325 +++++++++++++++++++++++++++++---- src/client/io_loop.rs | 2 +- src/common.rs | 360 ++++++++++++++++++++++++++++++++----- src/kcp_stream.rs | 141 +++++++++++++++ src/lib.rs | 2 + src/port_forward.rs | 2 +- src/rendezvous_mediator.rs | 165 +++++++++++++---- src/server.rs | 2 +- src/server/connection.rs | 2 +- 12 files changed, 1151 insertions(+), 178 deletions(-) create mode 100644 src/kcp_stream.rs diff --git a/Cargo.lock b/Cargo.lock index ac25f30d2..5bd8498e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,18 +39,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "ahash" -version = "0.8.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" -dependencies = [ - "cfg-if 1.0.0", - "once_cell", - "version_check", - "zerocopy 0.7.34", -] - [[package]] name = "aho-corasick" version = "1.1.3" @@ -87,12 +75,6 @@ dependencies = [ "alloc-no-stdlib", ] -[[package]] -name = "allocator-api2" -version = "0.2.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" - [[package]] name = "alsa" version = "0.9.0" @@ -100,7 +82,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37fe60779335388a88c01ac6c3be40304d1e349de3ada3b15f7808bb90fa9dce" dependencies = [ "alsa-sys", - "bitflags 2.6.0", + "bitflags 2.9.1", "libc", ] @@ -217,9 +199,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.86" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" [[package]] name = "arboard" @@ -476,6 +458,17 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "auto_impl" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdcb70bdbc4d478427380519163274ac86e52916e10f0a8889adf0f96d3fee7" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.36", + "syn 2.0.98", +] + [[package]] name = "autocfg" version = "0.1.8" @@ -582,7 +575,7 @@ version = "0.69.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "cexpr", "clang-sys", "itertools 0.12.1", @@ -596,6 +589,24 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "bindgen" +version = "0.71.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3" +dependencies = [ + "bitflags 2.9.1", + "cexpr", + "clang-sys", + "itertools 0.12.1", + "proc-macro2 1.0.93", + "quote 1.0.36", + "regex", + "rustc-hash 2.1.1", + "shlex", + "syn 2.0.98", +] + [[package]] name = "bit_field" version = "0.10.2" @@ -610,9 +621,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" dependencies = [ "serde 1.0.203", ] @@ -728,6 +739,16 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "bytecodec" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adf4c9d0bbf32eea58d7c0f812058138ee8edaf0f2802b6d03561b504729a325" +dependencies = [ + "byteorder", + "trackable 0.2.24", +] + [[package]] name = "bytemuck" version = "1.21.0" @@ -794,7 +815,7 @@ version = "0.18.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ca26ef0159422fb77631dc9d17b102f253b876fe1586b03b803e63a309b4ee2" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "cairo-sys-rs", "glib 0.18.5", "libc", @@ -983,7 +1004,7 @@ version = "0.1.0" dependencies = [ "cacao", "cc", - "dashmap", + "dashmap 5.5.3", "dirs 5.0.1", "fsevent", "fuser", @@ -1411,6 +1432,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.4.2" @@ -1511,6 +1547,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dasp" version = "0.11.0" @@ -2042,7 +2092,7 @@ version = "4.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74351c3392ea1ff6cd2628e0042d268ac2371cb613252ff383b6dfa50d22fa79" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "libc", ] @@ -2207,7 +2257,7 @@ dependencies = [ "is-terminal", "lazy_static", "log", - "nu-ansi-term", + "nu-ansi-term 0.49.0", "regex", "thiserror 1.0.61", ] @@ -2726,7 +2776,7 @@ version = "0.18.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "233daaf6e83ae6a12a52055f568f9d7cf4671dabb78ff9560ab6da230ce00ee5" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "futures-channel", "futures-core", "futures-executor", @@ -3022,7 +3072,7 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" dependencies = [ - "ahash 0.7.8", + "ahash", ] [[package]] @@ -3030,10 +3080,12 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" -dependencies = [ - "ahash 0.8.11", - "allocator-api2", -] + +[[package]] +name = "hashbrown" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" [[package]] name = "hbb_common" @@ -3568,6 +3620,28 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kcp-sys" +version = "0.1.0" +source = "git+https://github.com/rustdesk-org/kcp-sys#00e8865454615a5c554d899efd8bc6eae812aaf1" +dependencies = [ + "anyhow", + "auto_impl", + "bindgen 0.71.1", + "bitflags 2.9.1", + "bytes", + "cc", + "dashmap 6.1.0", + "parking_lot", + "rand 0.8.5", + "thiserror 2.0.11", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", + "zerocopy 0.7.34", +] + [[package]] name = "keepawake" version = "0.4.3" @@ -3598,7 +3672,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b750dcadc39a09dbadd74e118f6dd6598df77fa01df0cfcdc52c28dece74528a" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "serde 1.0.203", "unicode-segmentation", ] @@ -3689,7 +3763,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -3752,7 +3826,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "libc", ] @@ -4118,7 +4192,7 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2076a31b7010b17a38c01907c45b945e8f11495ee4dd588309718901b1f7a5b7" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "jni-sys", "log", "ndk-sys 0.5.0+25.2.9519653", @@ -4230,7 +4304,7 @@ version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "cfg_aliases 0.1.1", "libc", @@ -4243,7 +4317,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "cfg_aliases 0.2.1", "libc", @@ -4331,6 +4405,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi 0.3.9", +] + [[package]] name = "nu-ansi-term" version = "0.49.0" @@ -4546,7 +4630,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4e89ad9e3d7d297152b17d39ed92cd50ca8063a89a9fa569046d41568891eff" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "block2 0.5.1", "libc", "objc2 0.5.2", @@ -4562,7 +4646,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "617fbf49e071c178c0b24c080767db52958f716d9eabdf0890523aeae54773ef" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "block2 0.5.1", "objc2 0.5.2", "objc2-foundation", @@ -4601,7 +4685,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee638a5da3799329310ad4cfa62fbf045d5f56e3ef5ba4149e7452dcf89d5a8" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "block2 0.5.1", "libc", "objc2 0.5.2", @@ -4613,7 +4697,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd0cba1276f6023976a406a14ffa85e1fdd19df6b0f737b063b95f6c8c7aadd6" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "block2 0.5.1", "objc2 0.5.2", "objc2-foundation", @@ -4625,7 +4709,7 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e42bee7bff906b14b167da2bac5efe6b6a07e6f7c0a21a7308d40c960242dc7a" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "block2 0.5.1", "objc2 0.5.2", "objc2-foundation", @@ -4694,7 +4778,7 @@ version = "0.10.68" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "foreign-types 0.3.2", "libc", @@ -4802,6 +4886,12 @@ dependencies = [ "serde_json 1.0.118", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "page_size" version = "0.6.0" @@ -5672,7 +5762,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", ] [[package]] @@ -5960,6 +6050,7 @@ dependencies = [ "impersonate_system", "include_dir", "jni", + "kcp-sys", "keepawake", "lazy_static", "libloading 0.8.4", @@ -5996,6 +6087,7 @@ dependencies = [ "sha2", "shared_memory", "shutdown_hooks", + "stunclient", "sys-locale", "system_shutdown", "tao", @@ -6066,7 +6158,7 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.4.14", @@ -6270,7 +6362,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "core-foundation 0.10.0", "core-foundation-sys 0.8.7", "libc", @@ -6409,6 +6501,15 @@ dependencies = [ "tzdb 0.5.10", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shared_memory" version = "0.12.4" @@ -6595,6 +6696,33 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "stun_codec" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feed9dafe0bda84f2b6ca3ce726b0a1f1ac2e8b63c6ecfb89b08b32313247b5b" +dependencies = [ + "bytecodec", + "byteorder", + "crc", + "hmac", + "md5", + "sha1", + "trackable 1.3.0", +] + +[[package]] +name = "stunclient" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c969a14b4a4c09c320416ebf880b3d5a81ad1612065741eb10521951c06c8991" +dependencies = [ + "bytecodec", + "rand 0.8.5", + "stun_codec", + "tokio", +] + [[package]] name = "subtle" version = "2.6.1" @@ -6899,6 +7027,16 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -7077,16 +7215,16 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.11" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-io", "futures-sink", "futures-util", - "hashbrown 0.14.5", + "hashbrown 0.15.4", "pin-project-lite", "slab", "tokio", @@ -7205,9 +7343,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", "tracing-attributes", @@ -7216,9 +7354,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "1b1ffbcf9c6f6b99d386e7444eb608ba646ae452a36b39737deb9663b610f662" dependencies = [ "proc-macro2 1.0.93", "quote 1.0.36", @@ -7227,11 +7365,66 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "nu-ansi-term 0.46.0", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] + +[[package]] +name = "trackable" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98abb9e7300b9ac902cc04920945a874c1973e08c310627cc4458c04b70dd32" +dependencies = [ + "trackable 1.3.0", + "trackable_derive", +] + +[[package]] +name = "trackable" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15bd114abb99ef8cee977e517c8f37aee63f184f2d08e3e6ceca092373369ae" +dependencies = [ + "trackable_derive", +] + +[[package]] +name = "trackable_derive" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebeb235c5847e2f82cfe0f07eb971d1e5f6804b18dac2ae16349cc604380f82f" +dependencies = [ + "quote 1.0.36", + "syn 1.0.109", ] [[package]] @@ -7516,6 +7709,12 @@ dependencies = [ "bindgen 0.65.1", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -7709,7 +7908,7 @@ version = "0.31.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e321577a0a165911bdcfb39cf029302479d7527b517ee58ab0f6ad09edf0943" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "rustix 0.38.34", "wayland-backend", "wayland-scanner", @@ -7721,7 +7920,7 @@ version = "0.32.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62989625a776e827cc0f15d41444a3cea5205b963c3a25be48ae1b52d6b4daaa" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "wayland-backend", "wayland-client", "wayland-scanner", @@ -7733,7 +7932,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd993de54a40a40fbe5601d9f1fbcaef0aebcc5fda447d7dc8f6dcbaae4f8953" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", "wayland-backend", "wayland-client", "wayland-protocols", @@ -8584,7 +8783,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.1", ] [[package]] @@ -8801,6 +9000,7 @@ version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" dependencies = [ + "byteorder", "zerocopy-derive 0.7.34", ] diff --git a/Cargo.toml b/Cargo.toml index 6cf0e5b86..7108e923b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,8 @@ fon = "0.6" zip = "0.6" shutdown_hooks = "0.1" totp-rs = { version = "5.4", default-features = false, features = ["gen_secret", "otpauth"] } +stunclient = "0.4" +kcp-sys= { git = "https://github.com/rustdesk-org/kcp-sys"} [target.'cfg(not(target_os = "linux"))'.dependencies] # https://github.com/rustdesk/rustdesk/discussions/10197, not use cpal on linux diff --git a/libs/hbb_common b/libs/hbb_common index fa160b286..93f9445c6 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit fa160b286449d4feee26c03dfa721d9997b4748b +Subproject commit 93f9445c69f30487f1479a9e4e1882773e00c7f9 diff --git a/src/client.rs b/src/client.rs index 466201436..f2cf814cd 100644 --- a/src/client.rs +++ b/src/client.rs @@ -30,7 +30,9 @@ use uuid::Uuid; use crate::{ check_port, common::input::{MOUSE_BUTTON_LEFT, MOUSE_BUTTON_RIGHT, MOUSE_TYPE_DOWN, MOUSE_TYPE_UP}, - create_symmetric_key_msg, decode_id_pk, get_rs_pk, is_keyboard_mode_supported, secure_tcp, + create_symmetric_key_msg, decode_id_pk, get_rs_pk, is_keyboard_mode_supported, + kcp_stream::KcpStream, + secure_tcp, ui_interface::{get_builtin_option, use_texture_render}, ui_session_interface::{InvokeUiSession, Session}, }; @@ -40,7 +42,6 @@ pub use file_trait::FileManager; #[cfg(not(feature = "flutter"))] #[cfg(not(any(target_os = "android", target_os = "ios")))] use hbb_common::tokio::sync::mpsc::UnboundedSender; -use hbb_common::tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use hbb_common::{ allow_err, anyhow::{anyhow, Context}, @@ -50,17 +51,23 @@ use hbb_common::{ READ_TIMEOUT, RELAY_PORT, RENDEZVOUS_PORT, RENDEZVOUS_SERVERS, }, fs::JobType, + futures::future::{select_ok, FutureExt}, get_version_number, log, message_proto::{option_message::BoolOption, *}, protobuf::{Message as _, MessageField}, rand, rendezvous_proto::*, sha2::{Digest, Sha256}, - socket_client::{connect_tcp, connect_tcp_local, ipv4_to_ipv6}, + socket_client::{connect_tcp, connect_tcp_local, ipv4_to_ipv6, new_direct_udp_for}, sodiumoxide::{base64, crypto::sign}, timeout, tokio::{ self, + net::UdpSocket, + sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + oneshot, + }, time::{interval, Duration, Instant}, }, AddrMangle, ResultType, Stream, @@ -184,7 +191,10 @@ impl Client { token: &str, conn_type: ConnType, interface: impl Interface, - ) -> ResultType<((Stream, bool, Option>), (i32, String))> { + ) -> ResultType<( + (Stream, bool, Option>, Option), + (i32, String), + )> { debug_assert!(peer == interface.get_id()); interface.update_direct(None); interface.update_received(false); @@ -208,7 +218,10 @@ impl Client { token: &str, conn_type: ConnType, interface: impl Interface, - ) -> ResultType<((Stream, bool, Option>), (i32, String))> { + ) -> ResultType<( + (Stream, bool, Option>, Option), + (i32, String), + )> { if config::is_incoming_only() { bail!("Incoming only mode"); } @@ -220,6 +233,7 @@ impl Client { .await?, true, None, + None, ), (0, "".to_owned()), )); @@ -231,6 +245,7 @@ impl Client { connect_tcp_local(peer, None, CONNECT_TIMEOUT).await?, true, None, + None, ), (0, "".to_owned()), )); @@ -259,8 +274,29 @@ impl Client { } }; + crate::test_ipv6().await; + + let (stop_udp_tx, stop_udp_rx) = oneshot::channel::<()>(); + let mut udp = + // no need to care about multiple rendezvous servers case, since it is acutally not used any more. + // Shared state for UDP NAT test result + if let Ok((socket, addr)) = new_direct_udp_for(&rendezvous_server).await { + let udp_port = Arc::new(Mutex::new(0)); + let up_cloned = udp_port.clone(); + let socket_cloned = socket.clone(); + let func = async move { + allow_err!(test_udp_uat(socket_cloned, addr, up_cloned, stop_udp_rx).await); + }; + tokio::spawn(func); + (Some(socket), Some(udp_port)) + } else { + (None, None) + }; + let mut start = Instant::now(); let mut socket = connect_tcp(&*rendezvous_server, CONNECT_TIMEOUT).await; debug_assert!(!servers.contains(&rendezvous_server)); + let rtt = start.elapsed(); + log::debug!("TCP connection establishment time used: {:?}", rtt); if socket.is_err() && !servers.is_empty() { log::info!("try the other servers: {:?}", servers); for server in servers { @@ -280,40 +316,65 @@ impl Client { let my_addr = socket.local_addr(); let mut signed_id_pk = Vec::new(); let mut relay_server = "".to_owned(); - - if !key.is_empty() && !token.is_empty() { - // mainly for the security of token - secure_tcp(&mut socket, key).await.map_err(|e| anyhow!("Failed to secure tcp: {}", e))?; - } - - let start = std::time::Instant::now(); let mut peer_addr = Config::get_any_listen_addr(true); let mut peer_nat_type = NatType::UNKNOWN_NAT; let my_nat_type = crate::get_nat_type(100).await; let mut is_local = false; let mut feedback = 0; + let force_relay = interface.is_force_relay() || use_ws() || Config::is_proxy(); + use hbb_common::protobuf::Enum; + let nat_type = if force_relay { + NatType::SYMMETRIC + } else { + NatType::from_i32(my_nat_type).unwrap_or(NatType::UNKNOWN_NAT) + }; + + if !key.is_empty() && !token.is_empty() { + // mainly for the security of token + secure_tcp(&mut socket, key) + .await + .map_err(|e| anyhow!("Failed to secure tcp: {}", e))?; + } else if let Some(udp) = udp.1.as_ref() { + let tm = Instant::now(); + loop { + let port = *udp.lock().unwrap(); + if port > 0 { + break; + } + // await for 0.5 RTT + if tm.elapsed() > rtt / 2 { + break; + } + hbb_common::sleep(0.001).await; + } + } + // Stop UDP NAT test task if still running + let _ = stop_udp_tx.send(()); + let mut msg_out = RendezvousMessage::new(); + let mut ipv6 = if let Some((socket, addr)) = crate::get_ipv6_socket().await { + (Some(socket), Some(addr)) + } else { + (None, None) + }; + let udp_nat_port = udp.1.map(|x| *x.lock().unwrap()).unwrap_or(0); + msg_out.set_punch_hole_request(PunchHoleRequest { + id: peer.to_owned(), + token: token.to_owned(), + nat_type: nat_type.into(), + licence_key: key.to_owned(), + conn_type: conn_type.into(), + version: crate::VERSION.to_owned(), + udp_port: udp_nat_port as _, + force_relay, + socket_addr_v6: ipv6.1.unwrap_or_default(), + ..Default::default() + }); for i in 1..=3 { log::info!("#{} punch attempt with {}, id: {}", i, my_addr, peer); - let mut msg_out = RendezvousMessage::new(); - use hbb_common::protobuf::Enum; - let nat_type = if interface.is_force_relay() || use_ws() || Config::is_proxy() { - NatType::SYMMETRIC - } else { - NatType::from_i32(my_nat_type).unwrap_or(NatType::UNKNOWN_NAT) - }; - msg_out.set_punch_hole_request(PunchHoleRequest { - id: peer.to_owned(), - token: token.to_owned(), - nat_type: nat_type.into(), - licence_key: key.to_owned(), - conn_type: conn_type.into(), - version: crate::VERSION.to_owned(), - ..Default::default() - }); socket.send(&msg_out).await?; // below timeout should not bigger than hbbs's connection timeout. if let Some(msg_in) = - crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 6000)).await + crate::get_next_nonkeyexchange_msg(&mut socket, Some(i * 3000)).await { match msg_in.union { Some(rendezvous_message::Union::PunchHoleResponse(ph)) => { @@ -343,6 +404,23 @@ impl Client { relay_server = ph.relay_server; peer_addr = AddrMangle::decode(&ph.socket_addr); feedback = ph.feedback; + let s = udp.0.take(); + if ph.is_udp && s.is_some() { + if let Some(s) = s { + allow_err!(s.connect(peer_addr).await); + udp.0 = Some(s); + } + } + let s = ipv6.0.take(); + if !ph.socket_addr_v6.is_empty() && s.is_some() { + let addr = AddrMangle::decode(&ph.socket_addr_v6); + if addr.port() > 0 { + if let Some(s) = s { + allow_err!(s.connect(addr).await); + ipv6.0 = Some(s); + } + } + } log::info!("Hole Punched {} = {}", peer, peer_addr); break; } @@ -353,20 +431,44 @@ impl Client { start.elapsed(), rr.relay_server ); + start = Instant::now(); + let mut connect_futures = Vec::new(); + if let Some(s) = ipv6.0 { + let addr = AddrMangle::decode(&rr.socket_addr_v6); + if addr.port() > 0 { + if s.connect(addr).await.is_ok() { + connect_futures.push(udp_nat_connect(s, "IPv6").boxed()); + } + } + } signed_id_pk = rr.pk().into(); - let mut conn = Self::create_relay( + let fut = Self::create_relay( peer, rr.uuid, rr.relay_server, key, conn_type, my_addr.is_ipv4(), - ) - .await?; + ); + connect_futures.push( + async move { + let conn = fut.await?; + Ok((conn, None, "Relay")) + } + .boxed(), + ); + // Run all connection attempts concurrently, return the first successful one + let (conn, kcp, typ) = match select_ok(connect_futures).await { + Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), + + Err(e) => (Err(e), None, ""), + }; + let mut conn = conn?; feedback = rr.feedback; + log::info!("{:?} used to establish {typ} connection", start.elapsed()); let pk = Self::secure_connection(peer, signed_id_pk, key, &mut conn).await?; - return Ok(((conn, false, pk), (feedback, rendezvous_server))); + return Ok(((conn, false, pk, kcp), (feedback, rendezvous_server))); } _ => { log::error!("Unexpected protobuf msg received: {:?}", msg_in); @@ -405,6 +507,8 @@ impl Client { token, conn_type, interface, + udp.0, + ipv6.0, ) .await?, (feedback, rendezvous_server), @@ -427,7 +531,9 @@ impl Client { token: &str, conn_type: ConnType, interface: impl Interface, - ) -> ResultType<(Stream, bool, Option>)> { + udp_socket_nat: Option>, + udp_socket_v6: Option>, + ) -> ResultType<(Stream, bool, Option>, Option)> { let direct_failures = interface.get_lch().read().unwrap().direct_failures; let mut connect_timeout = 0; const MIN: u64 = 1000; @@ -462,8 +568,28 @@ impl Client { } log::info!("peer address: {}, timeout: {}", peer, connect_timeout); let start = std::time::Instant::now(); - // NOTICE: Socks5 is be used event in intranet. Which may be not a good way. - let mut conn = connect_tcp_local(peer, Some(local_addr), connect_timeout).await; + + let mut connect_futures = Vec::new(); + let fut = connect_tcp_local(peer, Some(local_addr), connect_timeout); + connect_futures.push( + async move { + let conn = fut.await?; + Ok((conn, None, "TCP")) + } + .boxed(), + ); + if let Some(udp_socket_nat) = udp_socket_nat { + connect_futures.push(udp_nat_connect(udp_socket_nat, "UDP").boxed()); + } + if let Some(udp_socket_v6) = udp_socket_v6 { + connect_futures.push(udp_nat_connect(udp_socket_v6, "IPv6").boxed()); + } + // Run all connection attempts concurrently, return the first successful one + let (mut conn, kcp, mut typ) = match select_ok(connect_futures).await { + Ok(conn) => (Ok(conn.0 .0), conn.0 .1, conn.0 .2), + Err(e) => (Err(e), None, ""), + }; + let mut direct = !conn.is_err(); interface.update_direct(Some(direct)); if interface.is_force_relay() || conn.is_err() { @@ -482,6 +608,7 @@ impl Client { if let Err(e) = conn { bail!("Failed to connect via relay server: {}", e); } + typ = "Relay"; direct = false; } else { bail!("Failed to make direct connection to remote desktop"); @@ -493,9 +620,9 @@ impl Client { interface.get_lch().write().unwrap().set_direct_failure(n); } let mut conn = conn?; - log::info!("{:?} used to establish connection", start.elapsed()); + log::info!("{:?} used to establish {typ} connection", start.elapsed()); let pk = Self::secure_connection(peer_id, signed_id_pk, key, &mut conn).await?; - Ok((conn, direct, pk)) + Ok((conn, direct, pk, kcp)) } /// Establish secure connection with the server. @@ -3707,3 +3834,125 @@ pub mod peer_online { } } } + +async fn test_udp_uat( + udp_socket: Arc, + server_addr: SocketAddr, + udp_port: Arc>, + mut stop_udp_rx: oneshot::Receiver<()>, +) -> ResultType<()> { + let (tx, mut rx) = oneshot::channel::<_>(); + tokio::spawn(async { + if let Ok(v) = crate::test_nat_ipv4().await { + tx.send(v).ok(); + } + }); + + let start = Instant::now(); + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_request(TestNatRequest { + ..Default::default() + }); + // Adaptive retry strategy that works within TCP RTT constraints + // Start with aggressive sending, then back off + let mut retry_interval = Duration::from_millis(20); // Start fast + const MAX_INTERVAL: Duration = Duration::from_millis(200); + let mut packets_sent = 0; + + // Send initial burst to improve reliability + let data = msg_out.write_to_bytes()?; + for _ in 0..2 { + if let Err(e) = udp_socket.send_to(&data, server_addr).await { + log::warn!("Failed to send initial UDP NAT test packet: {}", e); + } else { + packets_sent += 1; + } + } + let mut last_send_time = Instant::now(); + let mut buf = [0u8; 1024]; + + loop { + tokio::select! { + Ok((addr, server)) = &mut rx => { + *udp_port.lock().unwrap() = addr.port(); + log::debug!("UDP NAT test received response from {}: {}", addr, server); + break; + } + _ = &mut stop_udp_rx => { + log::debug!("UDP NAT test received stop signal after {} packets", packets_sent); + break; + } + _ = hbb_common::sleep(retry_interval.as_secs_f32()) => { + // Adaptive retry: send fewer packets as time goes on + let elapsed = last_send_time.elapsed(); + + if elapsed >= retry_interval { + // Send single packet (not double) to reduce network load + if let Err(e) = udp_socket.send_to(&data, server_addr).await { + log::warn!("Failed to send UDP NAT test retry packet: {}", e); + } else { + packets_sent += 1; + } + + // Exponentially increase interval to reduce network pressure + retry_interval = std::cmp::min( + Duration::from_millis((retry_interval.as_millis() as f64 * 1.5) as u64), + MAX_INTERVAL + ); + last_send_time = Instant::now(); + } + } + res = udp_socket.recv(&mut buf[..]) => { + match res { + Ok(n) => { + match RendezvousMessage::parse_from_bytes(&buf[0..n]) { + Ok(msg_in) => { + if let Some(rendezvous_message::Union::TestNatResponse(response)) = msg_in.union { + *udp_port.lock().unwrap() = response.port as u16; + break; + } + } + Err(e) => { + log::warn!("Failed to parse UDP NAT test response: {}", e); + } + } + } + Err(e) => { + log::warn!("UDP NAT test socket error: {}", e); + } + } + } + } + } + + let final_port = *udp_port.lock().unwrap(); + log::debug!( + "UDP NAT test to {:?} finished: time={:?}, port={}, packets_sent={}, success={}", + server_addr, + start.elapsed(), + final_port, + packets_sent, + final_port > 0 + ); + Ok(()) +} + +#[inline] +async fn udp_nat_connect( + socket: Arc, + typ: &'static str, +) -> ResultType<(Stream, Option, &'static str)> { + crate::punch_udp(socket.clone(), false) + .await + .map_err(|err| { + log::debug!("{err}"); + anyhow!(err) + })?; + let res = KcpStream::connect(socket, Duration::from_secs(CONNECT_TIMEOUT as _)) + .await + .map_err(|err| { + log::debug!("Failed to connect KCP stream: {}", err); + anyhow!(err) + })?; + Ok((res.1, Some(res.0), typ)) +} diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 687c942d9..39f9185a0 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -172,7 +172,7 @@ impl Remote { ) .await { - Ok(((mut peer, direct, pk), (feedback, rendezvous_server))) => { + Ok(((mut peer, direct, pk, _kcp), (feedback, rendezvous_server))) => { self.handler .connection_round_state .lock() diff --git a/src/common.rs b/src/common.rs index f78cb9e9b..8ca291b2b 100644 --- a/src/common.rs +++ b/src/common.rs @@ -1,10 +1,12 @@ use std::{ collections::HashMap, future::Future, + net::{SocketAddr, ToSocketAddrs}, sync::{Arc, Mutex, RwLock}, task::Poll, }; +use default_net::ip; use serde_json::{json, Map, Value}; #[cfg(not(any(target_os = "android", target_os = "ios")))] @@ -23,10 +25,10 @@ use hbb_common::{ rendezvous_proto::*, socket_client, sodiumoxide::crypto::{box_, secretbox, sign}, - tcp::FramedStream, timeout, tokio::{ self, + net::UdpSocket, time::{Duration, Instant, Interval}, }, ResultType, Stream, @@ -78,6 +80,7 @@ lazy_static::lazy_static! { pub static ref SOFTWARE_UPDATE_URL: Arc> = Default::default(); pub static ref DEVICE_ID: Arc> = Default::default(); pub static ref DEVICE_NAME: Arc> = Default::default(); + static ref PUBLIC_IPV6_ADDR: Arc, Option)>> = Default::default(); } lazy_static::lazy_static! { @@ -526,6 +529,7 @@ impl Drop for CheckTestNatType { } pub fn test_nat_type() { + test_ipv6_sync(); use std::sync::atomic::{AtomicBool, Ordering}; std::thread::spawn(move || { static IS_RUNNING: AtomicBool = AtomicBool::new(false); @@ -1674,6 +1678,318 @@ pub fn get_hwid() -> Bytes { Bytes::from(hasher.finalize().to_vec()) } +#[inline] +pub fn get_builtin_option(key: &str) -> String { + config::BUILTIN_SETTINGS + .read() + .unwrap() + .get(key) + .cloned() + .unwrap_or_default() +} + +#[inline] +pub fn is_custom_client() -> bool { + get_app_name() != "RustDesk" +} + +pub fn verify_login(raw: &str, id: &str) -> bool { + true + /* + if is_custom_client() { + return true; + } + #[cfg(debug_assertions)] + return true; + let Ok(pk) = crate::decode64("IycjQd4TmWvjjLnYd796Rd+XkK+KG+7GU1Ia7u4+vSw=") else { + return false; + }; + let Some(key) = get_pk(&pk).map(|x| sign::PublicKey(x)) else { + return false; + }; + let Ok(v) = crate::decode64(raw) else { + return false; + }; + let raw = sign::verify(&v, &key).unwrap_or_default(); + let v_str = std::str::from_utf8(&raw) + .unwrap_or_default() + .split(":") + .next() + .unwrap_or_default(); + v_str == id + */ +} + +#[inline] +pub fn is_udp_disabled() -> bool { + get_builtin_option(config::keys::OPTION_DISABLE_UDP) == "Y" +} + +// this crate https://github.com/yoshd/stun-client supports nat type +async fn stun_ipv6_test(stun_server: &str) -> ResultType<(SocketAddr, String)> { + use std::net::ToSocketAddrs; + use stunclient::StunClient; + let local_addr = SocketAddr::from(([0u16; 8], 0)); // [::]:0 + let socket = UdpSocket::bind(&local_addr).await?; + let Some(stun_addr) = stun_server + .to_socket_addrs()? + .filter(|x| x.is_ipv6()) + .next() + else { + bail!( + "Failed to resolve STUN ipv6 server address: {}", + stun_server + ); + }; + let client = StunClient::new(stun_addr); + let addr = client.query_external_address_async(&socket).await?; + Ok(if addr.ip().is_ipv6() { + (addr, stun_server.to_owned()) + } else { + bail!("STUN server returned non-IPv6 address: {}", addr) + }) +} + +async fn stun_ipv4_test(stun_server: &str) -> ResultType<(SocketAddr, String)> { + use std::net::ToSocketAddrs; + use stunclient::StunClient; + let local_addr = SocketAddr::from(([0u8; 4], 0)); + let socket = UdpSocket::bind(&local_addr).await?; + let Some(stun_addr) = stun_server + .to_socket_addrs()? + .filter(|x| x.is_ipv4()) + .next() + else { + bail!( + "Failed to resolve STUN ipv4 server address: {}", + stun_server + ); + }; + let client = StunClient::new(stun_addr); + let addr = client.query_external_address_async(&socket).await?; + Ok(if addr.ip().is_ipv4() { + (addr, stun_server.to_owned()) + } else { + bail!("STUN server returned non-IPv6 address: {}", addr) + }) +} + +static STUNS_V4: [&str; 3] = [ + "stun.l.google.com:19302", + "stun.cloudflare.com:3478", + "stun.chat.bilibili.com:3478", +]; + +static STUNS_V6: [&str; 3] = [ + "stun.l.google.com:19302", + "stun.cloudflare.com:3478", + "stun.nextcloud.com:3478", +]; + +pub async fn test_nat_ipv4() -> ResultType<(SocketAddr, String)> { + use hbb_common::futures::future::{select_ok, FutureExt}; + let tests = STUNS_V4 + .iter() + .map(|&stun| stun_ipv4_test(stun).boxed()) + .collect::>(); + + match select_ok(tests).await { + Ok(res) => { + return Ok(res.0); + } + Err(e) => { + bail!( + "Failed to get public IPv4 address via public STUN servers: {}", + e + ); + } + }; +} + +async fn test_bind_ipv6() -> ResultType { + let local_addr = SocketAddr::from(([0u16; 8], 0)); // [::]:0 + let socket = UdpSocket::bind(local_addr).await?; + let addr = STUNS_V6[0] + .to_socket_addrs()? + .filter(|x| x.is_ipv6()) + .next() + .ok_or_else(|| { + anyhow!( + "Failed to resolve STUN ipv6 server address: {}", + STUNS_V6[0] + ) + })?; + socket.connect(addr).await?; + Ok(socket.local_addr()?) +} + +pub async fn test_ipv6() -> Option> { + if PUBLIC_IPV6_ADDR + .lock() + .unwrap() + .1 + .map(|x| x.elapsed().as_secs() < 60) + .unwrap_or(false) + { + return None; + } + PUBLIC_IPV6_ADDR.lock().unwrap().1 = Some(Instant::now()); + + match test_bind_ipv6().await { + Ok(mut addr) => { + if let std::net::IpAddr::V6(ip) = addr.ip() { + if !ip.is_loopback() + && !ip.is_unspecified() + && !ip.is_multicast() + && (ip.segments()[0] & 0xe000) == 0x2000 + { + addr.set_port(0); + PUBLIC_IPV6_ADDR.lock().unwrap().0 = Some(addr); + log::debug!("Found public IPv6 address locally: {}", addr); + } + } + } + Err(e) => { + log::warn!("Failed to bind IPv6 socket: {}", e); + } + } + // Interestingly, on my macOS, sometimes my ipv6 works, sometimes not (test with ping6 or https://test-ipv6.com/). + // I checked ifconfig, could not see any difference. Both secure ipv6 and temporary ipv6 are there. + // So we can not rely on the local ipv6 address queries with if_addrs. + // above test_bind_ipv6 is safer, because it can fail in this case. + /* + std::thread::spawn(|| { + if let Ok(ifaces) = if_addrs::get_if_addrs() { + for iface in ifaces { + if let if_addrs::IfAddr::V6(v6) = iface.addr { + let ip = v6.ip; + if !ip.is_loopback() + && !ip.is_unspecified() + && !ip.is_multicast() + && !ip.is_unique_local() + && !ip.is_unicast_link_local() + && (ip.segments()[0] & 0xe000) == 0x2000 + { + // only use the first one, on mac, the first one is the stable + // one, the last one is the temporary one. The middle ones are deperecated. + *PUBLIC_IPV6_ADDR.lock().unwrap() = + Some((SocketAddr::from((ip, 0)), Instant::now())); + log::debug!("Found public IPv6 address locally: {}", ip); + break; + } + } + } + } + }); + */ + + Some(tokio::spawn(async { + use hbb_common::futures::future::{select_ok, FutureExt}; + let tests = STUNS_V6 + .iter() + .map(|&stun| stun_ipv6_test(stun).boxed()) + .collect::>(); + + match select_ok(tests).await { + Ok(res) => { + let mut addr = res.0 .0; + addr.set_port(0); // Set port to 0 to avoid conflicts + PUBLIC_IPV6_ADDR.lock().unwrap().0 = Some(addr); + log::debug!( + "Found public IPv6 address via STUN server {}: {}", + res.0 .1, + addr + ); + } + Err(e) => { + log::error!("Failed to get public IPv6 address: {}", e); + } + }; + })) +} + +pub async fn punch_udp( + socket: Arc, + listen: bool, +) -> ResultType> { + let mut retry_interval = Duration::from_millis(20); + const MAX_INTERVAL: Duration = Duration::from_millis(200); + const MAX_TIME: Duration = Duration::from_secs(20); + let mut packets_sent = 0; + socket.send(&[]).await.ok(); + packets_sent += 1; + let mut last_send_time = Instant::now(); + let tm = Instant::now(); + let mut data = [0u8; 1024]; + + loop { + tokio::select! { + _ = hbb_common::sleep(retry_interval.as_secs_f32()) => { + if tm.elapsed() > MAX_TIME { + bail!("UDP punch is timed out, stop sending packets after {:?} packets", packets_sent); + } + let elapsed = last_send_time.elapsed(); + + if elapsed >= retry_interval { + socket.send(&[]).await.ok(); + packets_sent += 1; + + // Exponentially increase interval to reduce network pressure + retry_interval = std::cmp::min( + Duration::from_millis((retry_interval.as_millis() as f64 * 1.5) as u64), + MAX_INTERVAL + ); + last_send_time = Instant::now(); + } + } + res = socket.recv(&mut data) => match res { + Err(e) => bail!("UDP punch failed, {packets_sent} packets sent: {e}"), + Ok(n) => { + // log::debug!("UDP punch succeeded after sending {} packets after {:?}", packets_sent, tm.elapsed()); + if listen { + if n == 0 { + continue; + } + return Ok(Some(bytes::BytesMut::from(&data[..n]))); + } + return Ok(None); + } + } + } + } +} + +fn test_ipv6_sync() { + #[tokio::main(flavor = "current_thread")] + async fn func() { + if let Some(job) = test_ipv6().await { + job.await.ok(); + } + } + std::thread::spawn(func); +} + +pub async fn get_ipv6_socket() -> Option<(Arc, bytes::Bytes)> { + let Some(addr) = PUBLIC_IPV6_ADDR.lock().unwrap().0 else { + return None; + }; + + match UdpSocket::bind(addr).await { + Err(err) => { + log::warn!("Failed to create UDP socket for IPv6: {err}"); + } + Ok(socket) => { + if let Ok(local_addr_v6) = socket.local_addr() { + return Some(( + Arc::new(socket), + hbb_common::AddrMangle::encode(local_addr_v6).into(), + )); + } + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -1815,45 +2131,3 @@ mod tests { ); } } - -#[inline] -pub fn get_builtin_option(key: &str) -> String { - config::BUILTIN_SETTINGS - .read() - .unwrap() - .get(key) - .cloned() - .unwrap_or_default() -} - -#[inline] -pub fn is_custom_client() -> bool { - get_app_name() != "RustDesk" -} - -pub fn verify_login(raw: &str, id: &str) -> bool { - true - /* - if is_custom_client() { - return true; - } - #[cfg(debug_assertions)] - return true; - let Ok(pk) = crate::decode64("IycjQd4TmWvjjLnYd796Rd+XkK+KG+7GU1Ia7u4+vSw=") else { - return false; - }; - let Some(key) = get_pk(&pk).map(|x| sign::PublicKey(x)) else { - return false; - }; - let Ok(v) = crate::decode64(raw) else { - return false; - }; - let raw = sign::verify(&v, &key).unwrap_or_default(); - let v_str = std::str::from_utf8(&raw) - .unwrap_or_default() - .split(":") - .next() - .unwrap_or_default(); - v_str == id - */ -} diff --git a/src/kcp_stream.rs b/src/kcp_stream.rs new file mode 100644 index 000000000..46538e6ba --- /dev/null +++ b/src/kcp_stream.rs @@ -0,0 +1,141 @@ +use hbb_common::{ + anyhow, + bytes::{Bytes, BytesMut}, + bytes_codec::BytesCodec, + config, log, + tcp::{DynTcpStream, FramedStream}, + tokio::{self, net::UdpSocket, sync::mpsc, sync::oneshot}, + tokio_util, ResultType, Stream, +}; +use kcp_sys::{ + endpoint::KcpEndpoint, + packet_def::{KcpPacket, KcpPacketHeader}, + stream, +}; +use std::{net::SocketAddr, sync::Arc}; + +pub struct KcpStream { + _endpoint: KcpEndpoint, + stop_sender: Option>, +} + +impl KcpStream { + fn create_framed(stream: stream::KcpStream, local_addr: Option) -> Stream { + Stream::Tcp(FramedStream( + tokio_util::codec::Framed::new(DynTcpStream(Box::new(stream)), BytesCodec::new()), + local_addr.unwrap_or(config::Config::get_any_listen_addr(true)), + None, + 0, + )) + } + + pub async fn accept( + udp_socket: Arc, + timeout: std::time::Duration, + init_packet: Option, + ) -> ResultType<(Self, Stream)> { + let mut endpoint = KcpEndpoint::new(); + endpoint.run().await; + + let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); + let (stop_sender, stop_receiver) = oneshot::channel(); + if let Some(packet) = init_packet { + if packet.len() >= size_of::() { + input.send(packet.into()).await?; + } + } + Self::kcp_io(udp_socket.clone(), input, output, stop_receiver).await; + + let conn_id = tokio::time::timeout(timeout, endpoint.accept()).await??; + if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { + Ok(( + Self { + _endpoint: endpoint, + stop_sender: Some(stop_sender), + }, + Self::create_framed(stream, udp_socket.local_addr().ok()), + )) + } else { + Err(anyhow::anyhow!("Failed to create KcpStream")) + } + } + + pub async fn connect( + udp_socket: Arc, + timeout: std::time::Duration, + ) -> ResultType<(Self, Stream)> { + let mut endpoint = KcpEndpoint::new(); + endpoint.run().await; + + let (input, output) = (endpoint.input_sender(), endpoint.output_receiver().unwrap()); + let (stop_sender, stop_receiver) = oneshot::channel(); + Self::kcp_io(udp_socket.clone(), input, output, stop_receiver).await; + + let conn_id = endpoint.connect(timeout, 0, 0, Bytes::new()).await.unwrap(); + if let Some(stream) = stream::KcpStream::new(&endpoint, conn_id) { + Ok(( + Self { + _endpoint: endpoint, + stop_sender: Some(stop_sender), + }, + Self::create_framed(stream, udp_socket.local_addr().ok()), + )) + } else { + Err(anyhow::anyhow!("Failed to create KcpStream")) + } + } + + async fn kcp_io( + udp_socket: Arc, + input: mpsc::Sender, + mut output: mpsc::Receiver, + mut stop_receiver: oneshot::Receiver<()>, + ) { + let udp = udp_socket.clone(); + tokio::spawn(async move { + let mut buf = vec![0; 10240]; + loop { + tokio::select! { + _ = &mut stop_receiver => { + log::debug!("KCP io loop received stop signal"); + break; + } + Some(data) = output.recv() => { + if let Err(e) = udp.send(&data.inner()).await { + log::debug!("KCP send error: {:?}", e); + break; + } + } + result = udp.recv_from(&mut buf) => { + match result { + Ok((size, _)) => { + if size < size_of::() { + continue; + } + input + .send(BytesMut::from(&buf[..size]).into()) + .await.ok(); + } + Err(e) => { + log::debug!("KCP recv_from error: {:?}", e); + break; + } + } + } + else => { + log::debug!("KCP endpoint input closed"); + break; + } + } + } + }); + } +} + +impl Drop for KcpStream { + fn drop(&mut self) { + if let Some(sender) = self.stop_sender.take() { + let _ = sender.send(()); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 31d672cd3..0711416fd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -71,3 +71,5 @@ pub mod privacy_mode; #[cfg(windows)] pub mod virtual_display_manager; + +mod kcp_stream; \ No newline at end of file diff --git a/src/port_forward.rs b/src/port_forward.rs index 28ac624cd..bd4b9fb78 100644 --- a/src/port_forward.rs +++ b/src/port_forward.rs @@ -118,7 +118,7 @@ async fn connect_and_login( } else { ConnType::PORT_FORWARD }; - let ((mut stream, direct, _pk), (feedback, rendezvous_server)) = + let ((mut stream, direct, _pk, _kcp), (feedback, rendezvous_server)) = Client::start(id, key, token, conn_type, interface.clone()).await?; interface.update_direct(Some(direct)); let mut buffer = Vec::new(); diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 09e3b4fb9..dca1f0721 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -4,7 +4,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - time::Instant, + time::{Duration, Instant}, }; use uuid::Uuid; @@ -18,10 +18,9 @@ use hbb_common::{ futures::future::join_all, log, protobuf::Message as _, - proxy::Proxy, rendezvous_proto::*, sleep, - socket_client::{self, connect_tcp, is_ipv4}, + socket_client::{self, connect_tcp, is_ipv4, new_direct_udp_for, new_udp_for}, tokio::{self, select, sync::Mutex, time::interval}, udp::FramedSocket, AddrMangle, IntoTargetAddr, ResultType, Stream, TargetAddr, @@ -30,13 +29,13 @@ use hbb_common::{ use crate::{ check_port, server::{check_zombie, new as new_server, ServerPtr}, - ui_interface::get_builtin_option, }; type Message = RendezvousMessage; lazy_static::lazy_static! { - static ref SOLVING_PK_MISMATCH: Arc> = Default::default(); + static ref SOLVING_PK_MISMATCH: Mutex = Default::default(); + static ref LAST_MSG: Mutex<(SocketAddr, Instant)> = Mutex::new((SocketAddr::new([0; 4].into(), 0), Instant::now())); } static SHOULD_EXIT: AtomicBool = AtomicBool::new(false); static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false); @@ -142,7 +141,7 @@ impl RendezvousMediator { pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> { let host = check_port(&host, RENDEZVOUS_PORT); log::info!("start udp: {host}"); - let (mut socket, mut addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?; + let (mut socket, mut addr) = new_udp_for(&host, CONNECT_TIMEOUT).await?; let mut rz = Self { addr: addr.clone(), host: host.clone(), @@ -388,7 +387,7 @@ impl RendezvousMediator { if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some()) || Config::is_proxy() || use_ws() - || get_builtin_option(config::keys::OPTION_DISABLE_UDP) == "Y" + || crate::is_udp_disabled() { Self::start_tcp(server, host).await } else { @@ -404,6 +403,7 @@ impl RendezvousMediator { server, rr.secure, false, + Default::default(), ) .await } @@ -416,6 +416,7 @@ impl RendezvousMediator { server: ServerPtr, secure: bool, initiate: bool, + socket_addr_v6: bytes::Bytes, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&socket_addr); log::info!( @@ -432,6 +433,7 @@ impl RendezvousMediator { let mut rr = RelayResponse { socket_addr: socket_addr.into(), version: crate::VERSION.to_owned(), + socket_addr_v6, ..Default::default() }; if initiate { @@ -454,16 +456,28 @@ impl RendezvousMediator { } async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> { + let addr = AddrMangle::decode(&fla.socket_addr); + let last = *LAST_MSG.lock().await; + *LAST_MSG.lock().await = (addr, Instant::now()); + // skip duplicate punch hole messages + if last.0 == addr && last.1.elapsed().as_millis() < 100 { + return Ok(()); + } + let peer_addr_v6 = hbb_common::AddrMangle::decode(&fla.socket_addr_v6); let relay_server = self.get_relay_server(fla.relay_server.clone()); - // nat64, go relay directly, because current hbbs will crash if demangle ipv6 address - // websocket, go relay directly - if is_ipv4(&self.addr) - && !config::is_disable_tcp_listen() - && !Config::is_proxy() - && !use_ws() - { + let relay = use_ws() || Config::is_proxy(); + let mut socket_addr_v6 = Default::default(); + if peer_addr_v6.port() > 0 && !relay { + socket_addr_v6 = start_ipv6(peer_addr_v6, addr, server.clone()).await; + } + if is_ipv4(&self.addr) && !relay && !config::is_disable_tcp_listen() { if let Err(err) = self - .handle_intranet_(fla.clone(), server.clone(), relay_server.clone()) + .handle_intranet_( + fla.clone(), + server.clone(), + relay_server.clone(), + socket_addr_v6.clone(), + ) .await { log::debug!("Failed to handle intranet: {:?}, will try relay", err); @@ -479,6 +493,7 @@ impl RendezvousMediator { server, true, true, + socket_addr_v6, ) .await } @@ -488,6 +503,7 @@ impl RendezvousMediator { fla: FetchLocalAddr, server: ServerPtr, relay_server: String, + socket_addr_v6: bytes::Bytes, ) -> ResultType<()> { let peer_addr = AddrMangle::decode(&fla.socket_addr); log::debug!("Handle intranet from {:?}", peer_addr); @@ -503,6 +519,7 @@ impl RendezvousMediator { local_addr: AddrMangle::encode(local_addr).into(), relay_server, version: crate::VERSION.to_owned(), + socket_addr_v6, ..Default::default() }); let bytes = msg_out.write_to_bytes()?; @@ -512,13 +529,25 @@ impl RendezvousMediator { } async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> { + let mut peer_addr = AddrMangle::decode(&ph.socket_addr); + let last = *LAST_MSG.lock().await; + *LAST_MSG.lock().await = (peer_addr, Instant::now()); + // skip duplicate punch hole messages + if last.0 == peer_addr && last.1.elapsed().as_millis() < 100 { + return Ok(()); + } + let peer_addr_v6 = hbb_common::AddrMangle::decode(&ph.socket_addr_v6); + let relay = use_ws() || Config::is_proxy() || ph.force_relay; + let mut socket_addr_v6 = Default::default(); + if peer_addr_v6.port() > 0 && !relay { + socket_addr_v6 = start_ipv6(peer_addr_v6, peer_addr, server.clone()).await; + } let relay_server = self.get_relay_server(ph.relay_server); // for ensure, websocket go relay directly if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC) || Config::get_nat_type() == NatType::SYMMETRIC as i32 - || config::is_disable_tcp_listen() - || use_ws() - || Config::is_proxy() + || relay + || (config::is_disable_tcp_listen() && ph.udp_port <= 0) { let uuid = Uuid::new_v4().to_string(); return self @@ -529,11 +558,27 @@ impl RendezvousMediator { server, true, true, + socket_addr_v6.clone(), ) .await; } - let peer_addr = AddrMangle::decode(&ph.socket_addr); - log::debug!("Punch hole to {:?}", peer_addr); + use hbb_common::protobuf::Enum; + let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT); + let msg_punch = PunchHoleSent { + socket_addr: ph.socket_addr, + id: Config::get_id(), + relay_server, + nat_type: nat_type.into(), + version: crate::VERSION.to_owned(), + socket_addr_v6, + ..Default::default() + }; + if ph.udp_port > 0 { + peer_addr.set_port(ph.udp_port as u16); + self.punch_udp_hole(peer_addr, server, msg_punch).await?; + return Ok(()); + } + log::debug!("Punch tcp hole to {:?}", peer_addr); let mut socket = { let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?; let local_addr = socket.local_addr(); @@ -543,22 +588,36 @@ impl RendezvousMediator { socket }; let mut msg_out = Message::new(); - use hbb_common::protobuf::Enum; - let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT); - msg_out.set_punch_hole_sent(PunchHoleSent { - socket_addr: ph.socket_addr, - id: Config::get_id(), - relay_server, - nat_type: nat_type.into(), - version: crate::VERSION.to_owned(), - ..Default::default() - }); + msg_out.set_punch_hole_sent(msg_punch); let bytes = msg_out.write_to_bytes()?; socket.send_raw(bytes).await?; crate::accept_connection(server.clone(), socket, peer_addr, true).await; Ok(()) } + async fn punch_udp_hole( + &self, + peer_addr: SocketAddr, + server: ServerPtr, + msg_punch: PunchHoleSent, + ) -> ResultType<()> { + let mut msg_out = Message::new(); + msg_out.set_punch_hole_sent(msg_punch); + let (socket, addr) = new_direct_udp_for(&self.host).await?; + let data = msg_out.write_to_bytes()?; + socket.send_to(&data, addr).await?; + let socket_cloned = socket.clone(); + tokio::spawn(async move { + for _ in 0..2 { + let tm = (hbb_common::time_based_rand() % 20 + 10) as f32 / 1000.; + hbb_common::sleep(tm).await; + socket.send_to(&data, addr).await.ok(); + } + }); + udp_nat_listen(socket_cloned.clone(), peer_addr, peer_addr, server).await?; + Ok(()) + } + async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> { let mut msg_out = Message::new(); let pk = Config::get_key_pair().1; @@ -722,3 +781,49 @@ impl Sink<'_> { } } } + +async fn start_ipv6( + peer_addr_v6: SocketAddr, + peer_addr_v4: SocketAddr, + server: ServerPtr, +) -> bytes::Bytes { + crate::test_ipv6().await; + if let Some((socket, local_addr_v6)) = crate::get_ipv6_socket().await { + let server = server.clone(); + tokio::spawn(async move { + allow_err!(udp_nat_listen(socket.clone(), peer_addr_v6, peer_addr_v4, server).await); + }); + return local_addr_v6; + } + Default::default() +} + +async fn udp_nat_listen( + socket: Arc, + peer_addr: SocketAddr, + peer_addr_v4: SocketAddr, + server: ServerPtr, +) -> ResultType<()> { + let tm = Instant::now(); + let socket_cloned = socket.clone(); + let func = async { + socket.connect(peer_addr).await?; + let res = crate::punch_udp(socket.clone(), true).await?; + let stream = crate::kcp_stream::KcpStream::accept( + socket, + Duration::from_secs(CONNECT_TIMEOUT as _), + res, + ) + .await?; + crate::server::create_tcp_connection(server, stream.1, peer_addr_v4, true).await?; + Ok(()) + }; + func.await.map_err(|e: anyhow::Error| { + anyhow::anyhow!( + "Stop listening on {:?} for remote {peer_addr} with KCP, {:?} elapsed: {e}", + socket_cloned.local_addr(), + tm.elapsed() + ) + })?; + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index 87e6f390f..4d70fa5fa 100644 --- a/src/server.rs +++ b/src/server.rs @@ -246,7 +246,7 @@ pub async fn accept_connection( secure: bool, ) { if let Err(err) = accept_connection_(server, socket, secure).await { - log::error!("Failed to accept connection from {}: {}", peer_addr, err); + log::warn!("Failed to accept connection from {}: {}", peer_addr, err); } } diff --git a/src/server/connection.rs b/src/server/connection.rs index 3176fe686..071eaf07e 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -1843,7 +1843,7 @@ impl Connection { ) .await { - log::error!("ipc to connection manager exit: {}", err); + log::warn!("ipc to connection manager exit: {}", err); // https://github.com/rustdesk/rustdesk-server-pro/discussions/382#discussioncomment-10525725, cm may start failed #[cfg(windows)] if !crate::platform::is_prelogin()