diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 15b1ab4..c60c04f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -37,7 +37,30 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: check - args: --all-targets --all-features --workspace + args: --all-targets --workspace + + build-futures: + name: Cargo check futures channel + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@v1.3.0 + + - name: Build + uses: actions-rs/cargo@v1.0.3 + with: + command: check + args: --no-default-features --features futures_channel --manifest-path metered-channel/Cargo.toml fmt: name: Cargo fmt @@ -85,6 +108,28 @@ jobs: with: command: test args: --all-targets --workspace + tests-futures: + name: Cargo test with futures + runs-on: ubuntu-latest + steps: + - name: Checkout sources + uses: actions/checkout@v2 + + - name: Install Rust stable toolchain + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Rust Cache + uses: Swatinem/rust-cache@v1.3.0 + + - name: Cargo test + uses: actions-rs/cargo@v1.0.3 + with: + command: test + args: --no-default-features --features futures_channel --manifest-path metered-channel/Cargo.toml clippy: name: Cargo clippy diff --git a/Cargo.lock b/Cargo.lock index 194ceaf..49abb35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4,9 +4,9 @@ version = 3 [[package]] name = "aho-corasick" -version = "0.7.19" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e" +checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41" dependencies = [ "memchr", ] @@ -36,13 +36,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] @@ -51,7 +51,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -62,6 +62,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "basic-toml" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c0de75129aa8d0cceaf750b89013f0e08804d6ec61416da787b35ad0d7cddf1" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -70,27 +79,27 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "blake2" -version = "0.10.4" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9cf849ee05b2ee5fba5e36f97ff8ec2533916700fc0758d40d92136a42f3388" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" dependencies = [ "digest", ] [[package]] name = "block-buffer" -version = "0.10.3" +version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69cce20737498f97b993470a6e536b8523f0af7892a4f928cceb1ac5e52ebe7e" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ "generic-array", ] [[package]] name = "bumpalo" -version = "3.12.0" +version = "3.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0d261e256854913907f67ed06efbc3338dfe6179796deefc1ff763fc1aee5535" +checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" [[package]] name = "cast" @@ -139,7 +148,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "bitflags", "clap_lex", - "indexmap", + "indexmap 1.9.3", "textwrap", ] @@ -154,9 +163,9 @@ dependencies = [ [[package]] name = "coarsetime" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "454038500439e141804c655b4cd1bc6a70bcb95cd2bc9463af5661b6956f0e46" +checksum = "a90d114103adbc625300f346d4d09dfb4ab1c4a8df6868435dd903392ecf4354" dependencies = [ "libc", "once_cell", @@ -166,9 +175,9 @@ dependencies = [ [[package]] name = "concurrent-queue" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c278839b831783b70278b14df4d45e1beb1aad306c07bb796637de9a0e323e8e" +checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" dependencies = [ "crossbeam-utils", ] @@ -251,9 +260,9 @@ dependencies = [ [[package]] name = "crossbeam-queue" -version = "0.3.6" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cd42583b04998a5363558e5f9291ee5a5ff6b49944332103f251e7479a82aa7" +checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" dependencies = [ "cfg-if", "crossbeam-utils", @@ -261,9 +270,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.12" +version = "0.8.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" dependencies = [ "cfg-if", ] @@ -288,14 +297,14 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.109", ] [[package]] name = "digest" -version = "0.10.5" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", @@ -320,26 +329,26 @@ checksum = "558e40ea573c374cf53507fd240b7ee2f5477df7cfebdb97323ec61c719399c5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] name = "dyn-clone" -version = "1.0.9" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f94fa09c2aeea5b8839e414b7b841bf429fd25b9c522116ac97ee87856d88b2" +checksum = "68b0cf012f1230e43cd00ebb729c6bb58707ecfa8ad08b52ef3a4ccd2697fc30" [[package]] name = "either" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" [[package]] name = "env_logger" -version = "0.9.1" +version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c90bf5f19754d10198ccb95b70664fc925bd1fc090a0fd9a6ebc54acc8cd6272" +checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7" dependencies = [ "atty", "humantime", @@ -348,6 +357,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "equivalent" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88bffebc5d80432c9b140ee17875ff173a8ab62faad5b257da912bd2f6c1c0a1" + [[package]] name = "event-listener" version = "2.5.3" @@ -374,15 +389,15 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "fs-err" -version = "2.8.1" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64db3e262960f0662f43a6366788d5f10f7f244b8f7d7d987f560baf5ded5c50" +checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541" [[package]] name = "futures" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", @@ -395,9 +410,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" dependencies = [ "futures-core", "futures-sink", @@ -405,15 +420,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ "futures-core", "futures-task", @@ -423,32 +438,32 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" [[package]] name = "futures-macro" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] name = "futures-sink" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" [[package]] name = "futures-task" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-timer" @@ -458,9 +473,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.24" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" dependencies = [ "futures-channel", "futures-core", @@ -476,9 +491,9 @@ dependencies = [ [[package]] name = "generic-array" -version = "0.14.6" +version = "0.14.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bff49e947297f3312447abdca79f45f4738097cc82b06e72054d2223f601f1b9" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", @@ -486,9 +501,9 @@ dependencies = [ [[package]] name = "glob" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "half" @@ -502,6 +517,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -511,6 +532,15 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "humantime" version = "2.1.0" @@ -519,12 +549,22 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "indexmap" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", - "hashbrown", + "hashbrown 0.12.3", +] + +[[package]] +name = "indexmap" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d" +dependencies = [ + "equivalent", + "hashbrown 0.14.0", ] [[package]] @@ -538,15 +578,15 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.3" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8af84674fe1f223a982c933a0ee1086ac4d4052aa0fb8060c12c6ad838e754" +checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" [[package]] name = "js-sys" -version = "0.3.61" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730" +checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" dependencies = [ "wasm-bindgen", ] @@ -559,18 +599,15 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.134" +version = "0.2.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "329c933548736bc49fd575ee68c89e8be4d260064184389a5b77517cddd99ffb" +checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" [[package]] name = "log" -version = "0.4.17" +version = "0.4.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" [[package]] name = "memchr" @@ -604,19 +641,19 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.13.1" +version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" dependencies = [ - "hermit-abi", + "hermit-abi 0.2.6", "libc", ] [[package]] name = "once_cell" -version = "1.15.0" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "oorandom" @@ -626,7 +663,7 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "orchestra" -version = "0.2.1" +version = "0.3.0" dependencies = [ "async-trait", "criterion", @@ -644,18 +681,18 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" -version = "0.2.1" +version = "0.3.0" dependencies = [ "assert_matches", "expander", - "indexmap", + "indexmap 1.9.3", "itertools", "orchestra", "petgraph", "proc-macro-crate", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "thiserror", "tracing", ] @@ -668,32 +705,32 @@ checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" [[package]] name = "petgraph" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143" +checksum = "4dd7d28ee937e54fe3080c91faa1c3a46c06de6252988a7f4592ba2310ef22a4" dependencies = [ "fixedbitset", - "indexmap", + "indexmap 1.9.3", ] [[package]] name = "pin-project" -version = "1.0.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc" +checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.0.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" +checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] @@ -738,7 +775,7 @@ dependencies = [ [[package]] name = "prioritized-metered-channel" -version = "0.4.0" +version = "0.5.0" dependencies = [ "assert_matches", "async-channel", @@ -756,29 +793,28 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "1.2.1" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" +checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" dependencies = [ "once_cell", - "thiserror", - "toml", + "toml_edit", ] [[package]] name = "proc-macro2" -version = "1.0.47" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ea3d908b0e36316caf9e9e2c4625cdde190a7e6f440d794667ed17a1855e725" +checksum = "7b368fba921b0dce7e60f5e04ec15e565b3303972b42bcfde1d0713b881959eb" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.21" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "1b9ab9c7eadfd8df19006f1cf1a4aed13540ed5cbc047010ece5826e10825488" dependencies = [ "proc-macro2", ] @@ -807,9 +843,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.6.0" +version = "1.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f" dependencies = [ "aho-corasick", "memchr", @@ -818,9 +854,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.27" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" [[package]] name = "rustc_version" @@ -833,15 +869,15 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.9" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97477e48b4cf8603ad5f7aaf897467cf42ab4218a38ef76fb14c2d6773a6d6a8" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" [[package]] name = "ryu" -version = "1.0.11" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" +checksum = "f91339c0467de62360649f8d3e185ca8de4224ff281f66000de5eb2a77a79041" [[package]] name = "same-file" @@ -860,35 +896,35 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "semver" -version = "1.0.14" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e25dfac463d778e353db5be2449d1cce89bd6fd23c9f1ea21310ce6e5a1b29c4" +checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" [[package]] name = "serde" -version = "1.0.145" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" +checksum = "9e8c8cf938e98f769bc164923b06dce91cea1751522f46f8466461af04c9027d" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.145" +version = "1.0.164" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" +checksum = "d9735b638ccc51c28bf6914d90a2e9725b377144fc612c49a611fddd1b631d68" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] name = "serde_json" -version = "1.0.85" +version = "1.0.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44" +checksum = "46266871c240a00b8f503b877622fe33430b3c7d963bdc0f2adc511e54a1eae3" dependencies = [ "itoa", "ryu", @@ -897,18 +933,18 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" dependencies = [ "autocfg", ] [[package]] name = "subtle" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" @@ -921,11 +957,22 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efbeae7acf4eabd6bcdcbd11c92f45231ddda7539edc7806bd1a04a03b24616" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "termcolor" -version = "1.1.3" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6" dependencies = [ "winapi-util", ] @@ -938,22 +985,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.37" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e" +checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.37" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb" +checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] @@ -967,19 +1014,27 @@ dependencies = [ ] [[package]] -name = "toml" -version = "0.5.9" +name = "toml_datetime" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" + +[[package]] +name = "toml_edit" +version = "0.19.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" +checksum = "266f016b7f039eec8a1a80dfe6156b633d208b9fccca5e4db1d6775b0c4e34a7" dependencies = [ - "serde", + "indexmap 2.0.0", + "toml_datetime", + "winnow", ] [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", "log", @@ -990,50 +1045,50 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", ] [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", ] [[package]] name = "trybuild" -version = "1.0.65" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e13556ba7dba80b3c76d1331989a341290c77efcf688eca6c307ee3066383dd" +checksum = "501dbdbb99861e4ab6b60eb6a7493956a9defb644fd034bc4a5ef27c693c8a3a" dependencies = [ + "basic-toml", "glob", "once_cell", "serde", "serde_derive", "serde_json", "termcolor", - "toml", ] [[package]] name = "typenum" -version = "1.15.0" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" [[package]] name = "unicode-ident" -version = "1.0.4" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcc811dc4066ac62f84f11307873c4850cb653bfa9b1719cee2bd2204a4bc5dd" +checksum = "b15811caf2415fb889178633e7724bad2509101cde276048e013b9def5e51fa0" [[package]] name = "version_check" @@ -1059,9 +1114,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.84" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1069,24 +1124,24 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.84" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.22", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-macro" -version = "0.2.84" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1094,28 +1149,28 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.84" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.22", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.84" +version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" +checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" [[package]] name = "web-sys" -version = "0.3.61" +version = "0.3.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33b99f4b23ba3eec1a53ac264e35a755f00e966e0065077d6027c0f575b0b97" +checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" dependencies = [ "js-sys", "wasm-bindgen", @@ -1151,3 +1206,12 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "winnow" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca0ace3845f0d96209f0375e6d367e3eb87eb65d27d445bdc9f1843a26f39448" +dependencies = [ + "memchr", +] diff --git a/Cargo.toml b/Cargo.toml index 3f7ec81..ff109a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,11 @@ [workspace] members = ["orchestra", "metered-channel", "orchestra/proc-macro"] +resolver = "2" [workspace.package] authors = ["Parity Technologies "] edition = "2021" -version = "0.2.1" +version = "0.3.0" readme = "README.md" license = "MIT OR Apache-2.0" repository = "https://github.com/paritytech/orchestra" diff --git a/metered-channel/Cargo.toml b/metered-channel/Cargo.toml index a14d2af..ea44360 100644 --- a/metered-channel/Cargo.toml +++ b/metered-channel/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "prioritized-metered-channel" -version = "0.4.0" +version = "0.5.0" authors = ["Parity Technologies "] edition = "2021" description = "Channels with built-in observability and message priorizitazion (coming soon™)" @@ -10,7 +10,7 @@ license = "MIT OR Apache-2.0" [dependencies] futures = "0.3.21" futures-timer = "3.0.2" -async-channel = "1.8.0" +async-channel = { version = "1.8.0", optional = true } derive_more = "0.99" tracing = "0.1.35" thiserror = "1.0.31" @@ -24,3 +24,9 @@ assert_matches = "1.5" env_logger = "0.9" log = "0.4" tracing = { version = "0.1.35", features = ["log"] } + +[features] +default = ["async_channel"] +async_channel = ["dep:async-channel"] +futures_channel = [] + diff --git a/metered-channel/src/bounded.rs b/metered-channel/src/bounded.rs index 8524a9e..9033bd0 100644 --- a/metered-channel/src/bounded.rs +++ b/metered-channel/src/bounded.rs @@ -14,9 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Metered variant of bounded mpsc channels to be able to extract metrics. +#[cfg(feature = "async_channel")] +use async_channel::{ + bounded as bounded_channel, Receiver, Sender, TryRecvError, TrySendError as ChannelTrySendError, +}; + +#[cfg(feature = "futures_channel")] +use futures::{ + channel::mpsc::channel as bounded_channel, + channel::mpsc::{Receiver, Sender, TryRecvError}, + sink::SinkExt, +}; -use async_channel::{bounded, RecvError, TryRecvError}; use futures::{ stream::Stream, task::{Context, Poll}, @@ -24,11 +33,11 @@ use futures::{ use std::{pin::Pin, result}; use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter}; -pub use async_channel::TrySendError; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. pub fn channel(capacity: usize) -> (MeteredSender, MeteredReceiver) { - let (tx, rx) = bounded::>(capacity); + let (tx, rx) = bounded_channel::>(capacity); + let shared_meter = Meter::default(); let tx = MeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = MeteredReceiver { meter: shared_meter, inner: rx }; @@ -40,27 +49,82 @@ pub fn channel(capacity: usize) -> (MeteredSender, MeteredReceiver) { pub struct MeteredReceiver { // count currently contained messages meter: Meter, - inner: async_channel::Receiver>, + inner: Receiver>, } /// A bounded channel error #[derive(thiserror::Error, Debug)] pub enum SendError { - #[error("Bounded channel has been disconnected")] - Disconnected(T), + #[error("Bounded channel has been closed")] + Closed(T), + #[error("Bounded channel has been closed and the original message is lost")] + Terminated, } impl SendError { + /// Returns the inner value. + pub fn into_inner(self) -> Option { + match self { + Self::Closed(t) => Some(t), + Self::Terminated => None, + } + } +} + +/// A bounded channel error when trying to send a message (transparently wraps the inner error type) +#[derive(thiserror::Error, Debug)] +pub enum TrySendError { + #[error("Bounded channel has been closed")] + Closed(T), + #[error("Bounded channel is full")] + Full(T), +} + +impl TrySendError { /// Returns the inner value. pub fn into_inner(self) -> T { match self { - Self::Disconnected(t) => t, + Self::Closed(t) => t, + Self::Full(t) => t, + } + } + + /// Returns `true` if we could not send to channel as it was full + pub fn is_full(&self) -> bool { + match self { + Self::Closed(_) => false, + Self::Full(_) => true, + } + } + + /// Returns `true` if we could not send to channel as it was disconnected + pub fn is_disconnected(&self) -> bool { + match self { + Self::Closed(_) => true, + Self::Full(_) => false, } } } +/// Error when receiving from a closed bounded channel +#[derive(thiserror::Error, PartialEq, Eq, Clone, Copy, Debug)] +pub struct RecvError {} + +impl std::fmt::Display for RecvError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "receiving from an empty and closed channel") + } +} + +#[cfg(feature = "async_channel")] +impl From for RecvError { + fn from(_: async_channel::RecvError) -> Self { + RecvError {} + } +} + impl std::ops::Deref for MeteredReceiver { - type Target = async_channel::Receiver>; + type Target = Receiver>; fn deref(&self) -> &Self::Target { &self.inner } @@ -75,7 +139,7 @@ impl std::ops::DerefMut for MeteredReceiver { impl Stream for MeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match async_channel::Receiver::poll_next(Pin::new(&mut self.inner), cx) { + match Receiver::poll_next(Pin::new(&mut self.inner), cx) { Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)), Poll::Pending => Poll::Pending, } @@ -111,6 +175,16 @@ impl MeteredReceiver { } /// Attempt to receive the next item. + #[cfg(feature = "futures_channel")] + pub fn try_next(&mut self) -> Result, TryRecvError> { + match self.inner.try_next()? { + Some(value) => Ok(self.maybe_meter_tof(Some(value))), + None => Ok(None), + } + } + + /// Attempt to receive the next item. + #[cfg(feature = "async_channel")] pub fn try_next(&mut self) -> Result, TryRecvError> { match self.inner.try_recv() { Ok(value) => Ok(self.maybe_meter_tof(Some(value))), @@ -119,15 +193,17 @@ impl MeteredReceiver { } /// Receive the next item. + #[cfg(feature = "async_channel")] pub async fn recv(&mut self) -> Result { match self.inner.recv().await { Ok(value) => Ok(self.maybe_meter_tof(Some(value)).expect("wrapped value is always Some, qed")), - Err(err) => Err(err), + Err(err) => Err(err.into()), } } /// Attempt to receive the next item without blocking + #[cfg(feature = "async_channel")] pub fn try_recv(&mut self) -> Result { match self.inner.try_recv() { Ok(value) => @@ -136,6 +212,7 @@ impl MeteredReceiver { } } + #[cfg(feature = "async_channel")] /// Returns the current number of messages in the channel pub fn len(&self) -> usize { self.inner.len() @@ -153,7 +230,7 @@ impl futures::stream::FusedStream for MeteredReceiver { #[derive(Debug)] pub struct MeteredSender { meter: Meter, - inner: async_channel::Sender>, + inner: Sender>, } impl Clone for MeteredSender { @@ -163,7 +240,7 @@ impl Clone for MeteredSender { } impl std::ops::Deref for MeteredSender { - type Target = async_channel::Sender>; + type Target = Sender>; fn deref(&self) -> &Self::Target { &self.inner } @@ -199,35 +276,76 @@ impl MeteredSender { match self.try_send(msg) { Err(send_err) => { if !send_err.is_full() { - return Err(SendError::Disconnected(send_err.into_inner().into())) + return Err(SendError::Closed(send_err.into_inner().into())) } self.meter.note_blocked(); self.meter.note_sent(); // we are going to do full blocking send, so we have to note it here let msg = send_err.into_inner().into(); - let fut = self.inner.send(msg); - futures::pin_mut!(fut); - fut.await.map_err(|err| { - self.meter.retract_sent(); - SendError::Disconnected(err.0.into()) - }) + self.send_to_channel(msg).await }, _ => Ok(()), } } + // A helper routine to send a message to the channel after `try_send` returned that a channel is full + #[cfg(feature = "async_channel")] + async fn send_to_channel( + &mut self, + msg: MaybeTimeOfFlight, + ) -> result::Result<(), SendError> { + let fut = self.inner.send(msg); + futures::pin_mut!(fut); + fut.await.map_err(|err| { + self.meter.retract_sent(); + SendError::Closed(err.0.into()) + }) + } + + #[cfg(feature = "futures_channel")] + async fn send_to_channel( + &mut self, + msg: MaybeTimeOfFlight, + ) -> result::Result<(), SendError> { + let fut = self.inner.send(msg); + futures::pin_mut!(fut); + fut.await.map_err(|_| { + self.meter.retract_sent(); + // Futures channel does not provide a way to save the original message, + // so to avoid `T: Clone` bound we just return a generic error + SendError::Terminated + }) + } + + #[cfg(feature = "futures_channel")] + /// Attempt to send message or fail immediately. + pub fn try_send(&mut self, msg: T) -> result::Result<(), TrySendError> { + let msg = self.prepare_with_tof(msg); // note_sent is called in here + self.inner.try_send(msg).map_err(|e| { + self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send + if e.is_full() { + TrySendError::Full(e.into_inner().into()) + } else { + TrySendError::Closed(e.into_inner().into()) + } + }) + } + + #[cfg(feature = "async_channel")] /// Attempt to send message or fail immediately. pub fn try_send(&mut self, msg: T) -> result::Result<(), TrySendError> { let msg = self.prepare_with_tof(msg); // note_sent is called in here self.inner.try_send(msg).map_err(|e| { self.meter.retract_sent(); // we didn't send it, so we need to undo the note_send match e { - TrySendError::Full(inner_error) => TrySendError::Full(inner_error.into()), - TrySendError::Closed(inner_error) => TrySendError::Closed(inner_error.into()), + ChannelTrySendError::Full(inner_error) => TrySendError::Full(inner_error.into()), + ChannelTrySendError::Closed(inner_error) => + TrySendError::Closed(inner_error.into()), } }) } + #[cfg(feature = "async_channel")] /// Returns the current number of messages in the channel pub fn len(&self) -> usize { self.inner.len() diff --git a/metered-channel/src/lib.rs b/metered-channel/src/lib.rs index 16c7d04..7de62a8 100644 --- a/metered-channel/src/lib.rs +++ b/metered-channel/src/lib.rs @@ -17,6 +17,12 @@ //! Metered variant of mpsc channels to be able to extract metrics. #![allow(clippy::all)] +#[cfg(all(feature = "async_channel", feature = "futures_channel",))] +compile_error!("`async_channel` and `futures_channel` are mutually exclusive features"); + +#[cfg(not(any(feature = "async_channel", feature = "futures_channel")))] +compile_error!("Must build with either `async_channel` or `futures_channel` features"); + use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, diff --git a/metered-channel/src/tests.rs b/metered-channel/src/tests.rs index 43bb488..a5ed9a5 100644 --- a/metered-channel/src/tests.rs +++ b/metered-channel/src/tests.rs @@ -159,7 +159,12 @@ fn failed_send_does_not_inc_sent() { #[test] fn blocked_send_is_metered() { + // Async channel and futures channel have different semantics for + // capacity (futures channel capacity is actually `capacity + 1`) + #[cfg(feature = "async_channel")] let (bounded_sender, mut bounded_receiver) = channel::(2); + #[cfg(feature = "futures_channel")] + let (bounded_sender, mut bounded_receiver) = channel::(1); block_on(async move { let mut sender1 = bounded_sender.clone(); @@ -170,13 +175,13 @@ fn blocked_send_is_metered() { assert!(sender1.send(Msg::default()).await.is_ok()); }, async move { - bounded_receiver.recv().await.unwrap(); + bounded_receiver.next().await.unwrap(); assert_matches!( bounded_receiver.meter().read(), Readout { sent: 3, received: 1, blocked: 1, .. } ); - bounded_receiver.recv().await.unwrap(); - bounded_receiver.recv().await.unwrap(); + bounded_receiver.next().await.unwrap(); + bounded_receiver.next().await.unwrap(); assert_matches!( bounded_receiver.meter().read(), Readout { sent: 3, received: 3, blocked: 1, .. } diff --git a/metered-channel/src/unbounded.rs b/metered-channel/src/unbounded.rs index 6349f3a..f1d4063 100644 --- a/metered-channel/src/unbounded.rs +++ b/metered-channel/src/unbounded.rs @@ -21,14 +21,24 @@ use futures::{ task::{Context, Poll}, }; -use async_channel::{TryRecvError, TrySendError}; +#[cfg(feature = "async_channel")] +use async_channel::{unbounded as unbounded_channel, Receiver, Sender, TryRecvError, TrySendError}; + +#[cfg(feature = "futures_channel")] +use futures::{ + channel::mpsc::unbounded as unbounded_channel, + channel::mpsc::{ + TryRecvError, TrySendError, UnboundedReceiver as Receiver, UnboundedSender as Sender, + }, +}; + use std::{pin::Pin, result}; use super::{measure_tof_check, CoarseInstant, MaybeTimeOfFlight, Meter}; /// Create a wrapped `mpsc::channel` pair of `MeteredSender` and `MeteredReceiver`. pub fn unbounded() -> (UnboundedMeteredSender, UnboundedMeteredReceiver) { - let (tx, rx) = async_channel::unbounded::>(); + let (tx, rx) = unbounded_channel::>(); let shared_meter = Meter::default(); let tx = UnboundedMeteredSender { meter: shared_meter.clone(), inner: tx }; let rx = UnboundedMeteredReceiver { meter: shared_meter, inner: rx }; @@ -40,11 +50,11 @@ pub fn unbounded() -> (UnboundedMeteredSender, UnboundedMeteredReceiver pub struct UnboundedMeteredReceiver { // count currently contained messages meter: Meter, - inner: async_channel::Receiver>, + inner: Receiver>, } impl std::ops::Deref for UnboundedMeteredReceiver { - type Target = async_channel::Receiver>; + type Target = Receiver>; fn deref(&self) -> &Self::Target { &self.inner } @@ -59,7 +69,7 @@ impl std::ops::DerefMut for UnboundedMeteredReceiver { impl Stream for UnboundedMeteredReceiver { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match async_channel::Receiver::poll_next(Pin::new(&mut self.inner), cx) { + match Receiver::poll_next(Pin::new(&mut self.inner), cx) { Poll::Ready(maybe_value) => Poll::Ready(self.maybe_meter_tof(maybe_value)), Poll::Pending => Poll::Pending, } @@ -95,6 +105,16 @@ impl UnboundedMeteredReceiver { } /// Attempt to receive the next item. + #[cfg(feature = "futures_channel")] + pub fn try_next(&mut self) -> Result, TryRecvError> { + match self.inner.try_next()? { + Some(value) => Ok(self.maybe_meter_tof(Some(value))), + None => Ok(None), + } + } + + /// Attempt to receive the next item. + #[cfg(feature = "async_channel")] pub fn try_next(&mut self) -> Result, TryRecvError> { match self.inner.try_recv() { Ok(value) => Ok(self.maybe_meter_tof(Some(value))), @@ -103,6 +123,7 @@ impl UnboundedMeteredReceiver { } /// Returns the current number of messages in the channel + #[cfg(feature = "async_channel")] pub fn len(&self) -> usize { self.inner.len() } @@ -119,7 +140,7 @@ impl futures::stream::FusedStream for UnboundedMeteredReceiver { #[derive(Debug)] pub struct UnboundedMeteredSender { meter: Meter, - inner: async_channel::Sender>, + inner: Sender>, } impl Clone for UnboundedMeteredSender { @@ -129,7 +150,7 @@ impl Clone for UnboundedMeteredSender { } impl std::ops::Deref for UnboundedMeteredSender { - type Target = async_channel::Sender>; + type Target = Sender>; fn deref(&self) -> &Self::Target { &self.inner } @@ -158,6 +179,17 @@ impl UnboundedMeteredSender { } /// Attempt to send message or fail immediately. + #[cfg(feature = "futures_channel")] + pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError>> { + let msg = self.prepare_with_tof(msg); + self.inner.unbounded_send(msg).map_err(|e| { + self.meter.retract_sent(); + e + }) + } + + /// Attempt to send message or fail immediately. + #[cfg(feature = "async_channel")] pub fn unbounded_send(&self, msg: T) -> result::Result<(), TrySendError>> { let msg = self.prepare_with_tof(msg); self.inner.try_send(msg).map_err(|e| { @@ -167,6 +199,7 @@ impl UnboundedMeteredSender { } /// Returns the current number of messages in the channel + #[cfg(feature = "async_channel")] pub fn len(&self) -> usize { self.inner.len() } diff --git a/orchestra/Cargo.toml b/orchestra/Cargo.toml index 7839907..b14dfa1 100644 --- a/orchestra/Cargo.toml +++ b/orchestra/Cargo.toml @@ -14,8 +14,8 @@ tracing = "0.1.35" futures = "0.3" async-trait = "0.1" thiserror = "1" -metered = { package = "prioritized-metered-channel", version = "0.4.0", path = "../metered-channel" } -orchestra-proc-macro = { version = "0.2.0", path = "./proc-macro" } +metered = { package = "prioritized-metered-channel", version = "0.5.0", path = "../metered-channel", default-features = false } +orchestra-proc-macro = { version = "0.3.0", path = "./proc-macro" } futures-timer = "3.0.2" pin-project = "1.0" dyn-clonable = "0.9" @@ -39,7 +39,7 @@ name = "bench_main" harness = false [features] -default = ["deny_unconsumed_messages","deny_unsent_messages"] +default = ["deny_unconsumed_messages","deny_unsent_messages","async_channel"] # Generate a file containing the generated code that # is used via `include_str!`. expand = ["orchestra-proc-macro/expand"] @@ -52,3 +52,7 @@ dotgraph = ["orchestra-proc-macro/dotgraph"] deny_unconsumed_messages = ["orchestra-proc-macro/deny_unconsumed_messages"] # Creates a compile error if unsent messages are encountered deny_unsent_messages = ["orchestra-proc-macro/deny_unsent_messages"] +# Use async channel in the subsystem message channels (default) +async_channel = ["metered/async_channel"] +# Use compatibility futures channel in the subsystem message channels +futures_channel = ["metered/futures_channel"] diff --git a/orchestra/proc-macro/src/impl_orchestra.rs b/orchestra/proc-macro/src/impl_orchestra.rs index 127b5b1..8075bdb 100644 --- a/orchestra/proc-macro/src/impl_orchestra.rs +++ b/orchestra/proc-macro/src/impl_orchestra.rs @@ -61,6 +61,9 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS /// Capacity of a signal channel between a subsystem and the orchestra. const SIGNAL_CHANNEL_CAPACITY: usize = #signal_channel_capacity; + /// Timeout to wait for a signal to be processed by the target subsystem. If this timeout is exceeded, the + /// orchestra terminates with an error. + const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10); /// The log target tag. const LOG_TARGET: &str = #log_target; @@ -129,12 +132,45 @@ pub(crate) fn impl_orchestra_struct(info: &OrchestraInfo) -> proc_macro2::TokenS /// Broadcast a signal to all subsystems. pub async fn broadcast_signal(&mut self, signal: #signal_ty) -> ::std::result::Result<(), #error_ty > { + let mut delayed_signals : #support_crate ::futures::stream::FuturesUnordered<::std::pin::Pin<::std::boxed::Box>>>> + = #support_crate ::futures::stream::FuturesUnordered::new(); #( + // Use fast path if possible. #feature_gates - self. #subsystem_name .send_signal(signal.clone()).await?; + if let Err(e) = self. #subsystem_name .try_send_signal(signal.clone()) { + match e { + #support_crate::TrySendError::Full(sig) => { + let instance = self. #subsystem_name .instance.as_mut().expect("checked in try_send_signal"); + delayed_signals.push(::std::boxed::Box::pin(async move { + match instance.tx_signal.send(sig).timeout(SIGNAL_TIMEOUT).await { + None => { + Err(#error_ty :: from( + #support_crate ::OrchestraError::SubsystemStalled(instance.name, "signal", ::std::any::type_name::<#signal_ty>()) + )) + } + Some(res) => { + let res = res.map_err(|_| #error_ty :: from( + #support_crate ::OrchestraError::QueueError + )); + if res.is_ok() { + instance.signals_received += 1; + } + res + } + } + })); + }, + _ => return Err(#error_ty :: from(#support_crate ::OrchestraError::QueueError)) + } + } )* let _ = signal; + // If fast path failed, wait for all delayed signals with no specific order + while let Some(res) = delayed_signals.next().await { + res?; + } + Ok(()) } @@ -255,12 +291,21 @@ pub(crate) fn impl_orchestrated_subsystem(info: &OrchestraInfo) -> proc_macro2:: } } + /// Tries to send a signal to the wrapped subsystem without waiting. + pub fn try_send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #support_crate :: TrySendError<#signal> > { + if let Some(ref mut instance) = self.instance { + instance.tx_signal.try_send(signal)?; + instance.signals_received += 1; + Ok(()) + } else { + Ok(()) + } + } + /// Send a signal to the wrapped subsystem. /// /// If the inner `instance` is `None`, nothing is happening. pub async fn send_signal(&mut self, signal: #signal) -> ::std::result::Result<(), #error_ty > { - const SIGNAL_TIMEOUT: ::std::time::Duration = ::std::time::Duration::from_secs(10); - if let Some(ref mut instance) = self.instance { match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await { None => { diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index 9eb05e0..9972f8b 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -90,6 +90,9 @@ use std::sync::{ #[doc(hidden)] pub use std::time::Duration; +#[doc(hidden)] +pub use metered::TrySendError; + #[doc(hidden)] pub use futures_timer::Delay;