diff --git a/BUILD.bazel b/BUILD.bazel index 3ce902f18..149ca30f2 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -28,7 +28,8 @@ rust_binary( "@crates//:axum", "@crates//:clap", "@crates//:futures", - "@crates//:hyper", + "@crates//:hyper-1.4.1", + "@crates//:hyper-util", "@crates//:mimalloc", "@crates//:opentelemetry", "@crates//:opentelemetry-prometheus", diff --git a/Cargo.lock b/Cargo.lock index a6fe674ae..badda7cfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -57,9 +57,9 @@ checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -72,33 +72,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ "windows-sys 0.52.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", "windows-sys 0.52.0", @@ -118,9 +118,9 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "arrayref" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4930d2cb77ce62f89ee5d5289b4ac049559b1c45539271f5ed4fdc7db34545" +checksum = "9d151e35f61089500b617991b791fc8bfd237ae50cd5950803758a179b41e67a" [[package]] name = "arrayvec" @@ -169,7 +169,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -180,9 +180,15 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.3.0" @@ -258,9 +264,9 @@ dependencies = [ [[package]] name = "aws-sdk-s3" -version = "1.41.0" +version = "1.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "605ea81e6d88d9afdfb16369e809c037e1b08c937452ee475eaa72c4233d9685" +checksum = 
"558bbcec8db82a1a8af1610afcb3b10d00652d25ad366a0558eecdff2400a1d1" dependencies = [ "ahash 0.8.11", "aws-credential-types", @@ -293,9 +299,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.35.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc3ef4ee9cdd19ec6e8b10d963b79637844bbf41c31177b77a188eaa941e69f7" +checksum = "6acca681c53374bf1d9af0e317a41d12a44902ca0f2d1e10e5cb5bb98ed74f35" dependencies = [ "aws-credential-types", "aws-runtime", @@ -315,9 +321,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527f3da450ea1f09f95155dba6153bd0d83fe0923344a12e1944dfa5d0b32064" +checksum = "b79c6bdfe612503a526059c05c9ccccbf6bd9530b003673cb863e547fd7c0c9a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -337,9 +343,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.35.0" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94316606a4aa2cb7a302388411b8776b3fbd254e8506e2dc43918286d8212e9b" +checksum = "32e6ecdb2bd756f3b2383e6f0588dc10a4e65f5d551e70a56e0bfe0c884673ce" dependencies = [ "aws-credential-types", "aws-runtime", @@ -499,7 +505,7 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", @@ -586,18 +592,19 @@ dependencies = [ [[package]] name = "axum" -version = "0.6.20" +version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", - "bitflags 1.3.2", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -609,28 +616,33 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" dependencies = [ "async-trait", "bytes", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper 0.1.2", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -758,7 +770,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", "syn_derive", ] @@ -825,9 +837,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.5" +version = "1.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" +checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" [[package]] name = "cfg-if" @@ -843,9 +855,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "clap" -version = "4.5.9" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" +checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" dependencies = [ "clap_builder", "clap_derive", @@ -853,9 +865,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.9" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" +checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" dependencies = [ "anstream", "anstyle", @@ -865,27 +877,27 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "combine" @@ -913,8 +925,7 @@ dependencies = [ [[package]] name = "console-api" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +source = "git+https://github.com/tokio-rs/console?rev=5f6faa2#5f6faa22d944735c2b8c312cac03b35a4ab228ef" dependencies = [ "futures-core", "prost", @@ -926,8 +937,7 @@ dependencies = [ [[package]] name = "console-subscriber" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +source = "git+https://github.com/tokio-rs/console?rev=5f6faa2#5f6faa22d944735c2b8c312cac03b35a4ab228ef" dependencies = [ "console-api", "crossbeam-channel", @@ -935,6 +945,7 @@ dependencies = [ "futures-task", "hdrhistogram", "humantime", + "hyper-util", "prost", "prost-types", "serde", @@ -1327,7 +1338,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -1423,6 +1434,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -1566,7 +1596,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1587,9 +1617,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", + "futures-channel", + 
"futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", "tokio", + "want", ] [[package]] @@ -1611,14 +1650,15 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "pin-project-lite", "tokio", - "tokio-io-timeout", + "tower-service", ] [[package]] @@ -1628,12 +1668,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", + "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1669,15 +1714,15 @@ dependencies = [ [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "itertools" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" dependencies = [ "either", ] @@ -1857,13 +1902,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -1886,7 +1932,8 @@ dependencies = [ "axum", "clap", "futures", - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "mimalloc", "nativelink-config", "nativelink-error", @@ -1948,7 +1995,7 @@ version = "0.4.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -1985,7 +2032,7 @@ version = "0.4.0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2040,9 +2087,13 @@ version = "0.4.0" dependencies = [ "async-lock", "async-trait", + "axum", "bytes", "futures", - "hyper 0.14.30", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", "maplit", "nativelink-config", "nativelink-error", @@ -2128,7 +2179,8 @@ dependencies = [ "console-subscriber", "futures", "hex", - "hyper 0.14.30", + "http-body-util", + "hyper 1.4.1", "hyper-util", "lru", "mock_instant", @@ -2165,7 +2217,8 @@ dependencies = [ "formatx", "futures", "hex", - "hyper 0.14.30", + "hyper 1.4.1", + "hyper-util", "nativelink-config", "nativelink-error", "nativelink-macro", @@ -2236,21 +2289,11 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "object" -version = "0.36.1" +version = "0.36.2" 
source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" +checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" dependencies = [ "memchr", ] @@ -2418,7 +2461,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2459,7 +2502,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2513,7 +2556,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2575,9 +2618,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc" dependencies = [ "bytes", "prost-derive", @@ -2585,9 +2628,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck", @@ -2600,28 +2643,28 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.68", + "syn 2.0.72", "tempfile", ] [[package]] name = "prost-derive" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", "itertools", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] name = "prost-types" -version = "0.12.6" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2" dependencies = [ "prost", ] @@ -2751,7 +2794,7 @@ checksum = "8dfe1dc77e38e260bbd53e98d3aec64add3cdf5d773e38d344c63660196117f5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -2987,7 +3030,22 @@ dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki 0.102.5", + "rustls-webpki 0.102.6", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls" +version = "0.23.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.6", "subtle", "zeroize", ] @@ -3054,9 +3112,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.5" +version = "0.102.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a6fccd794a42c2c105b513a2f62bc3fd8f3ba57a4593677ceb0bd035164d78" +checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" dependencies = [ "ring", "rustls-pki-types", @@ -3077,9 +3135,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = 
"scc" -version = "2.1.4" +version = "2.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4465c22496331e20eb047ff46e7366455bc01c0c02015c4a376de0b2cd3a1af" +checksum = "1fadf67e3cf23f8b11a6c8c48a16cb2437381503615acd91094ec7b4686a5a53" dependencies = [ "sdd", ] @@ -3111,9 +3169,9 @@ dependencies = [ [[package]] name = "sdd" -version = "1.5.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e806d6633ef141556fef75e345275e35652e9c045bbbc21e6ecfce3e9aa2638" +checksum = "85f05a494052771fc5bd0619742363b5e24e5ad72ab3111ec2e27925b8edc5f3" [[package]] name = "seahash" @@ -3181,7 +3239,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3251,7 +3309,7 @@ checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3267,9 +3325,9 @@ dependencies = [ [[package]] name = "sha1_smol" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" [[package]] name = "sha2" @@ -3403,9 +3461,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.68" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "901fa70d88b9d6c98022e23b4136f9f3e54e4662c3bc1bd1d84a42a9a0f0c1e9" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -3421,7 +3479,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3430,6 +3488,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" @@ -3450,22 +3514,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3525,43 +3589,32 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", 
"pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", "tracing", - "windows-sys 0.48.0", -] - -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", + "windows-sys 0.52.0", ] [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3596,6 +3649,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.12", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.15" @@ -3623,9 +3687,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +checksum = "f8fb9f64314842840f1d940ac544da178732128f1c78c21772e876579e0da1db" [[package]] name = "toml_edit" @@ -3640,28 +3704,30 @@ dependencies = [ [[package]] name = "tonic" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" dependencies = [ "async-stream", "async-trait", "axum", - "base64 0.21.7", + "base64 0.22.1", "bytes", "flate2", - "h2", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.30", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", "prost", "rustls-pemfile 2.1.2", - "rustls-pki-types", + "socket2", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls 0.26.0", "tokio-stream", "tower", "tower-layer", @@ -3671,15 +3737,15 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964" dependencies = [ "prettyplease", "proc-macro2", "prost-build", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3734,7 +3800,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] @@ -3891,9 +3957,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "vsimd" @@ -3937,7 +4003,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", "wasm-bindgen-shared", ] @@ 
-3959,7 +4025,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4184,7 +4250,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.68", + "syn 2.0.72", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d03e54497..6f83364a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,20 +43,21 @@ nativelink-worker = { path = "nativelink-worker" } nativelink-metric = { path = "nativelink-metric" } nativelink-metric-collector = { path = "nativelink-metric-collector" } -async-lock = "3.3.0" -axum = "0.6.20" +async-lock = "3.4.0" +axum = "0.7.5" -clap = { version = "4.5.4", features = ["derive"] } +clap = { version = "4.5.9", features = ["derive"] } futures = "0.3.30" -hyper = { version = "0.14.28" } -mimalloc = "0.1.41" -parking_lot = "0.12.2" +hyper = { version = "1.4.1" } +hyper-util = "0.1.6" +mimalloc = "0.1.43" +parking_lot = "0.12.3" rustls-pemfile = "2.1.2" scopeguard = "1.2.0" serde_json5 = "0.1.0" -tokio = { version = "1.37.0", features = ["rt-multi-thread", "signal"] } +tokio = { version = "1.38.0", features = ["rt-multi-thread", "signal"] } tokio-rustls = "0.25.0" -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tower = "0.4.13" tracing = "0.1.40" opentelemetry_sdk = { version = "0.23.0", features = ["metrics"] } diff --git a/nativelink-config/Cargo.toml b/nativelink-config/Cargo.toml index e33c6c3a6..5d031bd36 100644 --- a/nativelink-config/Cargo.toml +++ b/nativelink-config/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] byte-unit = "5.1.4" humantime = "2.1.0" -serde = { version = "1.0.203", features = ["derive"] } +serde = { version = "1.0.204", features = ["derive"] } serde_json5 = "0.1.0" shellexpand = "3.1.0" diff --git a/nativelink-error/Cargo.toml b/nativelink-error/Cargo.toml index d687953f9..f7c501344 100644 --- a/nativelink-error/Cargo.toml +++ b/nativelink-error/Cargo.toml @@ -12,9 +12,9 @@ nativelink-proto = { path = "../nativelink-proto" } nativelink-metric = { path = "../nativelink-metric" } hex = "0.4.3" -prost = "0.12.4" -prost-types = "0.12.4" -redis = "0.25.2" -serde = { version = "1.0.201", features = ["derive"] } -tokio = { version = "1.37.0" } -tonic = { version = "0.11.0", features = ["gzip"] } +prost = "0.13.1" +prost-types = "0.13.1" +redis = "0.25.4" +serde = { version = "1.0.204", features = ["derive"] } +tokio = { version = "1.38.0" } +tonic = { version = "0.12.0", features = ["gzip"] } diff --git a/nativelink-error/src/lib.rs b/nativelink-error/src/lib.rs index 44e449dc3..776f312ad 100644 --- a/nativelink-error/src/lib.rs +++ b/nativelink-error/src/lib.rs @@ -159,6 +159,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: prost::UnknownEnumValue) -> Self { + make_err!(Code::Internal, "{}", err.to_string()) + } +} + impl From for Error { fn from(err: std::num::TryFromIntError) -> Self { make_err!(Code::InvalidArgument, "{}", err.to_string()) diff --git a/nativelink-macro/Cargo.toml b/nativelink-macro/Cargo.toml index 33012d6ef..ba2c7d973 100644 --- a/nativelink-macro/Cargo.toml +++ b/nativelink-macro/Cargo.toml @@ -7,8 +7,6 @@ edition = "2021" proc-macro = true [dependencies] -# TODO(allada) We currently need to pin these to specific version. -# Some down-stream can't be upgraded just yet. 
-proc-macro2 = "=1.0.86" -quote = "=1.0.36" -syn = "=2.0.68" +proc-macro2 = "1.0.86" +quote = "1.0.36" +syn = "2.0.71" diff --git a/nativelink-metric/nativelink-metric-macro-derive/Cargo.toml b/nativelink-metric/nativelink-metric-macro-derive/Cargo.toml index 07f79baed..8464944b4 100644 --- a/nativelink-metric/nativelink-metric-macro-derive/Cargo.toml +++ b/nativelink-metric/nativelink-metric-macro-derive/Cargo.toml @@ -7,8 +7,6 @@ edition = "2021" proc-macro = true [dependencies] -# TODO(allada) We currently need to pin these to specific version. -# Some down-stream can't be upgraded just yet. -proc-macro2 = { version = "=1.0.86", features = ["proc-macro", "span-locations"] } -quote = "=1.0.36" -syn = { version = "=2.0.68", features = ["extra-traits", "full", "fold"] } +proc-macro2 = { version = "1.0.86", features = ["proc-macro", "span-locations"] } +quote = "1.0.36" +syn = { version = "2.0.68", features = ["extra-traits", "full", "fold"] } diff --git a/nativelink-proto/Cargo.toml b/nativelink-proto/Cargo.toml index a7d22c0f4..d5a3c90d4 100644 --- a/nativelink-proto/Cargo.toml +++ b/nativelink-proto/Cargo.toml @@ -19,10 +19,10 @@ path = "genproto/lib.rs" doctest = false [dependencies] -prost = "0.12.4" -prost-types = "0.12.4" -tonic = { version = "0.11.0", features = ["gzip"] } +prost = "0.13.1" +prost-types = "0.13.1" +tonic = { version = "0.12.0", features = ["gzip"] } [dev-dependencies] -prost-build = "0.12.4" -tonic-build = "0.11.0" +prost-build = "0.13.1" +tonic-build = "0.12.0" diff --git a/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs b/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs index 0e6f8aa30..9a070fe7a 100644 --- a/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs +++ b/nativelink-proto/genproto/build.bazel.remote.execution.v2.pb.rs @@ -913,7 +913,7 @@ pub struct OutputSymlink { } /// An `ExecutionPolicy` can be used to control the scheduling of the action. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutionPolicy { /// The priority (relative importance) of this action. Generally, a lower value /// means that the action should be run sooner than actions having a greater @@ -930,7 +930,7 @@ pub struct ExecutionPolicy { /// A `ResultsCachePolicy` is used for fine-grained control over how action /// outputs are stored in the CAS and Action Cache. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ResultsCachePolicy { /// The priority (relative importance) of this content in the overall cache. /// Generally, a lower value means a longer retention time or other advantage, @@ -1066,7 +1066,7 @@ pub struct ExecuteResponse { /// field][google.longrunning.Operation.done] of the /// [Operation][google.longrunning.Operation] and terminate the stream. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ExecutionStage {} /// Nested message and enum types in `ExecutionStage`. pub mod execution_stage { @@ -1500,7 +1500,7 @@ pub struct ServerCapabilities { /// The digest function used for converting values into keys for CAS and Action /// Cache. 
#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct DigestFunction {} /// Nested message and enum types in `DigestFunction`. pub mod digest_function { @@ -1638,7 +1638,7 @@ pub mod digest_function { } /// Describes the server/instance capabilities for updating the action cache. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct ActionCacheUpdateCapabilities { #[prost(bool, tag = "1")] pub update_enabled: bool, @@ -1657,7 +1657,7 @@ pub struct PriorityCapabilities { pub mod priority_capabilities { /// Supported range of priorities, including boundaries. #[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct PriorityRange { /// The minimum numeric value for this priority range, which represents the /// most urgent task or longest retained item. @@ -1671,7 +1671,7 @@ pub mod priority_capabilities { } /// Describes how the server treats absolute symlink targets. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct SymlinkAbsolutePathStrategy {} /// Nested message and enum types in `SymlinkAbsolutePathStrategy`. pub mod symlink_absolute_path_strategy { @@ -1724,7 +1724,7 @@ pub mod symlink_absolute_path_strategy { } /// Compression formats which may be supported. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct Compressor {} /// Nested message and enum types in `Compressor`. pub mod compressor { @@ -3046,19 +3046,17 @@ pub mod execution_server { /// respect the information provided. 
#[derive(Debug)] pub struct ExecutionServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ExecutionServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3121,7 +3119,6 @@ pub mod execution_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.Execution/Execute" => { #[allow(non_camel_case_types)] @@ -3153,7 +3150,6 @@ pub mod execution_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecuteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3200,7 +3196,6 @@ pub mod execution_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WaitExecutionSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3222,8 +3217,11 @@ pub mod execution_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -3244,16 +3242,6 @@ pub mod execution_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ExecutionServer { const NAME: &'static str = "build.bazel.remote.execution.v2.Execution"; } @@ -3323,19 +3311,17 @@ pub mod action_cache_server { /// respect the information provided. 
#[derive(Debug)] pub struct ActionCacheServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ActionCacheServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3398,7 +3384,6 @@ pub mod action_cache_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.ActionCache/GetActionResult" => { #[allow(non_camel_case_types)] @@ -3429,7 +3414,6 @@ pub mod action_cache_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetActionResultSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3476,7 +3460,6 @@ pub mod action_cache_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = UpdateActionResultSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3498,8 +3481,11 @@ pub mod action_cache_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -3520,16 +3506,6 @@ pub mod action_cache_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ActionCacheServer { const NAME: &'static str = "build.bazel.remote.execution.v2.ActionCache"; } @@ -3793,19 +3769,17 @@ pub mod content_addressable_storage_server { /// respect the information provided. 
#[derive(Debug)] pub struct ContentAddressableStorageServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ContentAddressableStorageServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -3869,7 +3843,6 @@ pub mod content_addressable_storage_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.ContentAddressableStorage/FindMissingBlobs" => { #[allow(non_camel_case_types)] @@ -3904,7 +3877,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = FindMissingBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -3954,7 +3926,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = BatchUpdateBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4004,7 +3975,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = BatchReadBlobsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4052,7 +4022,6 @@ pub mod content_addressable_storage_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetTreeSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4074,8 +4043,11 @@ pub mod content_addressable_storage_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -4096,16 +4068,6 @@ pub mod content_addressable_storage_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ContentAddressableStorageServer { const NAME: &'static str = "build.bazel.remote.execution.v2.ContentAddressableStorage"; @@ -4144,19 +4106,17 @@ pub mod capabilities_server { /// returned will pertain to that instance. 
#[derive(Debug)] pub struct CapabilitiesServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl CapabilitiesServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -4219,7 +4179,6 @@ pub mod capabilities_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/build.bazel.remote.execution.v2.Capabilities/GetCapabilities" => { #[allow(non_camel_case_types)] @@ -4250,7 +4209,6 @@ pub mod capabilities_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetCapabilitiesSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -4272,8 +4230,11 @@ pub mod capabilities_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -4294,16 +4255,6 @@ pub mod capabilities_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for CapabilitiesServer { const NAME: &'static str = "build.bazel.remote.execution.v2.Capabilities"; } diff --git a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs index 268b5e3ce..aff36a78c 100644 --- a/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs +++ b/nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs @@ -459,19 +459,17 @@ pub mod worker_api_server { /// / to determine which jobs the worker can process. 
#[derive(Debug)] pub struct WorkerApiServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl WorkerApiServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -534,7 +532,6 @@ pub mod worker_api_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/com.github.trace_machina.nativelink.remote_execution.WorkerApi/ConnectWorker" => { #[allow(non_camel_case_types)] @@ -566,7 +563,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ConnectWorkerSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -612,7 +608,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = KeepAliveSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -658,7 +653,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GoingAwaySvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -702,7 +696,6 @@ pub mod worker_api_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ExecutionResponseSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -724,8 +717,11 @@ pub mod worker_api_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -746,16 +742,6 @@ pub mod worker_api_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for WorkerApiServer { const NAME: &'static str = "com.github.trace_machina.nativelink.remote_execution.WorkerApi"; } diff --git a/nativelink-proto/genproto/google.bytestream.pb.rs b/nativelink-proto/genproto/google.bytestream.pb.rs index caf3cee02..4cd7889e6 100644 --- a/nativelink-proto/genproto/google.bytestream.pb.rs +++ b/nativelink-proto/genproto/google.bytestream.pb.rs @@ -85,7 +85,7 @@ pub struct WriteRequest { } /// Response object for ByteStream.Write. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct WriteResponse { /// The number of bytes that have been processed for the given resource. 
#[prost(int64, tag = "1")] @@ -101,7 +101,7 @@ pub struct QueryWriteStatusRequest { } /// Response object for ByteStream.QueryWriteStatus. #[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] +#[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct QueryWriteStatusResponse { /// The number of bytes that have been processed for the given resource. #[prost(int64, tag = "1")] @@ -427,19 +427,17 @@ pub mod byte_stream_server { /// The errors returned by the service are in the Google canonical error space. #[derive(Debug)] pub struct ByteStreamServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl ByteStreamServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -502,7 +500,6 @@ pub mod byte_stream_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.bytestream.ByteStream/Read" => { #[allow(non_camel_case_types)] @@ -534,7 +531,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ReadSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -582,7 +578,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WriteSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -628,7 +623,6 @@ pub mod byte_stream_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = QueryWriteStatusSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -650,8 +644,11 @@ pub mod byte_stream_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -672,16 +669,6 @@ pub mod byte_stream_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for ByteStreamServer { const NAME: &'static str = "google.bytestream.ByteStream"; } diff --git a/nativelink-proto/genproto/google.devtools.build.v1.pb.rs b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs index 2ea78bb11..b86683a6b 100644 --- a/nativelink-proto/genproto/google.devtools.build.v1.pb.rs +++ b/nativelink-proto/genproto/google.devtools.build.v1.pb.rs @@ -195,7 +195,7 @@ pub mod build_event { /// Notification of the end of a build event stream published by a build /// component other than CONTROLLER (See StreamId.BuildComponents). 
#[allow(clippy::derive_partial_eq_without_eq)] - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, Copy, PartialEq, ::prost::Message)] pub struct BuildComponentStreamFinished { /// How the event stream finished. #[prost(enumeration = "build_component_stream_finished::FinishType", tag = "1")] @@ -765,19 +765,17 @@ pub mod publish_build_event_server { /// more than one build tool stream for an invocation attempt of a build. #[derive(Debug)] pub struct PublishBuildEventServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl PublishBuildEventServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -840,7 +838,6 @@ pub mod publish_build_event_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.devtools.build.v1.PublishBuildEvent/PublishLifecycleEvent" => { #[allow(non_camel_case_types)] @@ -875,7 +872,6 @@ pub mod publish_build_event_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PublishLifecycleEventSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -931,7 +927,6 @@ pub mod publish_build_event_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = PublishBuildToolEventStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -953,8 +948,11 @@ pub mod publish_build_event_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -975,16 +973,6 @@ pub mod publish_build_event_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for PublishBuildEventServer { const NAME: &'static str = "google.devtools.build.v1.PublishBuildEvent"; diff --git a/nativelink-proto/genproto/google.longrunning.pb.rs b/nativelink-proto/genproto/google.longrunning.pb.rs index 39eff2c83..1b177d538 100644 --- a/nativelink-proto/genproto/google.longrunning.pb.rs +++ b/nativelink-proto/genproto/google.longrunning.pb.rs @@ -495,19 +495,17 @@ pub mod operations_server { /// so developers can have a consistent client experience. 
#[derive(Debug)] pub struct OperationsServer { - inner: _Inner, + inner: Arc, accept_compression_encodings: EnabledCompressionEncodings, send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, } - struct _Inner(Arc); impl OperationsServer { pub fn new(inner: T) -> Self { Self::from_arc(Arc::new(inner)) } pub fn from_arc(inner: Arc) -> Self { - let inner = _Inner(inner); Self { inner, accept_compression_encodings: Default::default(), @@ -570,7 +568,6 @@ pub mod operations_server { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); match req.uri().path() { "/google.longrunning.Operations/ListOperations" => { #[allow(non_camel_case_types)] @@ -601,7 +598,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = ListOperationsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -647,7 +643,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = GetOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -693,7 +688,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = DeleteOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -739,7 +733,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = CancelOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -785,7 +778,6 @@ pub mod operations_server { let max_encoding_message_size = self.max_encoding_message_size; let inner = self.inner.clone(); let fut = async move { - let inner = inner.0; let method = WaitOperationSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) @@ -807,8 +799,11 @@ pub mod operations_server { Ok( http::Response::builder() .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") + .header("grpc-status", tonic::Code::Unimplemented as i32) + .header( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ) .body(empty_body()) .unwrap(), ) @@ -829,16 +824,6 @@ pub mod operations_server { } } } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } impl tonic::server::NamedService for OperationsServer { const NAME: &'static str = "google.longrunning.Operations"; } diff --git a/nativelink-scheduler/Cargo.toml b/nativelink-scheduler/Cargo.toml index ce5ad5885..9f1222199 100644 --- a/nativelink-scheduler/Cargo.toml +++ b/nativelink-scheduler/Cargo.toml @@ -14,11 +14,11 @@ nativelink-metric = { path = "../nativelink-metric" } # files somewhere else. 
nativelink-store = { path = "../nativelink-store" } -async-lock = "3.3.0" -async-trait = "0.1.80" -blake3 = "1.5.1" -prost = "0.12.4" -uuid = { version = "1.8.0", features = ["v4"] } +async-lock = "3.4.0" +async-trait = "0.1.81" +blake3 = "1.5.2" +prost = "0.13.1" +uuid = { version = "1.10.0", features = ["v4"] } futures = "0.3.30" hashbrown = "0.14" lru = "0.12.3" @@ -26,9 +26,9 @@ mock_instant = "0.3.2" parking_lot = "0.12.2" rand = "0.8.5" scopeguard = "1.2.0" -tokio = { version = "1.37.0", features = ["sync", "rt", "parking_lot"] } +tokio = { version = "1.38.0", features = ["sync", "rt", "parking_lot"] } tokio-stream = { version = "0.1.15", features = ["sync"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tracing = "0.1.40" redis = { version = "0.25.2", features = ["aio", "tokio", "json"] } serde = "1.0.203" diff --git a/nativelink-service/BUILD.bazel b/nativelink-service/BUILD.bazel index d44af8cbb..5368683fa 100644 --- a/nativelink-service/BUILD.bazel +++ b/nativelink-service/BUILD.bazel @@ -28,9 +28,12 @@ rust_library( "//nativelink-scheduler", "//nativelink-store", "//nativelink-util", + "@crates//:axum", "@crates//:bytes", "@crates//:futures", - "@crates//:hyper", + "@crates//:http-body", + "@crates//:http-body-util", + "@crates//:hyper-1.4.1", "@crates//:parking_lot", "@crates//:prost", "@crates//:rand", @@ -70,7 +73,9 @@ rust_test_suite( "@crates//:async-lock", "@crates//:bytes", "@crates//:futures", - "@crates//:hyper", + "@crates//:http-body-util", + "@crates//:hyper-1.4.1", + "@crates//:hyper-util", "@crates//:maplit", "@crates//:pretty_assertions", "@crates//:prost", diff --git a/nativelink-service/Cargo.toml b/nativelink-service/Cargo.toml index 9c9f15061..4945b5ebd 100644 --- a/nativelink-service/Cargo.toml +++ b/nativelink-service/Cargo.toml @@ -11,19 +11,22 @@ nativelink-util = { path = "../nativelink-util" } nativelink-store = { path = "../nativelink-store" } nativelink-scheduler = { path = "../nativelink-scheduler" } -bytes = "1.6.0" +axum = "0.7.5" +bytes = "1.6.1" futures = "0.3.30" -hyper = { version = "0.14.28" } +http-body = "1.0.1" +http-body-util = "0.1.2" +hyper = { version = "1.4.1" } serde_json5 = "0.1.0" -parking_lot = "0.12.2" -prost = "0.12.4" +parking_lot = "0.12.3" +prost = "0.13.1" rand = "0.8.5" -tokio = { version = "1.37.0", features = ["sync", "rt"] } +tokio = { version = "1.38.0", features = ["sync", "rt"] } tokio-stream = { version = "0.1.15", features = ["sync"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tower = "0.4.13" tracing = "0.1.40" -uuid = { version = "1.8.0", features = ["v4"] } +uuid = { version = "1.10.0", features = ["v4"] } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } @@ -31,7 +34,8 @@ nativelink-metric = { path = "../nativelink-metric" } async-trait = "0.1.80" async-lock = "3.3.0" -hyper = "0.14.28" +hyper = "1.4.1" +hyper-util = "0.1.6" maplit = "1.0.2" pretty_assertions = "1.4.0" -prost-types = "0.12.4" +prost-types = "0.13.1" diff --git a/nativelink-service/src/execution_server.rs b/nativelink-service/src/execution_server.rs index 623025cf2..bf47664df 100644 --- a/nativelink-service/src/execution_server.rs +++ b/nativelink-service/src/execution_server.rs @@ -100,7 +100,6 @@ impl InstanceInfo { )?; let timeout = action .timeout - .clone() .map(|v| Duration::new(v.seconds as u64, v.nanos as u32)) .unwrap_or(Duration::MAX); diff --git 
a/nativelink-service/src/health_server.rs b/nativelink-service/src/health_server.rs index c84e0ad19..b348286ba 100644 --- a/nativelink-service/src/health_server.rs +++ b/nativelink-service/src/health_server.rs @@ -12,14 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::convert::Infallible; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use axum::body::Body; +use bytes::Bytes; use futures::StreamExt; +use http_body_util::Full; use hyper::header::{HeaderValue, CONTENT_TYPE}; -use hyper::{Body, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; use nativelink_util::health_utils::{ HealthRegistry, HealthStatus, HealthStatusDescription, HealthStatusReporter, }; @@ -41,9 +45,9 @@ impl HealthServer { } } -impl Service> for HealthServer { - type Response = Response; - type Error = std::convert::Infallible; +impl Service> for HealthServer { + type Response = Response>; + type Error = Infallible; type Future = Pin> + Send>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { @@ -57,6 +61,7 @@ impl Service> for HealthServer { async move { let health_status_descriptions: Vec = health_registry.health_status_report().collect().await; + match serde_json5::to_string(&health_status_descriptions) { Ok(body) => { let contains_failed_report = @@ -72,14 +77,14 @@ impl Service> for HealthServer { Ok(Response::builder() .status(status_code) .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE)) - .body(Body::from(body)) + .body(Full::new(Bytes::from(body))) .unwrap()) } Err(e) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .header(CONTENT_TYPE, HeaderValue::from_static(JSON_CONTENT_TYPE)) - .body(Body::from(format!("Internal Failure: {e:?}"))) + .body(Full::new(Bytes::from(format!("Internal Failure: {e:?}")))) .unwrap()), } }, diff --git a/nativelink-service/tests/bep_server_test.rs b/nativelink-service/tests/bep_server_test.rs index 27594d5b4..2770e7e86 100644 --- a/nativelink-service/tests/bep_server_test.rs +++ b/nativelink-service/tests/bep_server_test.rs @@ -16,7 +16,7 @@ use std::borrow::Cow; use std::sync::Arc; use futures::StreamExt; -use hyper::Body; +use hyper::body::Frame; use nativelink_config::cas_server::BepConfig; use nativelink_error::{Error, ResultExt}; use nativelink_macro::nativelink_test; @@ -36,6 +36,7 @@ use nativelink_service::bep_server::BepServer; use nativelink_store::default_store_factory::store_factory; use nativelink_store::store_manager::StoreManager; use nativelink_util::buf_channel::make_buf_channel_pair; +use nativelink_util::channel_body_for_tests::ChannelBody; use nativelink_util::common::encode_stream_proto; use nativelink_util::store_trait::{Store, StoreKey, StoreLike}; use pretty_assertions::assert_eq; @@ -153,9 +154,9 @@ async fn publish_build_tool_event_stream_test() -> Result<(), Box::default(); let stream = Streaming::new_request(codec.decoder(), body, None, None); let stream = bep_server @@ -300,7 +301,7 @@ async fn publish_build_tool_event_stream_test() -> Result<(), Box) -> (Sender, Streaming) { - let (tx, body) = Body::channel(); +fn make_stream( + encoding: Option, +) -> (mpsc::Sender>, Streaming) { + let (tx, body) = ChannelBody::new(); let mut codec = ProstCodec::::default(); let stream = Streaming::new_request(codec.decoder(), body, encoding, None); (tx, stream) } +type JoinHandle = JoinHandleDropGuard, tonic::Status>>; + fn make_stream_and_writer_spawn( bs_server: Arc, 
encoding: Option, -) -> ( - Sender, - JoinHandleDropGuard, tonic::Status>>, -) { +) -> (mpsc::Sender>, JoinHandle) { let (tx, stream) = make_stream(encoding); let join_handle = spawn!("bs_server_write", async move { bs_server.write(Request::new(stream)).await - },); + }); (tx, join_handle) } @@ -131,14 +137,28 @@ async fn server_and_client_stub( } let server_spawn = spawn!("grpc_server", async move { - let http = Http::new().with_executor(Executor); - let bs_service = bs_server.into_service(); + let http = auto::Builder::new(Executor); + let grpc_service = tonic::service::Routes::new(bs_server.into_service()); + + let adapted_service = tower::ServiceBuilder::new() + .map_request(|req: hyper::Request| { + let (parts, body) = req.into_parts(); + let body = body + .map_err(|e| tonic::Status::internal(e.to_string())) + .boxed_unsync(); + hyper::Request::from_parts(parts, body) + }) + .service(grpc_service); + + let hyper_service = TowerToHyperService::new(adapted_service); while let Some(stream) = rx.next().await { - let stream = stream.expect("Failed to get stream"); - http.serve_connection(stream, bs_service.clone()) - .await - .expect("Connection failed"); + http.serve_connection_with_upgrades( + TokioIo::new(stream.expect("Failed to get stream")), + hyper_service.clone(), + ) + .await + .expect("Connection failed"); } }); @@ -153,7 +173,7 @@ async fn server_and_client_stub( const MAX_BUFFER_SIZE: usize = 4096; let (client, server) = tokio::io::duplex(MAX_BUFFER_SIZE); tx.send(Ok(server)).unwrap(); - Result::<_, Error>::Ok(client) + Result::<_, Error>::Ok(TokioIo::new(client)) } })) .await @@ -173,7 +193,7 @@ pub async fn chunked_stream_receives_all_data() -> Result<(), Box Result<(), Box Result<(), Box> { ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server.clone(), Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -278,7 +301,8 @@ pub async fn resume_write_success() -> Result<(), Box> { // Write first chunk of data. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -287,14 +311,15 @@ pub async fn resume_write_success() -> Result<(), Box> { assert_eq!(result.is_err(), true, "Expected error to be returned"); } // Now reconnect. - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); { // Write the remainder of our data. write_request.write_offset = BYTE_SPLIT_OFFSET as i64; write_request.finish_write = true; write_request.data = WRITE_DATA[BYTE_SPLIT_OFFSET..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -324,7 +349,7 @@ pub async fn restart_write_success() -> Result<(), Box> { ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server.clone(), Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -349,7 +374,8 @@ pub async fn restart_write_success() -> Result<(), Box> { // Write first chunk of data. 
write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -358,20 +384,22 @@ pub async fn restart_write_success() -> Result<(), Box> { assert_eq!(result.is_err(), true, "Expected error to be returned"); } // Now reconnect. - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); { // Write first chunk of data again. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Write the remainder of our data. write_request.write_offset = BYTE_SPLIT_OFFSET as i64; write_request.finish_write = true; write_request.data = WRITE_DATA[BYTE_SPLIT_OFFSET..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Now disconnect our stream. @@ -399,7 +427,7 @@ pub async fn restart_mid_stream_write_success() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box> make_bytestream_server(store_manager.as_ref(), None).expect("Failed to make server"), ); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); const WRITE_DATA: &str = "12456789abcdefghijk"; @@ -567,13 +600,15 @@ pub async fn out_of_order_data_fails() -> Result<(), Box> // Write first chunk of data. write_request.write_offset = 0; write_request.data = WRITE_DATA[..BYTE_SPLIT_OFFSET].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } { // Write data it already has. write_request.write_offset = (BYTE_SPLIT_OFFSET - 1) as i64; write_request.data = WRITE_DATA[(BYTE_SPLIT_OFFSET - 1)..].into(); - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; } assert!( join_handle.await.expect("Failed to join").is_err(), @@ -584,7 +619,7 @@ pub async fn out_of_order_data_fails() -> Result<(), Box> write_request.write_offset = (BYTE_SPLIT_OFFSET - 1) as i64; write_request.data = WRITE_DATA[(BYTE_SPLIT_OFFSET - 1)..].into(); assert!( - tx.send_data(encode_stream_proto(&write_request)?) + tx.send(Frame::data(encode_stream_proto(&write_request)?)) .await .is_err(), "Expected error to be returned" @@ -601,7 +636,7 @@ pub async fn upload_zero_byte_chunk() -> Result<(), Box> ); let store = store_manager.get_store("main_cas").unwrap(); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); let resource_name = make_resource_name(0); @@ -614,7 +649,8 @@ pub async fn upload_zero_byte_chunk() -> Result<(), Box> { // Write our zero byte data. - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; // Wait for stream to finish. 
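The test churn in this file (here and in the remaining hunks below) is one mechanical substitution: hyper 0.14's `Body::channel()` sender exposed `send_data(Bytes)`, whereas the new test helper hands back a plain `tokio::sync::mpsc::Sender<Frame<Bytes>>`, so every payload is wrapped in an explicit `Frame::data`. A reduced sketch of the new send path, assuming the `ChannelBody` helper added later in this diff; the function name is illustrative:

use bytes::Bytes;
use hyper::body::Frame;
use nativelink_util::channel_body_for_tests::ChannelBody;

// Sketch: one data frame pushed through the test body. Trailers would use
// Frame::trailers(..) on the same sender.
async fn send_one_chunk() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, _body) = ChannelBody::new();
    tx.send(Frame::data(Bytes::from_static(b"chunk"))).await?;
    Ok(())
}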
join_handle .await @@ -638,7 +674,7 @@ pub async fn disallow_negative_write_offset() -> Result<(), Box Result<(), Box Result<(), Box> { make_bytestream_server(store_manager.as_ref(), None).expect("Failed to make server"), ); - let (mut tx, join_handle) = + let (tx, join_handle) = make_stream_and_writer_spawn(bs_server, Some(CompressionEncoding::Gzip)); let resource_name = make_resource_name(100); @@ -678,7 +715,8 @@ pub async fn out_of_sequence_write() -> Result<(), Box> { { // Write our zero byte data. - tx.send_data(encode_stream_proto(&write_request)?).await?; + tx.send(Frame::data(encode_stream_proto(&write_request)?)) + .await?; // Expect the write command to fail. assert!(join_handle.await.expect("Failed to join").is_err()); } @@ -844,7 +882,7 @@ pub async fn test_query_write_status_smoke_test() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box>, + } +} + +// Note: At the moment this is only used in a few tests after tonic removed its +// concrete Body type. See `nativelink_util::buf_channel` for channel +// implementations used in non-test code and see the tests for example usage. +impl ChannelBody { + #[must_use] + pub fn new() -> (mpsc::Sender>, Self) { + Self::with_buffer_size(DEFAULT_CHANNEL_BODY_BUFFER_SIZE) + } + + #[must_use] + pub fn with_buffer_size(buffer_size: usize) -> (mpsc::Sender>, Self) { + let (tx, rx) = mpsc::channel(buffer_size); + (tx, Self { rx }) + } +} + +impl Body for ChannelBody { + type Data = Bytes; + type Error = tonic::Status; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let frame = ready!(self.project().rx.poll_recv(cx)); + Poll::Ready(frame.map(Ok)) + } + + fn is_end_stream(&self) -> bool { + self.rx.is_closed() + } +} diff --git a/nativelink-util/src/connection_manager.rs b/nativelink-util/src/connection_manager.rs index 1d80f63c3..dacc0f1e8 100644 --- a/nativelink-util/src/connection_manager.rs +++ b/nativelink-util/src/connection_manager.rs @@ -429,7 +429,7 @@ pub struct ResponseFuture { /// This is mostly copied from tonic::transport::channel except it wraps it /// to allow messaging about connection success and failure. impl tonic::codegen::Service> for Connection { - type Response = tonic::codegen::http::Response; + type Response = tonic::codegen::http::Response; type Error = tonic::transport::Error; type Future = ResponseFuture; @@ -479,7 +479,7 @@ impl tonic::codegen::Service /// to allow messaging about connection failure. impl Future for ResponseFuture { type Output = - Result, tonic::transport::Error>; + Result, tonic::transport::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let result = Pin::new(&mut self.inner).poll(cx); diff --git a/nativelink-util/src/lib.rs b/nativelink-util/src/lib.rs index a4b22aac3..15d2ec976 100644 --- a/nativelink-util/src/lib.rs +++ b/nativelink-util/src/lib.rs @@ -14,6 +14,7 @@ pub mod action_messages; pub mod buf_channel; +pub mod channel_body_for_tests; pub mod chunked_stream; pub mod common; pub mod connection_manager; diff --git a/nativelink-util/tests/channel_body_for_tests_test.rs b/nativelink-util/tests/channel_body_for_tests_test.rs new file mode 100644 index 000000000..dec5b412a --- /dev/null +++ b/nativelink-util/tests/channel_body_for_tests_test.rs @@ -0,0 +1,206 @@ +// Copyright 2024 The NativeLink Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bytes::Bytes; +use futures::future; +use http_body_util::BodyExt; +use hyper::body::{Body, Frame}; +use hyper::http::HeaderMap; +use nativelink_macro::nativelink_test; +use nativelink_util::channel_body_for_tests::ChannelBody; +use nativelink_util::spawn; +use tokio::sync::mpsc::error::TrySendError; + +#[nativelink_test] +async fn test_channel_body_hello() { + let (tx, mut body) = ChannelBody::new(); + tx.send(Frame::data(Bytes::from("Hello"))).await.unwrap(); + drop(tx); + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Hello")); + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_concurrent() { + let (tx, body) = ChannelBody::new(); + + let send_task = spawn!("send", async move { + for i in 0..10 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + }); + + let receive_task = spawn!("receive", async move { + let mut body = body; + for i in 0..10 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); +} + +#[nativelink_test] +async fn test_channel_body_abrupt_end() { + let (tx, mut body) = ChannelBody::new(); + + tx.send(Frame::data(Bytes::from("Frame1"))).await.unwrap(); + tx.send(Frame::data(Bytes::from("Frame2"))).await.unwrap(); + + drop(tx); + + assert_eq!( + body.frame().await.unwrap().unwrap().into_data().unwrap(), + Bytes::from("Frame1") + ); + assert_eq!( + body.frame().await.unwrap().unwrap().into_data().unwrap(), + Bytes::from("Frame2") + ); + + // Ensure that the stream has ended after dropping the sender + assert!(body.frame().await.is_none()); +} + +#[nativelink_test] +async fn test_channel_body_custom_buffer_size() { + let (tx, mut body) = ChannelBody::with_buffer_size(5); + + for i in 0..5 { + tx.send(Frame::data(Bytes::from(format!("Frame{i}")))) + .await + .unwrap(); + } + + // Attempt to send a frame when the buffer is full + let send_result = tx.try_send(Frame::data(Bytes::from("Frame5"))); + assert!(send_result.is_err()); + + for i in 0..5 { + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from(format!("Frame{i}"))); + } +} + +#[nativelink_test] +async fn test_channel_body_is_end_stream() { + let (tx, body) = ChannelBody::new(); + + assert!(!body.is_end_stream()); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + assert!(!body.is_end_stream()); + + drop(tx); + assert!(body.is_end_stream()); +} + +#[nativelink_test] +async fn test_channel_body_send_error() { + let (tx, body) = ChannelBody::with_buffer_size(1); + + tx.send(Frame::data(Bytes::from("Data"))).await.unwrap(); + + // Attempt to send when the buffer is full + let result = tx.try_send(Frame::data(Bytes::from("More Data"))); + assert!(matches!(result, Err(TrySendError::Full(_)))); + + drop(body); + + // Attempt to send after the receiver has been dropped + let result = 
tx.try_send(Frame::data(Bytes::from("Even More Data"))); + assert!(matches!(result, Err(TrySendError::Closed(_)))); +} + +#[nativelink_test] +async fn test_channel_body_cancellation() { + let (tx, body) = ChannelBody::new(); + let tx_clone = tx.clone(); // Create a clone for later use + + let send_task = spawn!("send", async move { + for i in 0..5 { + match tx.send(Frame::data(Bytes::from(format!("Frame{i}")))).await { + Ok(()) => {} + Err(_) => break, // Stop if the receiver has been dropped + } + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + }); + + let receive_task = spawn!("receive", async move { + let mut body = body; + for _ in 0..2 { + match body.frame().await { + Some(Ok(frame)) => { + println!("Received: {:?}", frame.into_data().unwrap()); + } + _ => break, + } + } + // Simulate cancellation by dropping the body + drop(body); + }); + + let (send_result, receive_result) = future::join(send_task, receive_task).await; + send_result.unwrap(); + receive_result.unwrap(); + + // Use the cloned sender to check if the channel is closed + assert!(tx_clone.is_closed()); +} + +#[nativelink_test] +async fn test_channel_body_mixed_frame_types() { + let (tx, mut body) = ChannelBody::new(); + + let mut trailers1 = HeaderMap::new(); + trailers1.insert("X-Trailer-1", "Value1".parse().unwrap()); + let mut trailers2 = HeaderMap::new(); + trailers2.insert("X-Trailer-2", "Value2".parse().unwrap()); + + // Send a mix of data frames and trailers + tx.send(Frame::data(Bytes::from("Data1"))).await.unwrap(); + tx.send(Frame::trailers(trailers1.clone())).await.unwrap(); + tx.send(Frame::data(Bytes::from("Data2"))).await.unwrap(); + tx.send(Frame::trailers(trailers2.clone())).await.unwrap(); + tx.send(Frame::data(Bytes::from("Data3"))).await.unwrap(); + + // Drop the sender to signal the end of the stream + drop(tx); + + // Verify that frames are received in the correct order and type + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data1")); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_trailers().unwrap(), trailers1); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data2")); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_trailers().unwrap(), trailers2); + + let frame = body.frame().await.unwrap().unwrap(); + assert_eq!(frame.into_data().unwrap(), Bytes::from("Data3")); + + // Ensure the stream has ended + assert!(body.frame().await.is_none()); +} diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel index d0c78662c..8171132f0 100644 --- a/nativelink-worker/BUILD.bazel +++ b/nativelink-worker/BUILD.bazel @@ -71,10 +71,12 @@ rust_test_suite( "//nativelink-store", "//nativelink-util", "@crates//:async-lock", + "@crates//:bytes", "@crates//:futures", "@crates//:hex", - "@crates//:hyper", + "@crates//:hyper-1.4.1", "@crates//:once_cell", + "@crates//:pin-project-lite", "@crates//:pretty_assertions", "@crates//:prost", "@crates//:prost-types", diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml index a846e4502..12c734ebf 100644 --- a/nativelink-worker/Cargo.toml +++ b/nativelink-worker/Cargo.toml @@ -15,30 +15,31 @@ nativelink-metric = { path = "../nativelink-metric" } # functionality out of the schedulers. 
nativelink-scheduler = { path = "../nativelink-scheduler" } -async-lock = "3.3.0" -bytes = "1.6.0" +async-lock = "3.4.0" +bytes = "1.6.1" filetime = "0.2.23" formatx = "0.2.2" futures = "0.3.30" hex = "0.4.3" -parking_lot = "0.12.2" -prost = "0.12.4" +parking_lot = "0.12.3" +prost = "0.13.1" relative-path = "1.9.3" scopeguard = "1.2.0" -serde = "1.0.201" +serde = "1.0.204" serde_json5 = "0.1.0" shlex = "1.3.0" -tokio = { version = "1.37.0", features = ["sync", "rt", "process"] } +tokio = { version = "1.38.0", features = ["sync", "rt", "process"] } tokio-stream = { version = "0.1.15", features = ["fs"] } -tonic = { version = "0.11.0", features = ["gzip", "tls"] } +tonic = { version = "0.12.0", features = ["gzip", "tls"] } tracing = "0.1.40" -uuid = { version = "1.8.0", features = ["v4"] } +uuid = { version = "1.10.0", features = ["v4"] } [dev-dependencies] nativelink-macro = { path = "../nativelink-macro" } -hyper = "0.14.28" +hyper = "1.4.1" +hyper-util = "0.1.6" once_cell = "1.19.0" pretty_assertions = "1.4.0" -prost-types = "0.12.4" +prost-types = "0.13.1" rand = "0.8.5" diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs index 8f6914edf..15d82de51 100644 --- a/nativelink-worker/src/running_actions_manager.rs +++ b/nativelink-worker/src/running_actions_manager.rs @@ -1789,7 +1789,6 @@ impl RunningActionsManager for RunningActionsManagerImpl { .wrap(async move { let queued_timestamp = start_execute .queued_timestamp - .clone() .and_then(|time| time.try_into().ok()) .unwrap_or(SystemTime::UNIX_EPOCH); let operation_id: OperationId = start_execute diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs index 1036db07d..3feff1b5c 100644 --- a/nativelink-worker/tests/local_worker_test.rs +++ b/nativelink-worker/tests/local_worker_test.rs @@ -29,6 +29,7 @@ mod utils { pub(crate) mod mock_running_actions_manager; } +use hyper::body::Frame; use nativelink_config::cas_server::{LocalWorkerConfig, WorkerProperty}; use nativelink_error::{make_err, make_input_err, Code, Error}; use nativelink_macro::nativelink_test; @@ -142,7 +143,7 @@ async fn reconnect_on_server_disconnect_test() -> Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box Result<(), Box (HyperSender, Response>) { - let (tx, body) = Body::channel(); +pub fn setup_grpc_stream() -> ( + mpsc::Sender>, + Response>, +) { + let (tx, body) = ChannelBody::new(); let mut codec = ProstCodec::::default(); - // Note: This is an undocumented function. 
let stream = Streaming::new_request(codec.decoder(), body, Some(CompressionEncoding::Gzip), None); (tx, Response::new(stream)) @@ -225,7 +228,7 @@ pub struct TestContext { pub actions_manager: Arc, pub maybe_streaming_response: Option>>, - pub maybe_tx_stream: Option, + pub maybe_tx_stream: Option>>, _drop_guard: JoinHandleDropGuard>, } diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs index fdc07f1ef..21f1b073f 100644 --- a/src/bin/nativelink.rs +++ b/src/bin/nativelink.rs @@ -21,8 +21,10 @@ use async_lock::Mutex as AsyncMutex; use axum::Router; use clap::Parser; use futures::future::{select_all, BoxFuture, Either, OptionFuture, TryFutureExt}; -use hyper::server::conn::Http; -use hyper::{Body, Response, StatusCode}; +use hyper::{Response, StatusCode}; +use hyper_util::rt::tokio::TokioIo; +use hyper_util::server::conn::auto; +use hyper_util::service::TowerToHyperService; use mimalloc::MiMalloc; use nativelink_config::cas_server::{ CasConfig, GlobalConfig, HttpCompressionAlgorithm, ListenerConfig, ServerConfig, WorkerConfig, @@ -72,7 +74,6 @@ use tokio_rustls::rustls::{RootCertStore, ServerConfig as TlsServerConfig}; use tokio_rustls::TlsAcceptor; use tonic::codec::CompressionEncoding; use tonic::transport::Server as TonicServer; -use tower::util::ServiceExt; use tracing::{error_span, event, trace_span, Level}; use tracing_subscriber::layer::SubscriberExt; @@ -441,8 +442,9 @@ async fn inner_main( let health_registry = health_registry_builder.lock().await.build(); let mut svc = Router::new() + .merge(tonic_services.into_router()) // This is the default service that executes if no other endpoint matches. - .fallback_service(tonic_services.into_service().map_err(|e| panic!("{e}"))); + .fallback((StatusCode::NOT_FOUND, "Not Found")); if let Some(health_cfg) = services.health { let path = if health_cfg.path.is_empty() { @@ -469,7 +471,7 @@ async fn inner_main( svc = svc.route_service( path, - axum::routing::get(move |request: hyper::Request| { + axum::routing::get(move |request: hyper::Request| { Arc::new(OriginContext::new()).wrap_async( trace_span!("prometheus_ctx"), async move { @@ -538,7 +540,8 @@ async fn inner_main( }, )? }; - let mut response = Response::new(Body::from(json_data)); + let mut response = + Response::new(axum::body::Body::from(json_data)); response.headers_mut().insert( hyper::header::CONTENT_TYPE, hyper::header::HeaderValue::from_static( @@ -561,7 +564,8 @@ async fn inner_main( TextEncoder::new() .encode(®istry.gather(), &mut result) .unwrap(); - let mut response = Response::new(Body::from(result)); + let mut response = + Response::new(axum::body::Body::from(result)); // Per spec we should probably use `application/openmetrics-text; version=1.0.0; charset=utf-8` // https://github.com/OpenObservability/OpenMetrics/blob/1386544931307dff279688f332890c31b6c5de36/specification/OpenMetrics.md#overall-structure // However, this makes debugging more difficult, so we use the old text/plain instead. 
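The router rewiring above is the axum 0.6 -> 0.7 change in miniature: tonic's routes are merged straight into the axum `Router`, and the fallback becomes a plain 404 handler instead of the old panicking service adapter. A stripped-down sketch of that wiring (hand-written; the `Routes` parameter type and function name are assumptions, and service registration is elided):

use axum::Router;
use hyper::StatusCode;

// Sketch: tonic 0.12 exposes its routes as an axum 0.7 Router, so gRPC and
// plain HTTP endpoints live in one router and unmatched paths get a 404.
fn build_router(tonic_services: tonic::service::Routes) -> Router {
    Router::new()
        .merge(tonic_services.into_router())
        .fallback((StatusCode::NOT_FOUND, "Not Found"))
}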
@@ -722,46 +726,49 @@ async fn inner_main( let socket_addr = http_config.socket_address.parse::()?; let tcp_listener = TcpListener::bind(&socket_addr).await?; - let mut http = Http::new().with_executor(TaskExecutor::default()); + let mut http = auto::Builder::new(TaskExecutor::default()); let http_config = &http_config.advanced_http; if let Some(value) = http_config.http2_keep_alive_interval { - http.http2_keep_alive_interval(Duration::from_secs(u64::from(value))); + http.http2() + .keep_alive_interval(Duration::from_secs(u64::from(value))); } if let Some(value) = http_config.experimental_http2_max_pending_accept_reset_streams { - http.http2_max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| { - "Could not convert experimental_http2_max_pending_accept_reset_streams" - })?); + http.http2() + .max_pending_accept_reset_streams(usize::try_from(value).err_tip(|| { + "Could not convert experimental_http2_max_pending_accept_reset_streams" + })?); } if let Some(value) = http_config.experimental_http2_initial_stream_window_size { - http.http2_initial_stream_window_size(value); + http.http2().initial_stream_window_size(value); } if let Some(value) = http_config.experimental_http2_initial_connection_window_size { - http.http2_initial_connection_window_size(value); + http.http2().initial_connection_window_size(value); } if let Some(value) = http_config.experimental_http2_adaptive_window { - http.http2_adaptive_window(value); + http.http2().adaptive_window(value); } if let Some(value) = http_config.experimental_http2_max_frame_size { - http.http2_max_frame_size(value); + http.http2().max_frame_size(value); } if let Some(value) = http_config.experimental_http2_max_concurrent_streams { - http.http2_max_concurrent_streams(value); + http.http2().max_concurrent_streams(value); } if let Some(value) = http_config.experimental_http2_keep_alive_timeout { - http.http2_keep_alive_timeout(Duration::from_secs(u64::from(value))); + http.http2() + .keep_alive_timeout(Duration::from_secs(u64::from(value))); } if let Some(value) = http_config.experimental_http2_max_send_buf_size { - http.http2_max_send_buf_size( + http.http2().max_send_buf_size( usize::try_from(value).err_tip(|| "Could not convert http2_max_send_buf_size")?, ); } if let Some(true) = http_config.experimental_http2_enable_connect_protocol { - http.http2_enable_connect_protocol(); + http.http2().enable_connect_protocol(); } if let Some(value) = http_config.experimental_http2_max_header_list_size { - http.http2_max_header_list_size(value); + http.http2().max_header_list_size(value); } event!(Level::WARN, "Ready, listening on {socket_addr}",); @@ -828,8 +835,8 @@ async fn inner_main( let serve_connection = if let Some(tls_acceptor) = maybe_tls_acceptor { match tls_acceptor.accept(tcp_stream).await { Ok(tls_stream) => Either::Left(http.serve_connection( - tls_stream, - svc, + TokioIo::new(tls_stream), + TowerToHyperService::new(svc), )), Err(err) => { event!(Level::ERROR, ?err, "Failed to accept tls stream"); @@ -838,8 +845,8 @@ async fn inner_main( } } else { Either::Right(http.serve_connection( - tcp_stream, - svc, + TokioIo::new(tcp_stream), + TowerToHyperService::new(svc), )) };
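Finally, the connection loop: hyper 1.x no longer accepts tokio sockets or tower services directly, which is why every `serve_connection` call above now wraps the stream in `TokioIo` and the router in `TowerToHyperService`. A reduced sketch of the same loop, assuming hyper-util's stock `TokioExecutor` in place of NativeLink's own `TaskExecutor`, and leaving out TLS and the http2 tuning shown above:

use hyper_util::rt::tokio::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto;
use hyper_util::service::TowerToHyperService;
use tokio::net::TcpListener;

// Sketch: per-connection serving with the hyper-util "auto" (HTTP/1 + HTTP/2)
// builder. TokioIo adapts the tokio socket to hyper's IO traits and
// TowerToHyperService adapts the tower-based axum Router to hyper's Service.
async fn serve(listener: TcpListener, svc: axum::Router) -> std::io::Result<()> {
    let http = auto::Builder::new(TokioExecutor::new());
    loop {
        let (stream, _addr) = listener.accept().await?;
        let hyper_svc = TowerToHyperService::new(svc.clone());
        if let Err(err) = http
            .serve_connection(TokioIo::new(stream), hyper_svc)
            .await
        {
            eprintln!("connection error: {err:?}");
        }
    }
}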