From 9b2579246ceb5c79af083ea191c52b35f8f5cbf8 Mon Sep 17 00:00:00 2001 From: Aaron Siddhartha Mondal Date: Fri, 12 Jul 2024 18:30:00 +0200 Subject: [PATCH] Migrate to hyper 1.x, axum 0.7.x, tonic 0.12.x https://www.youtube.com/watch?v=7Twnmhe948A&ab_channel=Kontor.TV --- BUILD.bazel | 3 +- Cargo.lock | 258 ++++++---- Cargo.toml | 17 +- nativelink-config/Cargo.toml | 2 +- nativelink-error/Cargo.toml | 12 +- nativelink-error/src/lib.rs | 6 + nativelink-macro/Cargo.toml | 6 +- nativelink-proto/Cargo.toml | 10 +- .../build.bazel.remote.execution.v2.pb.rs | 113 ++--- ..._machina.nativelink.remote_execution.pb.rs | 26 +- .../genproto/google.bytestream.pb.rs | 29 +- .../genproto/google.devtools.build.v1.pb.rs | 26 +- .../genproto/google.longrunning.pb.rs | 27 +- nativelink-scheduler/Cargo.toml | 24 +- nativelink-service/BUILD.bazel | 9 +- nativelink-service/Cargo.toml | 22 +- nativelink-service/src/execution_server.rs | 1 - nativelink-service/src/health_server.rs | 17 +- nativelink-service/tests/bep_server_test.rs | 9 +- .../tests/bytestream_server_test.rs | 139 ++++-- nativelink-store/BUILD.bazel | 4 +- nativelink-store/Cargo.toml | 42 +- nativelink-util/BUILD.bazel | 7 +- nativelink-util/Cargo.toml | 31 +- nativelink-util/src/channel_body.rs | 61 +++ nativelink-util/src/connection_manager.rs | 4 +- nativelink-util/src/lib.rs | 1 + nativelink-util/tests/channel_body_test.rs | 471 ++++++++++++++++++ nativelink-worker/BUILD.bazel | 4 +- nativelink-worker/Cargo.toml | 21 +- .../src/running_actions_manager.rs | 1 - nativelink-worker/tests/local_worker_test.rs | 55 +- .../tests/utils/local_worker_test_utils.rs | 24 +- src/bin/nativelink.rs | 42 +- 34 files changed, 1059 insertions(+), 465 deletions(-) create mode 100644 nativelink-util/src/channel_body.rs create mode 100644 nativelink-util/tests/channel_body_test.rs diff --git a/BUILD.bazel b/BUILD.bazel index 4338456b9..2179435b1 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -26,7 +26,8 @@ rust_binary( "@crates//:axum", "@crates//:clap", "@crates//:futures", - "@crates//:hyper", + "@crates//:hyper-1.4.1", + "@crates//:hyper-util", "@crates//:mimalloc", "@crates//:parking_lot", "@crates//:prometheus-client", diff --git a/Cargo.lock b/Cargo.lock index 864aed5e5..43cec4962 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -175,7 +175,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -186,9 +186,15 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -505,7 +511,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", @@ -592,18 +598,19 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.20" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -615,28 +622,33 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -723,9 +735,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.5.2" +version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d08263faac5cde2a4d52b513dadb80846023aade56fcd8fc99ba73ba8050e92" +checksum = "e9ec96fe9a81b5e365f9db71fe00edc4fe4ca2cc7dcb7861f0603012a7caa210" dependencies = [ "arrayref", "arrayvec", @@ -764,7 +776,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", "syn_derive", ] @@ -825,9 +837,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.2" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47de7e88bbbd467951ae7f5a6f34f70d1b4d9cfce53d5fd70f74ebe118b3db56" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" [[package]] name = "cfg-if" @@ -872,7 +884,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -913,8 +925,7 @@ dependencies = [ [[package]] name = "console-api" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +source = "git+https://github.com/tokio-rs/console?rev=5f6faa2#5f6faa22d944735c2b8c312cac03b35a4ab228ef" dependencies = [ "futures-core", "prost", @@ -926,8 +937,7 @@ dependencies = [ [[package]] name = "console-subscriber" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +source = "git+https://github.com/tokio-rs/console?rev=5f6faa2#5f6faa22d944735c2b8c312cac03b35a4ab228ef" dependencies = [ "console-api", "crossbeam-channel", @@ -935,6 +945,7 @@ dependencies = [ "futures-task", "hdrhistogram", "humantime", + "hyper-util", "prost", "prost-types", "serde", @@ -1333,7 +1344,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -1423,6 +1434,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1566,7 +1596,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1587,9 +1617,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", "tokio", + "want", ] [[package]] @@ -1611,14 +1650,15 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "pin-project-lite", "tokio", - "tokio-io-timeout", + "tower-service", ] [[package]] @@ -1628,12 +1668,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1675,9 +1720,9 @@ checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -1877,7 +1922,8 @@ dependencies = [ "axum", "clap", "futures", - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "mimalloc", "nativelink-config", "nativelink-error", @@ -1931,7 +1977,7 @@ version = "0.4.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -1982,9 +2028,13 @@ dependencies = [ name = "nativelink-service" version = "0.4.0" dependencies = [ + "axum", "bytes", "futures", - "hyper 0.14.30", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "maplit", "nativelink-config", "nativelink-error", @@ -2069,7 +2119,8 @@ dependencies = [ "console-subscriber", "futures", "hex", - "hyper 0.14.30", + "http-body-util", + "hyper 1.4.1", "hyper-util", "lru", "mock_instant", @@ -2105,7 +2156,8 @@ dependencies = [ "formatx", "futures", "hex", - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "nativelink-config", "nativelink-error", "nativelink-macro", @@ -2301,7 +2353,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -2342,7 +2394,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -2396,7 +2448,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -2461,14 +2513,14 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "prost" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", "prost-derive", @@ -2476,9 +2528,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck", @@ -2491,28 +2543,28 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.68", + "syn 2.0.71", "tempfile", ] [[package]] name = "prost-derive" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] name = "prost-types" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ "prost", ] @@ -2636,7 +2688,7 @@ checksum = "8dfe1dc77e38e260bbd53e98d3aec64add3cdf5d773e38d344c63660196117f5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -2877,6 +2929,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls" +version = "0.23.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4828ea528154ae444e5a642dbb7d5623354030dc9822b83fd9bb79683c7399d0" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.5", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -2962,9 +3029,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "scc" -version = "2.1.2" +version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af947d0ca10a2f3e00c7ec1b515b7c83e5cb3fa62d4c11a64301d9eec54440e9" +checksum = "9f3281c67bce3cc354216537112a1571d2c28b9e7d744a07ef79b43fad64386c" dependencies = [ "sdd", ] @@ -2996,9 +3063,9 @@ dependencies = [ [[package]] name = "sdd" -version = "0.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b84345e4c9bd703274a082fb80caaa99b7612be48dfaa1dd9266577ec412309d" +checksum = "63f9f783a4e576f9e02b89a7ec4a5aed48fb5265b99a111dfeb92c153733937e" [[package]] name = "seahash" @@ -3022,9 +3089,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -3035,9 +3102,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -3066,7 +3133,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3136,7 +3203,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3288,9 +3355,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -3306,7 +3373,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3315,6 +3382,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" @@ -3350,7 +3423,7 @@ checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3428,16 +3501,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-macros" version = "2.3.0" @@ -3446,7 +3509,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3481,6 +3544,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.11", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -3525,28 +3599,30 @@ dependencies = [ [[package]] name = "tonic" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +checksum = "f738b6a169a29bca4e39656db89c44a08e09c5b700b896ee9e7459f0652e81dd" dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.7", + "base64 0.22.1", "bytes", "flate2", - "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", "prost", "rustls-pemfile 2.1.2", - "rustls-pki-types", + "socket2", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tokio-stream", "tower", "tower-layer", @@ -3556,15 +3632,15 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +checksum = "690943cc223adcdd67bb597a2e573ead1b88e999ba37528fe8e6356bf44b29b6" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -3619,7 +3695,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] @@ -4015,7 +4091,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.71", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 68d41ea9b..970ca134e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,20 +41,21 @@ nativelink-store = { path = "nativelink-store" } nativelink-util = { path = "nativelink-util" } nativelink-worker = { path = "nativelink-worker" } -async-lock = "3.3.0" -axum = "0.6.20" +async-lock = "3.4.0" +axum = "0.7.5" -clap = { version = "4.5.4", features = ["derive"] } +clap = { version = "4.5.9", features = ["derive"] } futures = "0.3.30" -hyper = { version = "0.14.28" } -mimalloc = "0.1.41" -parking_lot = "0.12.2" +hyper = { version = "1.4.1" } +hyper-util = "0.1.6" +mimalloc = "0.1.43" +parking_lot = "0.12.3" prometheus-client = "0.21.2" rustls-pemfile = "2.1.2" scopeguard = "1.2.0" serde_json5 = "0.1.0" -tokio = { version = "1.37.0", features = ["rt-multi-thread", "signal"] } +tokio = { version = "1.38.0", features = ["rt-multi-thread", "signal"] } tokio-rustls = "0.25.0" -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tower = "0.4.13" tracing = "0.1.40" diff --git a/nativelink-config/Cargo.toml b/nativelink-config/Cargo.toml index e33c6c3a6..5d031bd36 100644 --- a/nativelink-config/Cargo.toml +++ b/nativelink-config/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] byte-unit = "5.1.4" humantime = "2.1.0" -serde = { version = "1.0.203", features = ["derive"] } +serde = { version = "1.0.204", features = ["derive"] } serde_json5 = "0.1.0" shellexpand = "3.1.0" diff --git a/nativelink-error/Cargo.toml b/nativelink-error/Cargo.toml index 7d2b96883..3a3dd578e 100644 --- a/nativelink-error/Cargo.toml +++ b/nativelink-error/Cargo.toml @@ -11,9 +11,9 @@ autobenches = false nativelink-proto = { path = "../nativelink-proto" } hex = "0.4.3" -prost = "0.12.4" -prost-types = "0.12.4" -redis = "0.25.2" -serde = { version = "1.0.201", features = ["derive"] } -tokio = { version = "1.37.0" } -tonic = { version = "0.11.0", features = ["gzip"] } +prost = "0.13.1" +prost-types = "0.13.1" +redis = "0.25.4" +serde = { version = "1.0.204", features = ["derive"] } +tokio = { version = "1.38.0" } +tonic = { version = "0.12.0", features = ["gzip"] } diff --git a/nativelink-error/src/lib.rs b/nativelink-error/src/lib.rs index 83a8201c3..3622f2b73 100644 --- a/nativelink-error/src/lib.rs +++ b/nativelink-error/src/lib.rs @@ -146,6 +146,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: prost::UnknownEnumValue) -> Self { + make_err!(Code::Internal, "{}", err.to_string()) + } +} + impl From for Error { fn from(err: std::num::TryFromIntError) -> Self { make_err!(Code::InvalidArgument, "{}", err.to_string()) diff --git a/nativelink-macro/Cargo.toml b/nativelink-macro/Cargo.toml index 33012d6ef..909829c91 100644 --- a/nativelink-macro/Cargo.toml +++ b/nativelink-macro/Cargo.toml @@ -9,6 +9,6 @@ proc-macro = true [dependencies] # TODO(allada) We currently need to pin these to specific version. # Some down-stream can't be upgraded just yet. -proc-macro2 = "=1.0.86" -quote = "=1.0.36" -syn = "=2.0.68" +proc-macro2 = "1.0.86" +quote = "1.0.36" +syn = "2.0.71" diff --git a/nativelink-proto/Cargo.toml b/nativelink-proto/Cargo.toml index a7d22c0f4..d5a3c90d4 100644 --- a/nativelink-proto/Cargo.toml +++ b/nativelink-proto/Cargo.toml @@ -19,10 +19,10 @@ path = "genproto/lib.rs" doctest = false [dependencies] -prost = "0.12.4" -prost-types = "0.12.4" -tonic = { version = "0.11.0", features = ["gzip"] } +prost = "0.13.1" +prost-types = "0.13.1" +tonic = { version = "0.12.0", features = ["gzip"] } [dev-dependencies] -prost-build = "0.12.4" -tonic-build = "0.11.0" +prost-build = "0.13.1" +tonic-build = "0.12.0" diff --git a/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs b/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs index 6c1e3a8a6..b1eda3e6d 100644 --- a/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs +++ b/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs @@ -913,7 +913,7 @@ pub struct OutputSymlink { } /// An `ExecutionPolicy` can be used to control the scheduling of the action. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutionPolicy { /// The priority (relative importance) of this action. Generally, a lower value /// means that the action should be run sooner than actions having a greater @@ -930,7 +930,7 @@ pub struct ExecutionPolicy { /// A `ResultsCachePolicy` is used for fine-grained control over how action /// outputs are stored in the CAS and Action Cache. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ResultsCachePolicy { /// The priority (relative importance) of this content in the overall cache. /// Generally, a lower value means a longer retention time or other advantage, @@ -1066,7 +1066,7 @@ pub struct ExecuteResponse { /// field][google.longrunning.Operation.done] of the /// [Operation][google.longrunning.Operation] and terminate the stream. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutionStage {} /// Nested message and enum types in `ExecutionStage`. pub mod execution_stage { @@ -1500,7 +1500,7 @@ pub struct ServerCapabilities { /// The digest function used for converting values into keys for CAS and Action /// Cache. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct DigestFunction {} /// Nested message and enum types in `DigestFunction`. pub mod digest_function { @@ -1638,7 +1638,7 @@ pub mod digest_function { } /// Describes the server/instance capabilities for updating the action cache. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionCacheUpdateCapabilities { #[prost(bool, tag = "1")] pub update_enabled: bool, @@ -1657,7 +1657,7 @@ pub struct PriorityCapabilities { pub mod priority_capabilities { /// Supported range of priorities, including boundaries. #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct PriorityRange { /// The minimum numeric value for this priority range, which represents the /// most urgent task or longest retained item. @@ -1671,7 +1671,7 @@ pub mod priority_capabilities { } /// Describes how the server treats absolute symlink targets. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct SymlinkAbsolutePathStrategy {} /// Nested message and enum types in `SymlinkAbsolutePathStrategy`. pub mod symlink_absolute_path_strategy { @@ -1724,7 +1724,7 @@ pub mod symlink_absolute_path_strategy { } /// Compression formats which may be supported. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Compressor {} /// Nested message and enum types in `Compressor`. pub mod compressor { @@ -3046,19 +3046,17 @@ pub mod execution_server { /// respect the information provided. #[derive(Debug)] pub struct ExecutionServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ExecutionServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3121,7 +3119,6 @@ pub mod execution_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.Execution/Execute" => { #[allow(non_camel_case_types)] @@ -3153,7 +3150,6 @@ pub mod execution_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecuteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3200,7 +3196,6 @@ pub mod execution_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WaitExecutionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3222,8 +3217,11 @@ pub mod execution_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -3244,16 +3242,6 @@ pub mod execution_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ExecutionServer { const NAME: &'static str = "build.bazel.remote.execution.v2.Execution"; } @@ -3323,19 +3311,17 @@ pub mod action_cache_server { /// respect the information provided. #[derive(Debug)] pub struct ActionCacheServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ActionCacheServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3398,7 +3384,6 @@ pub mod action_cache_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.ActionCache/GetActionResult" => { #[allow(non_camel_case_types)] @@ -3429,7 +3414,6 @@ pub mod action_cache_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetActionResultSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3476,7 +3460,6 @@ pub mod action_cache_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = UpdateActionResultSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3498,8 +3481,11 @@ pub mod action_cache_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -3520,16 +3506,6 @@ pub mod action_cache_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ActionCacheServer { const NAME: &'static str = "build.bazel.remote.execution.v2.ActionCache"; } @@ -3794,19 +3770,17 @@ pub mod content_addressable_storage_server { /// respect the information provided. #[derive(Debug)] pub struct ContentAddressableStorageServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ContentAddressableStorageServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3870,7 +3844,6 @@ pub mod content_addressable_storage_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.ContentAddressableStorage/FindMissingBlobs" => { #[allow(non_camel_case_types)] @@ -3905,7 +3878,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = FindMissingBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3955,7 +3927,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = BatchUpdateBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4005,7 +3976,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = BatchReadBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4053,7 +4023,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetTreeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4075,8 +4044,11 @@ pub mod content_addressable_storage_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -4097,16 +4069,6 @@ pub mod content_addressable_storage_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ContentAddressableStorageServer { const NAME: &'static str = "build.bazel.remote.execution.v2.ContentAddressableStorage"; @@ -4145,19 +4107,17 @@ pub mod capabilities_server { /// returned will pertain to that instance. #[derive(Debug)] pub struct CapabilitiesServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl CapabilitiesServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -4220,7 +4180,6 @@ pub mod capabilities_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities" => { #[allow(non_camel_case_types)] @@ -4251,7 +4210,6 @@ pub mod capabilities_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetCapabilitiesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4273,8 +4231,11 @@ pub mod capabilities_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -4295,16 +4256,6 @@ pub mod capabilities_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for CapabilitiesServer { const NAME: &'static str = "build.bazel.remote.execution.v2.Capabilities"; } diff --git a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs index d4e9eae70..8c9cb5a53 100644 --- a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs +++ b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs @@ -481,19 +481,17 @@ pub mod worker_api_server { /// / to determine which jobs the worker can process. #[derive(Debug)] pub struct WorkerApiServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl WorkerApiServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -556,7 +554,6 @@ pub mod worker_api_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/com.github.trace_machina.nativelink.remote_execution.WorkerApi/ConnectWorker" => { #[allow(non_camel_case_types)] @@ -588,7 +585,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ConnectWorkerSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -634,7 +630,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = KeepAliveSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -680,7 +675,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GoingAwaySvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -724,7 +718,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecutionResponseSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -746,8 +739,11 @@ pub mod worker_api_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -768,16 +764,6 @@ pub mod worker_api_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for WorkerApiServer { const NAME: &'static str = "com.github.trace_machina.nativelink.remote_execution.WorkerApi"; } diff --git a/nativelink-proto/genproto/google.bytestream.pb.rs b/nativelink-proto/genproto/google.bytestream.pb.rs index caf3cee02..4cd7889e6 100644 --- a/nativelink-proto/genproto/google.bytestream.pb.rs +++ b/nativelink-proto/genproto/google.bytestream.pb.rs @@ -85,7 +85,7 @@ pub struct WriteRequest { } /// Response object for ByteStream.Write. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct WriteResponse { /// The number of bytes that have been processed for the given resource. #[prost(int64, tag = "1")] @@ -101,7 +101,7 @@ pub struct QueryWriteStatusRequest { } /// Response object for ByteStream.QueryWriteStatus. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct QueryWriteStatusResponse { /// The number of bytes that have been processed for the given resource. #[prost(int64, tag = "1")] @@ -427,19 +427,17 @@ pub mod byte_stream_server { /// The errors returned by the service are in the Google canonical error space. #[derive(Debug)] pub struct ByteStreamServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ByteStreamServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -502,7 +500,6 @@ pub mod byte_stream_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.bytestream.ByteStream/Read" => { #[allow(non_camel_case_types)] @@ -534,7 +531,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ReadSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -582,7 +578,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WriteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -628,7 +623,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = QueryWriteStatusSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -650,8 +644,11 @@ pub mod byte_stream_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -672,16 +669,6 @@ pub mod byte_stream_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ByteStreamServer { const NAME: &'static str = "google.bytestream.ByteStream"; } diff --git a/nativelink-proto/genproto/google.devtools.build.v1.pb.rs b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs index 2ea78bb11..b86683a6b 100644 --- a/nativelink-proto/genproto/google.devtools.build.v1.pb.rs +++ b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs @@ -195,7 +195,7 @@ pub mod build_event { /// Notification of the end of a build event stream published by a build /// component other than CONTROLLER (See StreamId.BuildComponents). #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct BuildComponentStreamFinished { /// How the event stream finished. #[prost(enumeration = "build_component_stream_finished::FinishType", tag = "1")] @@ -765,19 +765,17 @@ pub mod publish_build_event_server { /// more than one build tool stream for an invocation attempt of a build. #[derive(Debug)] pub struct PublishBuildEventServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl PublishBuildEventServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -840,7 +838,6 @@ pub mod publish_build_event_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.devtools.build.v1.PublishBuildEvent/PublishLifecycleEvent" => { #[allow(non_camel_case_types)] @@ -875,7 +872,6 @@ pub mod publish_build_event_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PublishLifecycleEventSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -931,7 +927,6 @@ pub mod publish_build_event_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PublishBuildToolEventStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -953,8 +948,11 @@ pub mod publish_build_event_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -975,16 +973,6 @@ pub mod publish_build_event_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for PublishBuildEventServer { const NAME: &'static str = "google.devtools.build.v1.PublishBuildEvent"; diff --git a/nativelink-proto/genproto/google.longrunning.pb.rs b/nativelink-proto/genproto/google.longrunning.pb.rs index 39eff2c83..1b177d538 100644 --- a/nativelink-proto/genproto/google.longrunning.pb.rs +++ b/nativelink-proto/genproto/google.longrunning.pb.rs @@ -495,19 +495,17 @@ pub mod operations_server { /// so developers can have a consistent client experience. #[derive(Debug)] pub struct OperationsServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl OperationsServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -570,7 +568,6 @@ pub mod operations_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.longrunning.Operations/ListOperations" => { #[allow(non_camel_case_types)] @@ -601,7 +598,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListOperationsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -647,7 +643,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -693,7 +688,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DeleteOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -739,7 +733,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CancelOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -785,7 +778,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WaitOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -807,8 +799,11 @@ pub mod operations_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -829,16 +824,6 @@ pub mod operations_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for OperationsServer { const NAME: &'static str = "google.longrunning.Operations"; } diff --git a/nativelink-scheduler/Cargo.toml b/nativelink-scheduler/Cargo.toml index 109eb8eb5..4e7774cc6 100644 --- a/nativelink-scheduler/Cargo.toml +++ b/nativelink-scheduler/Cargo.toml @@ -13,26 +13,26 @@ nativelink-proto = { path = "../nativelink-proto" } # files somewhere else. nativelink-store = { path = "../nativelink-store" } -async-lock = "3.3.0" -async-trait = "0.1.80" -blake3 = "1.5.1" -prost = "0.12.4" -uuid = { version = "1.8.0", features = ["v4"] } +async-lock = "3.4.0" +async-trait = "0.1.81" +blake3 = "1.5.2" +prost = "0.13.1" +uuid = { version = "1.10.0", features = ["v4"] } futures = "0.3.30" hashbrown = "0.14" lru = "0.12.3" -parking_lot = "0.12.2" +parking_lot = "0.12.3" rand = "0.8.5" scopeguard = "1.2.0" -tokio = { version = "1.37.0", features = ["sync", "rt", "parking_lot"] } +tokio = { version = "1.38.0", features = ["sync", "rt", "parking_lot"] } tokio-stream = { version = "0.1.15", features = ["sync"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tracing = "0.1.40" -bitflags = "2.5.0" -redis = { version = "0.25.2", features = ["aio", "tokio", "json"] } -serde = "1.0.203" +bitflags = "2.6.0" +redis = { version = "0.25.4", features = ["aio", "tokio", "json"] } +serde = "1.0.204" redis-macros = "0.3.0" -serde_json = "1.0.117" +serde_json = "1.0.120" [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index 57e53aa6f..65dfc9ce0 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -27,9 +27,12 @@ rust_library( "//nativelink-scheduler", "//nativelink-store", "//nativelink-util", + "@crates//:axum", "@crates//:bytes", "@crates//:futures", - "@crates//:hyper", + "@crates//:http-body", + "@crates//:http-body-util", + "@crates//:hyper-1.4.1", "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", @@ -66,7 +69,9 @@ rust_test_suite( "//nativelink-util", "@crates//:bytes", "@crates//:futures", - "@crates//:hyper", + "@crates//:http-body-util", + "@crates//:hyper-1.4.1", + "@crates//:hyper-util", "@crates//:maplit", "@crates//:pretty_assertions", "@crates//:prometheus-client", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 18d889eeb..ef94d5b8a 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -11,25 +11,29 @@ nativelink-util = { path = "../nativelink-util" } nativelink-store = { path = "../nativelink-store" } nativelink-scheduler = { path = "../nativelink-scheduler" } -bytes = "1.6.0" +axum = "0.7.5" +bytes = "1.6.1" futures = "0.3.30" -hyper = { version = "0.14.28" } +http-body = "1.0.1" +http-body-util = "0.1.2" +hyper = { version = "1.4.1" } serde_json5 = "0.1.0" -parking_lot = "0.12.2" -prost = "0.12.4" +parking_lot = "0.12.3" +prost = "0.13.1" rand = "0.8.5" -tokio = { version = "1.37.0", features = ["sync", "rt"] } +tokio = { version = "1.38.0", features = ["sync", "rt"] } tokio-stream = { version = "0.1.15", features = ["sync"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tower = "0.4.13" tracing = "0.1.40" -uuid = { version = "1.8.0", features = ["v4"] } +uuid = { version = "1.10.0", features = ["v4"] } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } -hyper = "0.14.28" +hyper = "1.4.1" +hyper-util = "0.1.6" maplit = "1.0.2" pretty_assertions = "1.4.0" prometheus-client = "0.21.2" -prost-types = "0.12.4" +prost-types = "0.13.1" diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index a42f0ef03..efe4e50ee 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -74,7 +74,6 @@ impl InstanceInfo { )?; let timeout = action .timeout - .clone() .map(|v| Duration::new(v.seconds as u64, v.nanos as u32)) .unwrap_or(Duration::MAX); diff --git a/nativelink-service/src/health_server.rs b/nativelink-service/src/health_server.rs index c84e0ad19..b348286ba 100644 --- a/nativelink-service/src/health_server.rs +++ b/nativelink-service/src/health_server.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use axum::body::Body; +use bytes::Bytes; use futures::StreamExt; +use http_body_util::Full; use hyper::header::{HeaderValue, CONTENT_TYPE}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use nativelink_util::health_utils::{ HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter, }; @@ -41,9 +45,9 @@ impl HealthServer { } } -impl Service> for HealthServer { - type Response = Response; - type Error = std::convert::Infallible; +impl Service> for HealthServer { + type Response = Response>; + type Error = Infallible; type Future = Pin> + Send>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { @@ -57,6 +61,7 @@ impl Service> for HealthServer { async move { let health_status_descriptions: Vec = health_registry.health_status_report().collect().await; + match serde_json5::to_string(&health_status_descriptions) { Ok(body) => { let contains_failed_report = @@ -72,14 +77,14 @@ impl Service> for HealthServer { Ok(Response::builder() .status(status_code) .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE)) - .body(Body::from(body)) + .body(Full::new(Bytes::from(body))) .unwrap()) } Err(e) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE)) - .body(Body::from(format!("Internal Failure: {e:?}"))) + .body(Full::new(Bytes::from(format!("Internal Failure: {e:?}")))) .unwrap()), } }, diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index 11c582ac7..c46f0fd9f 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -16,7 +16,7 @@ use std::borrow::Cow; use std::sync::Arc; use futures::StreamExt; -use hyper::Body; +use hyper::body::Frame; use nativelink_config::cas_server::BepConfig; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; @@ -36,6 +36,7 @@ use nativelink_service::bep_server::BepServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::buf_channel::make_buf_channel_pair; +use nativelink_util::channel_body::ChannelBody; use nativelink_util::common::encode_stream_proto; use nativelink_util::store_trait::{Store, StoreKey, StoreLike}; use pretty_assertions::assert_eq; @@ -155,9 +156,9 @@ async fn publish_build_tool_event_stream_test() -> Result<(), Box::default(); let stream = Streaming::new_request(codec.decoder(), body, None, None); let stream = bep_server @@ -302,7 +303,7 @@ async fn publish_build_tool_event_stream_test() -> Result<(), Box) -> (Sender, Streaming) { - let (tx, body) = Body::channel(); +fn make_stream( + encoding: Option, +) -> ( + tokio::sync::mpsc::Sender>, + Streaming, +) { + let (tx, body) = ChannelBody::new(); let mut codec = ProstCodec::::default(); let stream = Streaming::new_request(codec.decoder(), body, encoding, None); (tx, stream) } +type JoinHandle = JoinHandleDropGuard, tonic::Status>>; + fn make_stream_and_writer_spawn( bs_server: Arc, encoding: Option, -) -> ( - Sender, - JoinHandleDropGuard, tonic::Status>>, -) { +) -> (tokio::sync::mpsc::Sender>, JoinHandle) { let (tx, stream) = make_stream(encoding); let join_handle = spawn!("bs_server_write", async move { bs_server.write(Request::new(stream)).await - },); + }); (tx, join_handle) } @@ -133,12 +141,25 @@ async fn server_and_client_stub( } let server_spawn = spawn!("grpc_server", async move { - let http = Http::new().with_executor(Executor); - let bs_service = bs_server.into_service(); + let http = http2::Builder::new(Executor); + let grpc_service = tonic::service::Routes::new(bs_server.into_service()); + + let adapted_service = tower::ServiceBuilder::new() + .map_request(|req: hyper::Request| { + let (parts, body) = req.into_parts(); + let body = body + .map_err(|e| tonic::Status::internal(e.to_string())) + .boxed_unsync(); + hyper::Request::from_parts(parts, body) + }) + .service(grpc_service); + + let hyper_service = TowerToHyperService::new(adapted_service); while let Some(stream) = rx.next().await { let stream = stream.expect("Failed to get stream"); - http.serve_connection(stream, bs_service.clone()) + let wrapped_stream = TokioIo::new(stream); + http.serve_connection(wrapped_stream, hyper_service.clone()) .await .expect("Connection failed"); } @@ -155,7 +176,7 @@ async fn server_and_client_stub( const MAX_BUFFER_SIZE: usize = 4096; let (client, server) = tokio::io::duplex(MAX_BUFFER_SIZE); tx.send(Ok(server)).unwrap(); - Result::<_, Error>::Ok(client) + Result::<_, Error>::Ok(TokioIo::new(client)) } })) .await @@ -175,7 +196,7 @@ pub async fn chunked_stream_receives_all_data() -> Result<(), Box Result<(), Box Result<(), Box> { ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server.clone(), Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -280,7 +304,8 @@ pub async fn resume_write_success() -> Result<(), Box> { // Write first chunk of data. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -289,14 +314,15 @@ pub async fn resume_write_success() -> Result<(), Box> { assert_eq!(result.is_err(), true, "Expected error to be returned"); } // Now reconnect. - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); { // Write the remainder of our data. write_request.write_offset = BYTE_SPLIT_OFFSET as i64; write_request.finish_write = true; write_request.data = WRITE_DATA[BYTE_SPLIT_OFFSET..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -326,7 +352,7 @@ pub async fn restart_write_success() -> Result<(), Box> { ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server.clone(), Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -351,7 +377,8 @@ pub async fn restart_write_success() -> Result<(), Box> { // Write first chunk of data. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -360,20 +387,22 @@ pub async fn restart_write_success() -> Result<(), Box> { assert_eq!(result.is_err(), true, "Expected error to be returned"); } // Now reconnect. - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); { // Write first chunk of data again. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Write the remainder of our data. write_request.write_offset = BYTE_SPLIT_OFFSET as i64; write_request.finish_write = true; write_request.data = WRITE_DATA[BYTE_SPLIT_OFFSET..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -401,7 +430,7 @@ pub async fn restart_mid_stream_write_success() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> make_bytestream_server(store_manager.as_ref(), None).expect("Failed to make server"), ); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -569,13 +603,15 @@ pub async fn out_of_order_data_fails() -> Result<(), Box> // Write first chunk of data. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Write data it already has. write_request.write_offset = (BYTE_SPLIT_OFFSET - 1) as i64; write_request.data = WRITE_DATA[(BYTE_SPLIT_OFFSET - 1)..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } assert!( join_handle.await.expect("Failed to join").is_err(), @@ -586,7 +622,7 @@ pub async fn out_of_order_data_fails() -> Result<(), Box> write_request.write_offset = (BYTE_SPLIT_OFFSET - 1) as i64; write_request.data = WRITE_DATA[(BYTE_SPLIT_OFFSET - 1)..].into(); assert!( - tx.send_data(encode_stream_proto(&write_request)?) + tx.send(Frame::data(encode_stream_proto(&write_request)?)) .await .is_err(), "Expected error to be returned" @@ -603,7 +639,7 @@ pub async fn upload_zero_byte_chunk() -> Result<(), Box> ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); let resource_name = make_resource_name(0); @@ -616,7 +652,8 @@ pub async fn upload_zero_byte_chunk() -> Result<(), Box> { // Write our zero byte data. - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; // Wait for stream to finish. join_handle .await @@ -640,7 +677,7 @@ pub async fn disallow_negative_write_offset() -> Result<(), Box Result<(), Box Result<(), Box> { make_bytestream_server(store_manager.as_ref(), None).expect("Failed to make server"), ); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); let resource_name = make_resource_name(100); @@ -680,7 +718,8 @@ pub async fn out_of_sequence_write() -> Result<(), Box> { { // Write our zero byte data. - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; // Expect the write command to fail. assert!(join_handle.await.expect("Failed to join").is_err()); } @@ -846,7 +885,7 @@ pub async fn test_query_write_status_smoke_test() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box>, + } +} + +// Note: At the moment this is only used in a few tests after tonic removed its +// concrete Body type. See `nativelink_util::buf_channel` for channel +// implementations used in non-test code and see the tests for example usage. +impl ChannelBody { + #[must_use] + pub fn new() -> (tokio::sync::mpsc::Sender>, Self) { + Self::with_buffer_size(32) + } + + #[must_use] + pub fn with_buffer_size(buffer_size: usize) -> (tokio::sync::mpsc::Sender>, Self) { + let (tx, rx) = tokio::sync::mpsc::channel(buffer_size); + (tx, Self { rx }) + } +} + +impl Body for ChannelBody { + type Data = Bytes; + type Error = tonic::Status; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let frame = ready!(self.project().rx.poll_recv(cx)); + Poll::Ready(frame.map(Ok)) + } + + fn is_end_stream(&self) -> bool { + self.rx.is_closed() + } +} diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs index 1d80f63c3..dacc0f1e8 100644 --- a/nativelink-util/src/connection_manager.rs +++ b/nativelink-util/src/connection_manager.rs @@ -429,7 +429,7 @@ pub struct ResponseFuture { /// This is mostly copied from tonic::transport::channel except it wraps it /// to allow messaging about connection success and failure. impl tonic::codegen::Service> for Connection { - type Response = tonic::codegen::http::Response; + type Response = tonic::codegen::http::Response; type Error = tonic::transport::Error; type Future = ResponseFuture; @@ -479,7 +479,7 @@ impl tonic::codegen::Service /// to allow messaging about connection failure. impl Future for ResponseFuture { type Output = - Result, tonic::transport::Error>; + Result, tonic::transport::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let result = Pin::new(&mut self.inner).poll(cx); diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index 2811a4d68..50638e420 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -14,6 +14,7 @@ pub mod action_messages; pub mod buf_channel; +pub mod channel_body; pub mod common; pub mod connection_manager; pub mod default_store_key_subscribe; diff --git a/nativelink-util/tests/channel_body_test.rs b/nativelink-util/tests/channel_body_test.rs new file mode 100644 index 000000000..3322842b0 --- /dev/null +++ b/nativelink-util/tests/channel_body_test.rs @@ -0,0 +1,471 @@ +// Copyright 2023 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use futures::future; +use http_body_util::BodyExt; +use hyper::body::{Body, Frame}; +use hyper::http::HeaderMap; +use nativelink_macro::nativelink_test; +use nativelink_util::channel_body::ChannelBody; +use nativelink_util::spawn; +use tokio::sync::mpsc::error::TrySendError; +use tokio::time::timeout; +use tonic::Status; + +async fn setup_channel_body( + frames: Vec, +) -> (tokio::sync::mpsc::Sender>, ChannelBody) { + let (tx, body) = ChannelBody::new(); + for frame in frames { + tx.send(Frame::data(Bytes::from(frame))).await.unwrap(); + } + (tx, body) +} + +async fn check_frames(body: &mut ChannelBody, expected: Vec) { + for exp in expected { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(exp)); + } + assert!(body.is_end_stream()); +} + +macro_rules! generate_channel_body_tests { + ($($name:ident: $input:expr,)*) => { + $( + #[nativelink_test] + async fn $name() { + let (tx, mut body) = setup_channel_body($input.to_vec()).await; + drop(tx); + check_frames(&mut body, $input.to_vec()).await; + } + )* + } +} + +generate_channel_body_tests! { + test_channel_body_empty: [String::new()], + test_channel_body_basic: ["Hello".to_string()], + test_channel_body_multiple_frames: ["Frame1".to_string(), "Frame2".to_string(), "Frame3".to_string()], + test_channel_body_empty_frames: [String::new(), "NonEmpty".to_string(), String::new()], + test_channel_body_large_data: [vec![0u8; 10 * 1024 * 1024].into_iter().map(|b| b.to_string()).collect::()], +} + +#[nativelink_test] +async fn test_channel_body_closure() { + let (tx, mut body) = ChannelBody::new(); + tx.send(Frame::data(Bytes::from("Hello"))).await.unwrap(); + drop(tx); + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Hello")); + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_concurrent() { + let (tx, body) = ChannelBody::new(); + + let send_task = spawn!("send", async move { + for i in 0..10 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + }); + + let receive_task = spawn!("receive", async move { + let mut body = body; + for i in 0..10 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); +} + +#[nativelink_test] +async fn test_channel_body_error_propagation() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Error occurred"))) + .await + .unwrap(); + drop(tx); + + let frame = body.frame().await.unwrap().unwrap(); + let data = frame.into_data().unwrap(); + assert_eq!(data, Bytes::from("Error occurred")); + + // Create a Status error from the received data + let error = Status::internal(String::from_utf8(data.to_vec()).unwrap()); + assert_eq!(error.code(), tonic::Code::Internal); + assert_eq!(error.message(), "Error occurred"); +} + +#[nativelink_test] +async fn test_channel_body_abrupt_end() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Frame1"))).await.unwrap(); + tx.send(Frame::data(Bytes::from("Frame2"))).await.unwrap(); + + drop(tx); + + assert_eq!( + body.frame().await.unwrap().unwrap().into_data().unwrap(), + Bytes::from("Frame1") + ); + assert_eq!( + body.frame().await.unwrap().unwrap().into_data().unwrap(), + Bytes::from("Frame2") + ); + + // Ensure that the stream has ended after dropping the sender + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_trailers() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + + let mut trailers = HeaderMap::new(); + trailers.insert("X-Trailer", "Value".parse().unwrap()); + tx.send(Frame::trailers(trailers.clone())).await.unwrap(); + + assert_eq!( + body.frame().await.unwrap().unwrap().into_data().unwrap(), + Bytes::from("Data") + ); + + let received_trailers = body + .frame() + .await + .unwrap() + .unwrap() + .into_trailers() + .unwrap(); + assert_eq!(received_trailers, trailers); + + drop(tx); + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_capacity() { + let (tx, body) = ChannelBody::new(); + + let send_task = spawn!("send", async move { + for i in 0..40 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + }); + + let receive_task = spawn!("receive", async move { + let mut body = body; + for i in 0..40 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } + assert!(body.frame().await.is_none()); + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); +} + +#[nativelink_test] +async fn test_channel_body_custom_buffer_size() { + let (tx, mut body) = ChannelBody::with_buffer_size(5); + + for i in 0..5 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + + // Attempt to send a frame when the buffer is full + let send_result = tx.try_send(Frame::data(Bytes::from("Frame5"))); + assert!(send_result.is_err()); + + for i in 0..5 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } +} + +#[nativelink_test] +async fn test_channel_body_is_end_stream() { + let (tx, body) = ChannelBody::new(); + + assert!(!body.is_end_stream()); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + assert!(!body.is_end_stream()); + + drop(tx); + assert!(body.is_end_stream()); +} + +#[nativelink_test] +async fn test_channel_body_frame_types() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + + let mut trailers = HeaderMap::new(); + trailers.insert("X-Trailer", "Value".parse().unwrap()); + tx.send(Frame::trailers(trailers.clone())).await.unwrap(); + + let data_frame = body.frame().await.unwrap().unwrap(); + assert!(data_frame.is_data()); + assert_eq!(data_frame.into_data().unwrap(), Bytes::from("Data")); + + let trailer_frame = body.frame().await.unwrap().unwrap(); + assert!(trailer_frame.is_trailers()); + assert_eq!(trailer_frame.into_trailers().unwrap(), trailers); +} + +#[nativelink_test] +async fn test_channel_body_error_scenarios() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + + drop(tx); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data")); + + // Ensure that the stream has ended after dropping the sender + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_send_error() { + let (tx, body) = ChannelBody::with_buffer_size(1); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + + // Attempt to send when the buffer is full + let result = tx.try_send(Frame::data(Bytes::from("More Data"))); + assert!(matches!(result, Err(TrySendError::Full(_)))); + + drop(body); + + // Attempt to send after the receiver has been dropped + let result = tx.try_send(Frame::data(Bytes::from("Even More Data"))); + assert!(matches!(result, Err(TrySendError::Closed(_)))); +} + +#[nativelink_test] +async fn test_channel_body_backpressure() { + let (tx, mut body) = ChannelBody::with_buffer_size(2); + + // Fill the buffer + tx.send(Frame::data(Bytes::from("Frame1"))).await.unwrap(); + tx.send(Frame::data(Bytes::from("Frame2"))).await.unwrap(); + + // This should block because the buffer is full + let send_task = spawn!("send", async move { + tx.send(Frame::data(Bytes::from("Frame3"))).await.unwrap(); + }); + + // Give some time for the send_task to block + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Read a frame, which should unblock the sender + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Frame1")); + + // Wait for the send_task to complete + send_task.await.unwrap(); + + // Verify that all frames were received + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Frame2")); + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Frame3")); +} + +#[nativelink_test] +async fn test_channel_body_cancellation() { + let (tx, body) = ChannelBody::new(); + let tx_clone = tx.clone(); // Create a clone for later use + + let send_task = spawn!("send", async move { + for i in 0..5 { + match tx.send(Frame::data(Bytes::from(format!("Frame{i}")))).await { + Ok(()) => {} + Err(_) => break, // Stop if the receiver has been dropped + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + }); + + let receive_task = spawn!("receive", async move { + let mut body = body; + for _ in 0..2 { + match body.frame().await { + Some(Ok(frame)) => { + println!("Received: {:?}", frame.into_data().unwrap()); + } + _ => break, + } + } + // Simulate cancellation by dropping the body + drop(body); + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); + + // Use the cloned sender to check if the channel is closed + assert!(tx_clone.is_closed()); +} + +#[nativelink_test] +async fn test_channel_body_timeout() { + let (tx, mut body) = ChannelBody::new(); + + // Try to receive a frame with a timeout + let result = timeout(tokio::time::Duration::from_millis(100), body.frame()).await; + assert!(result.is_err(), "Expected timeout error"); + + // Send a frame + tx.send(Frame::data(Bytes::from("Frame1"))).await.unwrap(); + + // This should now succeed within the timeout + let result = timeout(tokio::time::Duration::from_millis(100), body.frame()).await; + assert!( + result.is_ok(), + "Expected frame to be received within timeout" + ); + let frame = result.unwrap().unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Frame1")); + + // Close the channel + drop(tx); + + // This should complete immediately with None, not timeout + let result = timeout(tokio::time::Duration::from_millis(100), body.frame()).await; + assert!(result.is_ok(), "Expected immediate completion"); + assert!( + result.unwrap().is_none(), + "Expected None as channel is closed" + ); +} + +#[nativelink_test] +async fn test_channel_body_mixed_frame_types() { + let (tx, mut body) = ChannelBody::new(); + + let mut trailers1 = HeaderMap::new(); + trailers1.insert("X-Trailer-1", "Value1".parse().unwrap()); + let mut trailers2 = HeaderMap::new(); + trailers2.insert("X-Trailer-2", "Value2".parse().unwrap()); + + // Send a mix of data frames and trailers + tx.send(Frame::data(Bytes::from("Data1"))).await.unwrap(); + tx.send(Frame::trailers(trailers1.clone())).await.unwrap(); + tx.send(Frame::data(Bytes::from("Data2"))).await.unwrap(); + tx.send(Frame::trailers(trailers2.clone())).await.unwrap(); + tx.send(Frame::data(Bytes::from("Data3"))).await.unwrap(); + + // Drop the sender to signal the end of the stream + drop(tx); + + // Verify that frames are received in the correct order and type + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data1")); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_trailers().unwrap(), trailers1); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data2")); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_trailers().unwrap(), trailers2); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data3")); + + // Ensure the stream has ended + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_small_buffer() { + let (tx, mut body) = ChannelBody::with_buffer_size(1); + + // Test concurrent sending and receiving with a small buffer + let send_task = spawn!("send", async move { + for i in 0..5 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + }); + + let receive_task = spawn!("receive", async move { + for i in 0..5 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } + assert!(body.frame().await.is_none()); + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); +} + +#[nativelink_test] +async fn test_channel_body_large_buffer() { + const BUFFER_SIZE: usize = 1_000_000; // 1 million frames + const FRAME_COUNT: usize = 1_000_000; + + // Test with a very large buffer to ensure it can handle high volumes + let (tx, mut body) = ChannelBody::with_buffer_size(BUFFER_SIZE); + + let send_task = spawn!("send", async move { + for i in 0..FRAME_COUNT { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + }); + + let receive_task = spawn!("receive", async move { + for i in 0..FRAME_COUNT { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } + assert!(body.frame().await.is_none()); + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); +} diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel index e55bf1a3e..bf6db8a3e 100644 --- a/nativelink-worker/BUILD.bazel +++ b/nativelink-worker/BUILD.bazel @@ -70,10 +70,12 @@ rust_test_suite( "//nativelink-store", "//nativelink-util", "@crates//:async-lock", + "@crates//:bytes", "@crates//:futures", "@crates//:hex", - "@crates//:hyper", + "@crates//:hyper-1.4.1", "@crates//:once_cell", + "@crates//:pin-project-lite", "@crates//:pretty_assertions", "@crates//:prost", "@crates//:prost-types", diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml index 241f951ae..2b0e4fb8b 100644 --- a/nativelink-worker/Cargo.toml +++ b/nativelink-worker/Cargo.toml @@ -14,30 +14,31 @@ nativelink-store = { path = "../nativelink-store" } # functionality out of the schedulers. nativelink-scheduler = { path = "../nativelink-scheduler" } -async-lock = "3.3.0" -bytes = "1.6.0" +async-lock = "3.4.0" +bytes = "1.6.1" filetime = "0.2.23" formatx = "0.2.2" futures = "0.3.30" hex = "0.4.3" -parking_lot = "0.12.2" -prost = "0.12.4" +parking_lot = "0.12.3" +prost = "0.13.1" relative-path = "1.9.3" scopeguard = "1.2.0" -serde = "1.0.201" +serde = "1.0.204" serde_json5 = "0.1.0" shlex = "1.3.0" -tokio = { version = "1.37.0", features = ["sync", "rt", "process"] } +tokio = { version = "1.38.0", features = ["sync", "rt", "process"] } tokio-stream = { version = "0.1.15", features = ["fs"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tracing = "0.1.40" -uuid = { version = "1.8.0", features = ["v4"] } +uuid = { version = "1.10.0", features = ["v4"] } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } -hyper = "0.14.28" +hyper = "1.4.1" +hyper-util = "0.1.6" once_cell = "1.19.0" pretty_assertions = "1.4.0" -prost-types = "0.12.4" +prost-types = "0.13.1" rand = "0.8.5" diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index a22910065..05eb8e5c1 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1789,7 +1789,6 @@ impl RunningActionsManager for RunningActionsManagerImpl { .wrap(async move { let queued_timestamp = start_execute .queued_timestamp - .clone() .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); let action_info = self.create_action_info(start_execute, queued_timestamp).await?; diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index aef1c6e6a..2a6316b93 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -29,6 +29,7 @@ mod utils { pub(crate) mod mock_running_actions_manager; } +use hyper::body::Frame; use nativelink_config::cas_server::{LocalWorkerConfig, WorkerProperty}; use nativelink_error::{make_err, make_input_err, Code, Error}; use nativelink_macro::nativelink_test; @@ -142,7 +143,7 @@ async fn reconnect_on_server_disconnect_test() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box (HyperSender, Response>) { - let (tx, body) = Body::channel(); +pub fn setup_grpc_stream() -> ( + mpsc::Sender>, + Response>, +) { + let (tx, body) = ChannelBody::new(); let mut codec = ProstCodec::::default(); - // Note: This is an undocumented function. let stream = Streaming::new_request(codec.decoder(), body, Some(CompressionEncoding::Gzip), None); (tx, Response::new(stream)) } +// pub fn setup_grpc_stream() -> (HyperSender, Response>) { +// let (tx, body) = Body::channel(); +// let mut codec = ProstCodec::::default(); +// // Note: This is an undocumented function. +// let stream = +// Streaming::new_request(codec.decoder(), body, Some(CompressionEncoding::Gzip), None); +// (tx, Response::new(stream)) +// } + pub async fn setup_local_worker_with_config(local_worker_config: LocalWorkerConfig) -> TestContext { let mock_worker_api_client = MockWorkerApiClient::new(); let mock_worker_api_client_clone = mock_worker_api_client.clone(); @@ -225,7 +237,7 @@ pub struct TestContext { pub actions_manager: Arc, pub maybe_streaming_response: Option>>, - pub maybe_tx_stream: Option, + pub maybe_tx_stream: Option>>, _drop_guard: JoinHandleDropGuard>, } diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index 49142778f..47a88e2cd 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,8 +21,10 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, Either, OptionFuture, TryFutureExt}; -use hyper::server::conn::Http; +use hyper::server::conn::http2; use hyper::{Response, StatusCode}; +use hyper_util::rt::tokio::TokioIo; +use hyper_util::service::TowerToHyperService; use mimalloc::MiMalloc; use nativelink_config::cas_server::{ CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, ServerConfig, WorkerConfig, @@ -67,7 +69,6 @@ use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig}; use tokio_rustls::TlsAcceptor; use tonic::codec::CompressionEncoding; use tonic::transport::Server as TonicServer; -use tower::util::ServiceExt; use tracing::{error_span, event, trace_span, Level}; #[global_allocator] @@ -426,8 +427,9 @@ async fn inner_main( let health_registry = health_registry_builder.lock().await.build(); let mut svc = Router::new() + .merge(tonic_services.into_router()) // This is the default service that executes if no other endpoint matches. - .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))); + .fallback((StatusCode::NOT_FOUND, "Not Found")); if let Some(health_cfg) = services.health { let path = if health_cfg.path.is_empty() { @@ -451,7 +453,7 @@ async fn inner_main( }; svc = svc.route_service( path, - axum::routing::get(move |_request: hyper::Request| { + axum::routing::get(move |_request: hyper::Request| { Arc::new(OriginContext::new()).wrap_async( trace_span!("prometheus_ctx"), async move { @@ -631,46 +633,46 @@ async fn inner_main( let socket_addr = http_config.socket_address.parse::()?; let tcp_listener = TcpListener::bind(&socket_addr).await?; - let mut http = Http::new().with_executor(TaskExecutor::default()); + let mut http = http2::Builder::new(TaskExecutor::default()); let http_config = &http_config.advanced_http; if let Some(value) = http_config.http2_keep_alive_interval { - http.http2_keep_alive_interval(Duration::from_secs(u64::from(value))); + http.keep_alive_interval(Duration::from_secs(u64::from(value))); } if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams { - http.http2_max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| { + http.max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| { "Could not convert experimental_http2_max_pending_accept_reset_streams" })?); } if let Some(value) = http_config.experimental_http2_initial_stream_window_size { - http.http2_initial_stream_window_size(value); + http.initial_stream_window_size(value); } if let Some(value) = http_config.experimental_http2_initial_connection_window_size { - http.http2_initial_connection_window_size(value); + http.initial_connection_window_size(value); } if let Some(value) = http_config.experimental_http2_adaptive_window { - http.http2_adaptive_window(value); + http.adaptive_window(value); } if let Some(value) = http_config.experimental_http2_max_frame_size { - http.http2_max_frame_size(value); + http.max_frame_size(value); } if let Some(value) = http_config.experimental_http2_max_concurrent_streams { - http.http2_max_concurrent_streams(value); + http.max_concurrent_streams(value); } if let Some(value) = http_config.experimental_http2_keep_alive_timeout { - http.http2_keep_alive_timeout(Duration::from_secs(u64::from(value))); + http.keep_alive_timeout(Duration::from_secs(u64::from(value))); } if let Some(value) = http_config.experimental_http2_max_send_buf_size { - http.http2_max_send_buf_size( + http.max_send_buf_size( usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?, ); } if let Some(true) = http_config.experimental_http2_enable_connect_protocol { - http.http2_enable_connect_protocol(); + http.enable_connect_protocol(); } if let Some(value) = http_config.experimental_http2_max_header_list_size { - http.http2_max_header_list_size(value); + http.max_header_list_size(value); } event!(Level::WARN, "Ready, listening on {socket_addr}",); @@ -731,8 +733,8 @@ async fn inner_main( let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor { match tls_acceptor.accept(tcp_stream).await { Ok(tls_stream) => Either::Left(http.serve_connection( - tls_stream, - svc, + TokioIo::new(tls_stream), + TowerToHyperService::new(svc), )), Err(err) => { event!(Level::ERROR, ?err, "Failed to accept tls stream"); @@ -741,8 +743,8 @@ async fn inner_main( } } else { Either::Right(http.serve_connection( - tcp_stream, - svc, + TokioIo::new(tcp_stream), + TowerToHyperService::new(svc), )) };