Skip to content

DelayQueue never firing new items after move #1668

@gwik

Description

@gwik

Version

tokio = "0.1.22"
futures = "0.1.29"

Platform

Linux 4.18.0-26-generic #27-Ubuntu SMP Tue Jul 2 15:36:16 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Subcrates

tokio::timer::delay_queue

Description

Moving a DelayQueue causes it to never fire items scheduled in the future.
Other timers in the runtime also seem to be stuck.
I read that timers are thread-local, so maybe the timers are being inserted on the wrong thread?
The code looks valid (to me, with my very limited experience of Rust and tokio); however, I would have expected compilation to fail if it isn't.

I tried this code:

extern crate tokio;
extern crate futures;

use futures::prelude::*;
use futures::sync::BiLock;
use std::time::Instant;
use tokio::timer::*;

/// Owns the `DelayQueue` so that it can be shared between a sink half and a
/// stream half through a `BiLock`.
struct DelayWrapper {
    queue: DelayQueue<String>,
}

impl DelayWrapper {
    /// Consumes the wrapper and returns the `(DelaySink, DelayStream)` pair;
    /// both halves hold one side of the same `BiLock` around `self`.
    fn split(self) -> (DelaySink, DelayStream) {
        let (writer_half, reader_half) = BiLock::new(self);
        let sink = DelaySink(writer_half);
        let stream = DelayStream(reader_half);
        (sink, stream)
    }
}

struct DelayStream(BiLock<DelayWrapper>);

impl Stream for DelayStream {
    type Item = String;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<String>, Error> {
        let start = Instant::now();

        loop {
            println!("POLL {:?}", Instant::now().duration_since(start));
            match self.0.poll_lock() {
                Async::Ready(mut inner) => {

                    println!("READY TO POLL {:?}", Instant::now().duration_since(start));

                    match inner.queue.poll() {
                        Ok(Async::Ready(Some(expired))) => {
                            let s = expired.into_inner();
                            println!("FIRED {:}", s);
                            return Ok(Async::Ready(Some(s)));
                        },
                        Ok(Async::Ready(None)) => (),
                        Ok(Async::NotReady) => (),
                        Err(e) => return Err(e),
                    }
                },
                Async::NotReady => (),
            }
        }
    }
}

/// Sink half of a split `DelayWrapper`: accepts `(value, deadline)` pairs and
/// inserts them into the shared `DelayQueue`.
struct DelaySink(BiLock<DelayWrapper>);

impl Sink for DelaySink {
    type SinkItem = (String, Instant);
    type SinkError = ();

    /// Tries to take the lock and insert `item` at its deadline.
    ///
    /// Hands the item back via `AsyncSink::NotReady` when the lock is not
    /// available; otherwise logs the insertion and reports `AsyncSink::Ready`.
    fn start_send(&mut self, item: (String, Instant)) -> StartSend<(String, Instant), ()> {
        let mut guard = match self.0.poll_lock() {
            Async::NotReady => return Ok(AsyncSink::NotReady(item)),
            Async::Ready(guard) => guard,
        };

        let (value, deadline) = item;
        println!("INSERT: {:} {:?}", value, deadline);
        guard.queue.insert_at(value, deadline);
        Ok(AsyncSink::Ready)
    }

    /// Nothing is buffered by this sink, so completion is always immediate.
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        let done = Async::Ready(());
        Ok(done)
    }

    /// Closing performs no work and is always immediately ready.
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        let done = Async::Ready(());
        Ok(done)
    }
}

#[cfg(test)]
mod tests {

    use tokio::prelude::*;
    use tokio::timer::Delay;


    /// Sanity check: a plain 100 ms `Delay` run on the runtime prints
    /// "Hello world!" when it completes.
    #[test]
    fn it_works() {
        use std::time::{Duration, Instant};

        let deadline = Instant::now() + Duration::from_millis(100);

        // Build the future first, then hand it to the runtime.
        let greeting = Delay::new(deadline)
            .map_err(|e| panic!("timer failed; err={:?}", e))
            .and_then(|_| {
                println!("Hello world!");
                Ok(())
            });

        tokio::run(greeting)
    }

    /// Sends three `(message, deadline)` pairs through the sink half from a
    /// spawned task while the outer task drains the stream half, stopping
    /// after as many items as were sent.
    #[test]
    fn timers_are_stuck() {

        use futures::future::lazy;
        use tokio::timer::DelayQueue;
        use super::*;

        tokio::run(lazy(||{
            use std::time::Duration;

            // Wrap a fresh queue and split it into its two BiLock-sharing halves.
            let w = DelayWrapper{
                queue: DelayQueue::new(),
            };

            let (sink, stream) = w.split();

            // Two deadlines 500 ms in the past and one 1000 ms in the future.
            let samples = vec!(
                (
                    "before now will fire".to_string(),
                    Instant::now() - Duration::from_millis(500),
                ),
                (
                    "after now will never fire".to_string(),
                    Instant::now() + Duration::from_millis(1000),                    
                ),
                (
                    "before now will fire again".to_string(),
                    Instant::now() - Duration::from_millis(500),                    
                ),
            );

            // Number of items the stream side waits for before finishing.
            let n = samples.len();

            // Producer: a separate task feeds every sample into the sink half.
            tokio::spawn(lazy(|| {
                let input = stream::iter_ok(samples);
                sink.send_all(input).map(|_| ()).map_err(|err| panic!(err))
            }));

            // Consumer: the future returned here drains the stream half under
            // a 5 s timeout, printing each item and panicking on any error.
            stream
                .timeout(Duration::from_secs(5))
                .take(n as u64)
                .map(|out| println!("out: {:}", out))
                .map_err(|err| panic!(err)) // timeout never fires either
                .fold((), |_, _| Ok(()))
        }));
    }

}

env RUST_BACKTRACE=full cargo +nightly test -- --nocapture | rg -e INSERT -e FIRED

INSERT: before now will fire Instant { tv_sec: 46909, tv_nsec: 845376644 }
FIRED before now will fire
INSERT: after now will never fire Instant { tv_sec: 46911, tv_nsec: 345377016 }
INSERT: before now will fire again Instant { tv_sec: 46909, tv_nsec: 845377332 }
FIRED before now will fire again

Then the program gets stuck forever; the timeout on the stream never fires either.

Metadata

Metadata

Assignees

No one assigned

    Labels

    A-tokio (Area: The main tokio crate), C-bug (Category: This is a bug.), M-time (Module: tokio/time), T-v0.1.x (Topic: tokio 0.1.x)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions