diff --git a/tokio-stream/src/wrappers/watch.rs b/tokio-stream/src/wrappers/watch.rs index 96e1a744fc4..6e4ae71cc6d 100644 --- a/tokio-stream/src/wrappers/watch.rs +++ b/tokio-stream/src/wrappers/watch.rs @@ -11,30 +11,28 @@ use std::task::{Context, Poll}; /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver /// [`Stream`]: trait@crate::Stream pub struct WatchStream { - inner: Pin>>, - _marker: std::marker::PhantomData, + inner: Pin>>, } -impl WatchStream { +impl WatchStream { /// Create a new `WatchStream`. pub fn new(mut rx: Receiver) -> Self { let stream = stream! { loop { match rx.changed().await { - Ok(item) => yield item, + Ok(_) => yield (*rx.borrow()).clone(), Err(_) => break, } } }; Self { inner: Box::pin(stream), - _marker: std::marker::PhantomData, } } } impl Stream for WatchStream { - type Item = (); + type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(cx)