loop_fn with poll_peek randomly hangs despite data available #965

Closed
georgmu opened this issue Mar 8, 2019 · 8 comments

@georgmu

georgmu commented Mar 8, 2019

Version

tokio 0.1.16

$ cargo tree | grep tokio | grep -v '(*)' | grep -o tokio.*
tokio v0.1.16
tokio-codec v0.1.1
tokio-io v0.1.12
tokio-current-thread v0.1.5
tokio-executor v0.1.6
tokio-fs v0.1.6
tokio-threadpool v0.1.12
tokio-reactor v0.1.9
tokio-sync v0.1.3
tokio-tcp v0.1.3
tokio-timer v0.2.10
tokio-udp v0.1.3
tokio-uds v0.2.5

Platform

Linux nb-georg 4.20.11-200.fc29.x86_64 #1 SMP Wed Feb 20 15:56:08 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Description

I am trying to develop a library for an SNI proxy that forwards TLS TCP streams to a target, peeking into the stream to read the SNI field so that a manager can choose the target. The following gist is reduced to forwarding everything to a local HTTPS server once the first peek has succeeded:
https://gist.github.com/georgmu/5cc90160a8b2efb1c560f017409b0527

This works from time to time, but randomly hangs. When it hangs, the CPU runs at 100%, and if I uncomment the "Not ready" println, it spams the console.

The hangs happen more often in release mode than in debug mode.

Test environment: start a local HTTPS server listening on port 8443 (or change the IP/port in the forward function, not in main(); I was not able to pass it into the forward function without borrow checker problems).
Then run curl https://localhost:5555/ or openssl s_client -connect localhost:5555

The data is there. I can check that in the socket statistics:

$ ss -tanp | grep 5555
LISTEN       0        128                                      127.0.0.1:5555                      0.0.0.0:*      users:(("sniproxy",pid=15231,fd=3))           
ESTAB        0        0                                        127.0.0.1:36348                   127.0.0.1:5555   users:(("openssl",pid=17266,fd=3))            
ESTAB        303      0                                        127.0.0.1:5555                    127.0.0.1:36348  users:(("sniproxy",pid=15231,fd=16))  

There are 303 bytes available for sniproxy.
According to strace, the process makes no syscalls to receive the data; it just loops in userspace. As mentioned above, though, the closure that calls poll_peek() is still being invoked.
Here is a backtrace of a hung process:

#0  0x00007fa317c824c0 in nanosleep () from /lib64/libpthread.so.0
#1  0x0000558249f14c01 in std::sys::unix::thread::Thread::sleep () at src/libstd/sys/unix/thread.rs:155
#2  std::thread::sleep () at src/libstd/thread/mod.rs:778
#3  0x0000558249dee3d3 in sniproxy::forward::{{closure}}::{{closure}} (source=...) at src/main.rs:39
#4  0x0000558249de666e in <futures::future::loop_fn::LoopFn<A, F> as futures::future::Future>::poll (self=0x7fa310001f08) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/loop_fn.rs:95
#5  0x0000558249decf4f in <futures::future::chain::Chain<A, B, C>>::poll (self=0x7fa310001f00, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/chain.rs:26
#6  0x0000558249df24aa in <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll (self=0x7fa310001f00) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/and_then.rs:32
#7  0x0000558249dec2bf in <futures::future::chain::Chain<A, B, C>>::poll (self=0x7fa310001ef8, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/chain.rs:26
#8  0x0000558249df24da in <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll (self=0x7fa310001ef8) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/and_then.rs:32
#9  0x0000558249deb83f in <futures::future::chain::Chain<A, B, C>>::poll (self=0x7fa310001ef0, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/chain.rs:26
#10 0x0000558249df250a in <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll (self=0x7fa310001ef0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/and_then.rs:32
#11 0x0000558249de82f0 in <futures::future::map_err::MapErr<A, F> as futures::future::Future>::poll (self=0x7fa310001ef0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/map_err.rs:30
#12 0x0000558249f0e415 in <alloc::boxed::Box<F> as futures::future::Future>::poll (self=0x7fa310001ff8) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/future/mod.rs:113
#13 0x0000558249e89bf4 in <futures::task_impl::Spawn<T>>::poll_future_notify::{{closure}} (f=0x7fa310001ff8) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/mod.rs:326
#14 0x0000558249e89e71 in <futures::task_impl::Spawn<T>>::enter::{{closure}} () at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/mod.rs:396
#15 0x0000558249e73ca7 in futures::task_impl::std::set (task=0x7fa317900e30, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/std/mod.rs:78
#16 0x0000558249e89e0f in <futures::task_impl::Spawn<T>>::enter (self=0x7fa310001fd0, unpark=..., f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/mod.rs:396
#17 0x0000558249e899a8 in <futures::task_impl::Spawn<T>>::poll_fn_notify (self=0x7fa310001fd0, notify=0x7fa3179018e0, id=140338324840368, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/mod.rs:288
#18 0x0000558249e89b58 in <futures::task_impl::Spawn<T>>::poll_future_notify (self=0x7fa310001fd0, notify=0x7fa3179018e0, id=140338324840368) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.25/src/task_impl/mod.rs:326
#19 0x0000558249e8d502 in tokio_threadpool::task::Task::run::{{closure}} () at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/task/mod.rs:145
#20 0x0000558249e60db9 in core::ops::function::FnOnce::call_once () at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libcore/ops/function.rs:231
#21 0x0000558249e70c89 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (self=..., _args=()) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panic.rs:309
#22 0x0000558249e897d3 in std::panicking::try::do_call (data=0x7fa317901120 "\b\024\220\027\243\177\000") at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panicking.rs:297
#23 0x0000558249f2251a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:92
#24 0x0000558249e89590 in std::panicking::try (f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panicking.rs:276
#25 0x0000558249e71ab1 in std::panic::catch_unwind (f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panic.rs:388
#26 0x0000558249e8cf30 in tokio_threadpool::task::Task::run (self=0x7fa310001fb0, unpark=0x7fa3179018e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/task/mod.rs:130
#27 0x0000558249e6b0ff in tokio_threadpool::worker::Worker::run_task2 (self=0x7fa3179026e0, task=0x7fa317901710, notify=0x7fa3179018e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:567
#28 0x0000558249e6a9b4 in tokio_threadpool::worker::Worker::run_task (self=0x7fa3179026e0, task=..., notify=0x7fa3179018e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:459
#29 0x0000558249e6a02a in tokio_threadpool::worker::Worker::try_run_owned_task (self=0x7fa3179026e0, notify=0x7fa3179018e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:390
#30 0x0000558249e69a2d in tokio_threadpool::worker::Worker::try_run_task (self=0x7fa3179026e0, notify=0x7fa3179018e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:297
#31 0x0000558249e69886 in tokio_threadpool::worker::Worker::run (self=0x7fa3179026e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:241
#32 0x0000558249df6a1c in tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}}::{{closure}}::{{closure}} () at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.16/src/runtime/threadpool/builder.rs:340
#33 0x0000558249df6e8e in tokio_timer::timer::handle::with_default::{{closure}} (current=0x7fa317904588) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.10/src/timer/handle.rs:94
#34 0x0000558249df8197 in <std::thread::local::LocalKey<T>>::try_with (self=0x558249fa14d8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:299
#35 0x0000558249df7bb3 in <std::thread::local::LocalKey<T>>::with (self=0x558249fa14d8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:245
#36 0x0000558249df6cd3 in tokio_timer::timer::handle::with_default (handle=0x55824a7d6218, enter=0x7fa3179023c0, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.10/src/timer/handle.rs:81
#37 0x0000558249df6a7c in tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}}::{{closure}} (enter=0x7fa3179023c0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.16/src/runtime/threadpool/builder.rs:339
#38 0x0000558249e025f1 in tokio_timer::clock::clock::with_default::{{closure}} (cell=0x7fa317903360) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.10/src/clock/clock.rs:141
#39 0x0000558249df7f3c in <std::thread::local::LocalKey<T>>::try_with (self=0x558249fa17c8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:299
#40 0x0000558249df7c3d in <std::thread::local::LocalKey<T>>::with (self=0x558249fa17c8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:245
#41 0x0000558249e024de in tokio_timer::clock::clock::with_default (clock=0x55824a7da648, enter=0x7fa3179023c0, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-timer-0.2.10/src/clock/clock.rs:124
#42 0x0000558249df6acd in tokio::runtime::threadpool::builder::Builder::build::{{closure}}::{{closure}} (enter=0x7fa3179023c0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.16/src/runtime/threadpool/builder.rs:338
#43 0x0000558249df723f in tokio_reactor::with_default::{{closure}} (current=0x7fa3179045a8) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.9/src/lib.rs:237
#44 0x0000558249df8653 in <std::thread::local::LocalKey<T>>::try_with (self=0x558249fa1518, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:299
#45 0x0000558249df7cbd in <std::thread::local::LocalKey<T>>::with (self=0x558249fa1518, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:245
#46 0x0000558249df7005 in tokio_reactor::with_default (handle=0x55824a7d5578, enter=0x7fa3179023c0, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.9/src/lib.rs:217
#47 0x0000558249df6b70 in tokio::runtime::threadpool::builder::Builder::build::{{closure}} (w=0x7fa3179026e0, enter=0x7fa3179023c0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.16/src/runtime/threadpool/builder.rs:337
#48 0x0000558249e54343 in tokio_threadpool::callback::Callback::call (self=0x55824a7e21d8, worker=0x7fa3179026e0, enter=0x7fa3179023c0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/callback.rs:22
#49 0x0000558249e6951f in tokio_threadpool::worker::Worker::do_run::{{closure}}::{{closure}} (enter=0x7fa3179023c0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:127
#50 0x0000558249e70ecb in tokio_executor::global::with_default::{{closure}} (cell=0x7fa317904520) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.6/src/global.rs:192
#51 0x0000558249e60610 in <std::thread::local::LocalKey<T>>::try_with (self=0x558249fa44b8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:299
#52 0x0000558249e5fa23 in <std::thread::local::LocalKey<T>>::with (self=0x558249fa44b8, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:245
#53 0x0000558249e70da6 in tokio_executor::global::with_default (executor=0x7fa3179023b8, enter=0x7fa3179023c0, f=...) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.6/src/global.rs:162
#54 0x0000558249e695cd in tokio_threadpool::worker::Worker::do_run::{{closure}} (c=0x7fa3179045e8) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:125
#55 0x0000558249e5feda in <std::thread::local::LocalKey<T>>::try_with (self=0x558249fa3930, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:299
#56 0x0000558249e5f9ae in <std::thread::local::LocalKey<T>>::with (self=0x558249fa3930, f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/local.rs:245
#57 0x0000558249e693f8 in tokio_threadpool::worker::Worker::do_run (self=0x7fa3179026e0) at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/worker/mod.rs:116
#58 0x0000558249e6f735 in tokio_threadpool::pool::Pool::spawn_thread::{{closure}} () at /home/georg/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-threadpool-0.1.12/src/pool/mod.rs:344
#59 0x0000558249e5ef8e in std::sys_common::backtrace::__rust_begin_short_backtrace (f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/sys_common/backtrace.rs:135
#60 0x0000558249e59c72 in std::thread::Builder::spawn_unchecked::{{closure}}::{{closure}} () at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/mod.rs:469
#61 0x0000558249e70c52 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once (self=..., _args=()) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panic.rs:309
#62 0x0000558249e8985c in std::panicking::try::do_call (data=0x7fa317902920 "\200 ~J\202U\000") at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panicking.rs:297
#63 0x0000558249f2251a in __rust_maybe_catch_panic () at src/libpanic_unwind/lib.rs:92
#64 0x0000558249e896c6 in std::panicking::try (f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panicking.rs:276
#65 0x0000558249e71af2 in std::panic::catch_unwind (f=...) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/panic.rs:388
#66 0x0000558249e59a56 in std::thread::Builder::spawn_unchecked::{{closure}} () at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/libstd/thread/mod.rs:468
#67 0x0000558249e5dac3 in <F as alloc::boxed::FnBox<A>>::call_box (self=0x55824a7e5780, args=()) at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/liballoc/boxed.rs:734
#68 0x0000558249f21cae in _$LT$alloc..boxed..Box$LT$$LP$dyn$u20$alloc..boxed..FnBox$LT$A$C$$u20$Output$u3d$R$GT$$u20$$u2b$$u20$$u27$a$RP$$GT$$u20$as$u20$core..ops..function..FnOnce$LT$A$GT$$GT$::call_once::h2263113ac6678036 () at /rustc/2aa4c46cfdd726e97360c2734835aa3515e8c858/src/liballoc/boxed.rs:744
#69 std::sys_common::thread::start_thread () at src/libstd/sys_common/thread.rs:14
#70 std::sys::unix::thread::Thread::new::thread_start () at src/libstd/sys/unix/thread.rs:81
#71 0x00007fa317c7858e in start_thread () from /lib64/libpthread.so.0
#72 0x00007fa317b8c6a3 in clone () from /lib64/libc.so.6
@georgmu
Author

georgmu commented Mar 11, 2019

Some observations:

  • if I sleep longer (15 milliseconds or more), the problem seems to disappear.
  • if I comment out the try_ready line in tokio-tcp/src/stream.rs:361 the problems are gone.
    • So it looks like some epoll problem. I am looking at strace output, but I am not very familiar with epoll.

@blckngm
Contributor

blckngm commented Mar 16, 2019

Returning pure Results in loop_fn is essentially busy looping. It certainly won't work.

With poll_fn the example works OK:

--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-use futures::future::{loop_fn, ok, Loop};
+use futures::future::{ok, poll_fn};
 use futures::{Async, Future, Stream};
 use tokio::io;
 use tokio::io::AsyncRead;
@@ -24,19 +24,17 @@ fn forward(bind_ip: &str, local_port: i32, _target_host: &str, _target_port: i32
     let server = listener
         .incoming()
         .for_each(|source| {
-            let sni_task = loop_fn(source, |mut source| {
+            // Workaround FnMut.
+            let mut source = Some(source);
+            let sni_task = poll_fn(move || {
                 let mut buf = [0; 1000];
-                match source.poll_peek(&mut buf) {
+                match source.as_mut().unwrap().poll_peek(&mut buf) {
                     Ok(Async::Ready(n)) => {
                         println!("Received {} bytes", n);
                         // TODO check SNI
-                        Ok(Loop::Break(source))
-                    }
-                    Ok(Async::NotReady) => {
-                        // println!("Not ready");
-                        std::thread::sleep(std::time::Duration::from_micros(100));
-                        Ok(Loop::Continue(source))
+                        Ok(Async::Ready(source.take().unwrap()))
                     }
+                    Ok(Async::NotReady) => Ok(Async::NotReady),
                     Err(x) => {
                         println!("poll_peek error: {:?}", x);
                         Err(x)

BTW I don't think poll_peek is the right tool here. The data returned can be shorter than what is needed to determine the SNI host. And calling it in a loop won't help, because “successive calls return the same data”.
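To make the "shorter than needed" case concrete, here is a minimal std-only sketch that decides whether the bytes seen so far cover the first TLS record. It relies only on the TLS record header layout (1 byte content type, 2 bytes version, 2 bytes big-endian length). The names `RecordStatus` and `first_record_status` are hypothetical and not part of Tokio or the gist, and a ClientHello may in principle span more than one record, which this sketch does not handle.

```rust
/// TLS record header: 1 byte content type, 2 bytes version,
/// 2 bytes payload length (big endian).
const TLS_RECORD_HEADER_LEN: usize = 5;
/// Content type of handshake records (a ClientHello is a handshake message).
const TLS_HANDSHAKE: u8 = 22;

/// Hypothetical result type: what the bytes read so far tell us.
#[derive(Debug, PartialEq)]
enum RecordStatus {
    /// Not enough bytes yet; at least this many are needed in total.
    NeedAtLeast(usize),
    /// The first record is complete and spans this many bytes.
    Complete(usize),
    /// The stream does not start with a TLS handshake record.
    NotHandshake,
}

/// Hypothetical helper: inspect the bytes available so far.
fn first_record_status(buf: &[u8]) -> RecordStatus {
    if let Some(&content_type) = buf.first() {
        if content_type != TLS_HANDSHAKE {
            return RecordStatus::NotHandshake;
        }
    }
    if buf.len() < TLS_RECORD_HEADER_LEN {
        return RecordStatus::NeedAtLeast(TLS_RECORD_HEADER_LEN);
    }
    let payload_len = u16::from_be_bytes([buf[3], buf[4]]) as usize;
    let total = TLS_RECORD_HEADER_LEN + payload_len;
    if buf.len() < total {
        RecordStatus::NeedAtLeast(total)
    } else {
        RecordStatus::Complete(total)
    }
}

fn main() {
    // 0x16 = handshake, 0x0301 = version, 0x0004 = four payload bytes.
    let record = [0x16, 0x03, 0x01, 0x00, 0x04, 1, 2, 3, 4];
    println!("{:?}", first_record_status(&record[..3]));
    println!("{:?}", first_record_status(&record[..7]));
    println!("{:?}", first_record_status(&record));
}
```

A single peek that yields `NeedAtLeast(..)` cannot be retried by simply peeking again, which is why reading and buffering the bytes is the safer approach here.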

@georgmu
Author

georgmu commented Mar 16, 2019

@sopium Thank you for your input.

I could also consume the buffer to avoid the poll_peek, but then I have difficulties calling the copy operation below. I see no option to combine my read data with the rest to make the io::copy work.

Regarding poll_peek: "it returns the same data" means that it does not consume the data, but more data may have arrived in the meantime, so the buffer will be filled further.

Regardless of the design flaws in my code: I think that poll_peek not seeing the data is still a bug.

@blckngm
Contributor

blckngm commented Mar 16, 2019

> @sopium Thank you for your input.
>
> I could also consume the buffer to avoid the poll_peek, but then I have difficulties calling the copy operation below. I see no option to combine my read data with the rest to make the io::copy work.

You should be able to write_all and_then copy. The stream is returned from write_all so it can be reused.
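As a rough illustration of that idea, the following sketch uses only the blocking `std::io` traits rather than Tokio's futures-based `write_all` and `copy`, so it is an analogy and not the code you would put in the proxy. The helper name `inspect_then_forward` and the in-memory streams are assumptions made for the example.

```rust
use std::io::{self, Cursor, Read, Write};

/// Hypothetical helper: consume up to `n` bytes for inspection, write them
/// to `target` first, then copy the rest of `source` after them.
fn inspect_then_forward<R: Read, W: Write>(
    source: &mut R,
    target: &mut W,
    n: usize,
) -> io::Result<Vec<u8>> {
    let mut head = vec![0u8; n];
    let got = source.read(&mut head)?;
    head.truncate(got);

    // ... inspect `head` here, e.g. to pick the target ...

    // Replay the consumed bytes, then forward the untouched remainder.
    target.write_all(&head)?;
    io::copy(source, target)?;
    Ok(head)
}

fn main() -> io::Result<()> {
    // In-memory stand-ins for the client socket and the upstream socket.
    let mut source = Cursor::new(b"hello, world".to_vec());
    let mut target = Vec::new();

    let head = inspect_then_forward(&mut source, &mut target, 5)?;
    println!("inspected {} bytes, forwarded {} bytes", head.len(), target.len());
    Ok(())
}
```

Because the consumed bytes are written before the copy starts, the target sees the same byte stream it would have seen without the inspection step.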

> Regarding poll_peek: "it returns the same data" means that it does not consume the data, but more data may have arrived in the meantime, so the buffer will be filled further.

Yeah, it's true that the buffer will get filled more. But poll_peek will always return Ready after the first time it returns Ready, even if there is no more data. You won't be able to wait for more data.

> Regardless of the design flaws in my code: I think that poll_peek not seeing the data is still a bug.

You have to return NotReady, so that the reactor gets to run and the socket is marked ready to be read.
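The point can be sketched with a deliberately simplified, std-only model. Everything here (`ToyReactor`, `busy_loop`, `cooperative`, and the local `Async` enum) is a hypothetical stand-in and not Tokio's implementation; real scheduling also involves threads and task notification. The model only captures the claim above: readiness is a cached flag that is refreshed when the reactor gets to run, not when the task spins.

```rust
/// Toy stand-in for the futures 0.1 poll result (hypothetical, for illustration).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

/// Toy reactor: readiness is a cached flag, refreshed only in `turn`.
struct ToyReactor {
    /// Bytes the kernel has queued for the socket.
    kernel_bytes: usize,
    /// Readiness as last observed by the reactor.
    marked_ready: bool,
}

impl ToyReactor {
    /// Models one reactor turn: readiness is refreshed from the kernel.
    fn turn(&mut self) {
        self.marked_ready = self.kernel_bytes > 0;
    }

    /// Models a readiness-gated peek: it only consults the cached flag.
    fn poll_peek(&self) -> Async<usize> {
        if self.marked_ready {
            Async::Ready(self.kernel_bytes)
        } else {
            Async::NotReady
        }
    }
}

/// Spinning inside one poll, like answering NotReady with Loop::Continue:
/// the reactor never gets a turn, so the flag never changes.
fn busy_loop(reactor: &ToyReactor, max_spins: usize) -> Async<usize> {
    for _ in 0..max_spins {
        if let Async::Ready(n) = reactor.poll_peek() {
            return Async::Ready(n);
        }
    }
    Async::NotReady
}

/// Cooperative version: on NotReady, hand control back so the reactor can turn.
fn cooperative(reactor: &mut ToyReactor, max_polls: usize) -> Async<usize> {
    for _ in 0..max_polls {
        if let Async::Ready(n) = reactor.poll_peek() {
            return Async::Ready(n);
        }
        reactor.turn(); // the executor regains control here
    }
    Async::NotReady
}

fn main() {
    let mut reactor = ToyReactor { kernel_bytes: 303, marked_ready: false };
    println!("busy loop:   {:?}", busy_loop(&reactor, 1_000_000));
    println!("cooperative: {:?}", cooperative(&mut reactor, 2));
}
```

In this model the busy loop never observes the 303 queued bytes no matter how long it spins, while the cooperative version sees them on its second poll.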

@georgmu
Author

georgmu commented Mar 16, 2019

> You have to return NotReady, so that the reactor gets to run and the socket is marked ready to be read.

That was my plan, but I will now try to do the write_all + and_then.

The code above was just a prototype. But in the example, the poll_peek never returned a single Async::Ready(n), and this is a bug if there are 300 bytes available. I think there is an error in the epoll_wait assumption that there is no data, but I didn't find the right place to dig deeper into it.

@carllerche
Member

I believe that @sopium has provided the answer, the error lies in incorrect usage of the Tokio APIs. As such, closing. If this is a misinterpretation, please comment and I can reopen.

@carllerche
Member

carllerche commented Jul 15, 2019 via email

@georgmu
Author

georgmu commented Jul 16, 2019

Sorry, I missed something and deleted my comment minutes after posting it, but the mail was already out. Closing this is okay.

The other problem I have seems unrelated. Once woken up, I cannot re-peek the data if there is not enough of it (say: if n < 10, return Ok(Async::NotReady)). In this case the peek is not triggered again.
