diff --git a/Cargo.lock b/Cargo.lock index 9e73cde34..062c2e45a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,6 +274,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-utils", "futures", + "futures-task", "hdrhistogram", "humantime", "parking_lot", diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index d3c414e3b..c69eedcc8 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -40,7 +40,7 @@ tonic = { version = "0.9", features = ["transport"] } tracing-core = "0.1.24" tracing = "0.1.26" tracing-subscriber = { version = "0.3.11", default-features = false, features = ["fmt", "registry"] } -futures = { version = "0.3", default-features = false } +futures-task = { version = "0.3", default-features = false } hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] } # The parking_lot dependency is renamed, because we want our `parking_lot` # feature to also enable `tracing-subscriber`'s parking_lot feature flag. diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 4496cba28..1b20cd6c6 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -7,7 +7,6 @@ use console_api as proto; use proto::resources::resource; use tokio::sync::{mpsc, Notify}; -use futures::FutureExt; use std::{ sync::{ atomic::{AtomicBool, Ordering::*}, @@ -221,7 +220,7 @@ impl Aggregator { // to be woken when the flush interval has elapsed, or when the // channel is almost full. let mut drained = false; - while let Some(event) = self.events.recv().now_or_never() { + while let Some(event) = recv_now_or_never(&mut self.events) { match event { Some(event) => { self.update_state(event); @@ -500,6 +499,16 @@ impl Aggregator { } } +fn recv_now_or_never(receiver: &mut mpsc::Receiver) -> Option> { + let waker = futures_task::noop_waker(); + let mut cx = std::task::Context::from_waker(&waker); + + match receiver.poll_recv(&mut cx) { + std::task::Poll::Ready(opt) => Some(opt), + std::task::Poll::Pending => None, + } +} + // ==== impl Flush === impl Flush {