diff --git a/Cargo.toml b/Cargo.toml index f68b0e58a8..9a4de87db1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,9 +21,9 @@ include = [ [dependencies] bytes = "0.4.6" -futures-core-preview = "=0.3.0-alpha.19" -futures-channel-preview = "=0.3.0-alpha.19" -futures-util-preview = "=0.3.0-alpha.19" +futures-core = "0.3.1" +futures-channel = "0.3.1" +futures-util = "0.3.1" http = "0.1.15" http-body = "=0.2.0-alpha.3" httparse = "1.0" @@ -48,6 +48,7 @@ tokio-timer = { version = "=0.3.0-alpha.6", optional = true } [dev-dependencies] +futures-util-a19 = { version = "=0.3.0-alpha.19", package = "futures-util-preview" } matches = "0.1" num_cpus = "1.0" pretty_env_logger = "0.3" diff --git a/src/body/body.rs b/src/body/body.rs index 631d6951d1..7a8784118f 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -487,7 +487,7 @@ impl From> for Body { impl Sender { /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - match self.abort_tx.poll_cancel(cx) { + match self.abort_tx.poll_canceled(cx) { Poll::Ready(()) => return Poll::Ready(Err(crate::Error::new_closed())), Poll::Pending => (), // fallthrough } diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 9eb69d2ec3..4bd1d37059 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -194,10 +194,10 @@ impl Callback { } } - pub(crate) fn poll_cancel(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { + pub(crate) fn poll_canceled(&mut self, cx: &mut task::Context<'_>) -> Poll<()> { match *self { - Callback::Retry(ref mut tx) => tx.poll_cancel(cx), - Callback::NoRetry(ref mut tx) => tx.poll_cancel(cx), + Callback::Retry(ref mut tx) => tx.poll_canceled(cx), + Callback::NoRetry(ref mut tx) => tx.poll_canceled(cx), } } @@ -229,7 +229,7 @@ impl Callback { }, Poll::Pending => { // check if the callback is canceled - ready!(cb.as_mut().unwrap().poll_cancel(cx)); + ready!(cb.as_mut().unwrap().poll_canceled(cx)); trace!("send_when canceled"); Poll::Ready(()) }, diff --git a/src/client/mod.rs b/src/client/mod.rs index c2c8afb193..592db82c23 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -64,8 +64,7 @@ use std::sync::Arc; use std::time::Duration; use futures_channel::oneshot; -use futures_util::future::{self, FutureExt as _, Either}; -use futures_util::try_future::TryFutureExt as _; +use futures_util::future::{self, FutureExt as _, TryFutureExt as _, Either}; use http::{Method, Request, Response, Uri, Version}; use http::header::{HeaderValue, HOST}; use http::uri::Scheme; diff --git a/src/client/pool.rs b/src/client/pool.rs index 021dd72924..e13d2245a0 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -733,9 +733,6 @@ impl Future for IdleTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - // Interval is a Stream - use futures_core::Stream; - loop { match Pin::new(&mut self.pool_drop_notifier).poll(cx) { Poll::Ready(Ok(n)) => match n {}, diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 9e344c9b02..e9dbb88eed 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -511,7 +511,7 @@ where match self.rx.poll_next(cx) { Poll::Ready(Some((req, mut cb))) => { // check that future hasn't been canceled already - match cb.poll_cancel(cx) { + match cb.poll_canceled(cx) { Poll::Ready(()) => { trace!("request canceled"); Poll::Ready(None) @@ -579,7 +579,7 @@ where fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { match self.callback { - Some(ref mut cb) => match cb.poll_cancel(cx) { + Some(ref mut cb) => match cb.poll_canceled(cx) { Poll::Ready(()) => { trace!("callback receiver has dropped"); Poll::Ready(Err(())) diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index d05dd1c3b8..6b7760f60e 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -1,7 +1,6 @@ use futures_channel::{mpsc, oneshot}; -use futures_util::future::{self, FutureExt as _, Either}; +use futures_util::future::{self, FutureExt as _, TryFutureExt as _, Either}; use futures_util::stream::StreamExt as _; -use futures_util::try_future::TryFutureExt as _; use h2::client::{Builder, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/tests/client.rs b/tests/client.rs index da1a852892..05df6d111d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -14,9 +14,8 @@ use hyper::{Body, Client, Method, Request, StatusCode}; use futures_core::{Future, Stream, TryFuture}; use futures_channel::oneshot; -use futures_util::future::{self, FutureExt}; -use futures_util::try_future::{self, TryFutureExt}; -use futures_util::try_stream::TryStreamExt; +use futures_util::future::{self, FutureExt, TryFutureExt}; +use futures_util::stream::TryStreamExt; use tokio::runtime::current_thread::Runtime; use tokio_net::tcp::TcpStream; @@ -256,7 +255,7 @@ macro_rules! test { let rx = rx.expect("thread panicked"); - rt.block_on(try_future::try_join(res, rx).map_ok(|r| r.0)).map(move |mut resp| { + rt.block_on(future::try_join(res, rx).map_ok(|r| r.0)).map(move |mut resp| { // Always check that HttpConnector has set the "extra" info... let extra = resp .extensions_mut() @@ -931,10 +930,8 @@ mod dispatch_impl { use futures_core::{self, Future}; use futures_channel::{mpsc, oneshot}; - use futures_util::future::FutureExt; - use futures_util::stream::StreamExt; - use futures_util::try_future::TryFutureExt; - use futures_util::try_stream::TryStreamExt; + use futures_util::future::{FutureExt, TryFutureExt}; + use futures_util::stream::{StreamExt, TryStreamExt}; use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_net::tcp::TcpStream; @@ -1673,7 +1670,7 @@ mod dispatch_impl { let res1 = client.get(url.clone()); let res2 = client.get(url.clone()); let res3 = client.get(url.clone()); - rt.block_on(try_future::try_join3(res1, res2, res3)).unwrap(); + rt.block_on(future::try_join3(res1, res2, res3)).unwrap(); // Since the client doesn't know it can ALPN at first, it will have // started 3 connections. But, the server above will only handle 1, @@ -1792,9 +1789,8 @@ mod conn { use std::time::{Duration}; use futures_channel::oneshot; - use futures_util::future::{self, poll_fn, FutureExt}; - use futures_util::try_future::TryFutureExt; - use futures_util::try_stream::TryStreamExt; + use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt}; + use futures_util::stream::TryStreamExt; use tokio::runtime::current_thread::Runtime; use tokio_io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use tokio_net::tcp::{TcpListener as TkTcpListener, TcpStream}; diff --git a/tests/server.rs b/tests/server.rs index 0c5b563808..7e2d9a4787 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -15,10 +15,11 @@ use std::time::Duration; use futures_channel::oneshot; use futures_core::ready; use futures_core::future::BoxFuture; -use futures_util::future::{self, Either, FutureExt}; -use futures_util::stream::StreamExt; -use futures_util::try_future::{self, TryFutureExt}; -//use futures_util::try_stream::TryStreamExt; +use futures_util::future::{self, Either, FutureExt, TryFutureExt}; +#[cfg(feature = "unstable-stream")] +use futures_util::stream::StreamExt as _; +// TODO: remove once tokio is updated to futures 0.3 +use futures_util_a19::stream::StreamExt as _; use http::header::{HeaderName, HeaderValue}; use tokio_net::driver::Handle; use tokio_net::tcp::{TcpListener, TcpStream as TkTcpStream}; @@ -879,7 +880,7 @@ fn disable_keep_alive_mid_request() { .map_err(|_| unreachable!()) .and_then(|socket| { let srv = Http::new().serve_connection(socket, HelloWorld); - try_future::try_select(srv, rx1) + future::try_select(srv, rx1) .then(|r| { match r { Ok(Either::Left(_)) => panic!("expected rx first"), @@ -938,7 +939,7 @@ fn disable_keep_alive_post_request() { _debug: dropped2, }; let server = Http::new().serve_connection(transport, HelloWorld); - try_future::try_select(server, rx1) + future::try_select(server, rx1) .then(|r| { match r { Ok(Either::Left(_)) => panic!("expected rx first"), diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 858802f86b..924f607b09 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -8,7 +8,7 @@ use hyper::client::HttpConnector; use hyper::service::{make_service_fn, service_fn}; pub use std::net::SocketAddr; -pub use futures_util::{future, try_future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _}; +pub use futures_util::{future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _}; //pub use self::futures_channel::oneshot; pub use hyper::{HeaderMap, StatusCode}; pub use tokio::runtime::current_thread::Runtime;