Skip to content

Commit c89019e

Browse files
committed
feat(client): add executor method when configuring a Client
This allows using a future `Executor` other than a `Handle` to execute the background (connection) tasks needed for sending requests and responses. This also deprecates `Client::handle()`, since the executor may not be a `Handle`.
1 parent 350ce54 commit c89019e

File tree

2 files changed

+133
-13
lines changed

2 files changed

+133
-13
lines changed

src/client/mod.rs

+95-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::marker::PhantomData;
77
use std::rc::Rc;
88
use std::time::Duration;
99

10-
use futures::{future, Poll, Future, Stream};
10+
use futures::{Future, Poll, Stream};
11+
use futures::future::{self, Executor};
1112
#[cfg(feature = "compat")]
1213
use http;
1314
use tokio::reactor::Handle;
@@ -25,6 +26,8 @@ pub use proto::response::Response;
2526
pub use proto::request::Request;
2627
pub use self::connect::{HttpConnector, Connect};
2728

29+
use self::background::{bg, Background};
30+
2831
mod connect;
2932
mod dns;
3033
mod pool;
@@ -37,7 +40,7 @@ pub mod compat;
3740
// If the Connector is clone, then the Client can be clone easily.
3841
pub struct Client<C, B = proto::Body> {
3942
connector: C,
40-
handle: Handle,
43+
executor: Exec,
4144
pool: Pool<HyperClient<B>>,
4245
}
4346

@@ -74,18 +77,24 @@ impl Client<HttpConnector, proto::Body> {
7477
}
7578

7679
impl<C, B> Client<C, B> {
77-
/// Return a reference to a handle to the event loop this Client is associated with.
78-
#[inline]
80+
// Eventually, a Client won't really care about a tokio Handle, and only
81+
// the executor used to spawn background tasks. Removing this method is
82+
// a breaking change, so for now, it's just deprecated.
83+
#[doc(hidden)]
84+
#[deprecated]
7985
pub fn handle(&self) -> &Handle {
80-
&self.handle
86+
match self.executor {
87+
Exec::Handle(ref h) => h,
88+
Exec::Executor(..) => panic!("Client not built with a Handle"),
89+
}
8190
}
8291

8392
/// Create a new client with a specific connector.
8493
#[inline]
85-
fn configured(config: Config<C, B>, handle: &Handle) -> Client<C, B> {
94+
fn configured(config: Config<C, B>, exec: Exec) -> Client<C, B> {
8695
Client {
8796
connector: config.connector,
88-
handle: handle.clone(),
97+
executor: exec,
8998
pool: Pool::new(config.keep_alive, config.keep_alive_timeout)
9099
}
91100
}
@@ -185,11 +194,11 @@ where C: Connect,
185194

186195
let checkout = self.pool.checkout(domain.as_ref());
187196
let connect = {
188-
let handle = self.handle.clone();
197+
let executor = self.executor.clone();
189198
let pool = self.pool.clone();
190199
let pool_key = Rc::new(domain.to_string());
191200
self.connector.connect(url)
192-
.map(move |io| {
201+
.and_then(move |io| {
193202
let (tx, rx) = mpsc::channel(0);
194203
let tx = HyperClient {
195204
tx: RefCell::new(tx),
@@ -198,8 +207,8 @@ where C: Connect,
198207
let pooled = pool.pooled(pool_key, tx);
199208
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
200209
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
201-
handle.spawn(dispatch.map_err(|err| debug!("client connection error: {}", err)));
202-
pooled
210+
executor.execute(dispatch.map_err(|e| debug!("client connection error: {}", e)))?;
211+
Ok(pooled)
203212
})
204213
};
205214

@@ -236,7 +245,7 @@ impl<C: Clone, B> Clone for Client<C, B> {
236245
fn clone(&self) -> Client<C, B> {
237246
Client {
238247
connector: self.connector.clone(),
239-
handle: self.handle.clone(),
248+
executor: self.executor.clone(),
240249
pool: self.pool.clone(),
241250
}
242251
}
@@ -384,7 +393,18 @@ where C: Connect,
384393
/// Construct the Client with this configuration.
385394
#[inline]
386395
pub fn build(self, handle: &Handle) -> Client<C, B> {
387-
Client::configured(self, handle)
396+
Client::configured(self, Exec::Handle(handle.clone()))
397+
}
398+
399+
/// Construct a Client with this configuration and an executor.
400+
///
401+
/// The executor will be used to spawn "background" connection tasks
402+
/// to drive requests and responses.
403+
pub fn executor<E>(self, executor: E) -> Client<C, B>
404+
where
405+
E: Executor<Background> + 'static,
406+
{
407+
Client::configured(self, Exec::Executor(Rc::new(executor)))
388408
}
389409
}
390410

@@ -417,3 +437,65 @@ impl<C: Clone, B> Clone for Config<C, B> {
417437
}
418438
}
419439
}
440+
441+
442+
// ===== impl Exec =====
443+
444+
#[derive(Clone)]
445+
enum Exec {
446+
Handle(Handle),
447+
Executor(Rc<Executor<Background>>),
448+
}
449+
450+
451+
impl Exec {
452+
fn execute<F>(&self, fut: F) -> io::Result<()>
453+
where
454+
F: Future<Item=(), Error=()> + 'static,
455+
{
456+
match *self {
457+
Exec::Handle(ref h) => h.spawn(fut),
458+
Exec::Executor(ref e) => {
459+
e.execute(bg(Box::new(fut)))
460+
.map_err(|err| {
461+
debug!("executor error: {:?}", err.kind());
462+
io::Error::new(
463+
io::ErrorKind::Other,
464+
"executor error",
465+
)
466+
})?
467+
},
468+
}
469+
Ok(())
470+
}
471+
}
472+
473+
// ===== impl Background =====
474+
475+
// The types inside this module are not exported out of the crate,
476+
// so they are in essence un-nameable.
477+
mod background {
478+
use futures::{Future, Poll};
479+
480+
// This is basically `impl Future`, since the type is un-nameable,
481+
// and only implementeds `Future`.
482+
#[allow(missing_debug_implementations)]
483+
pub struct Background {
484+
inner: Box<Future<Item=(), Error=()>>,
485+
}
486+
487+
pub fn bg(fut: Box<Future<Item=(), Error=()>>) -> Background {
488+
Background {
489+
inner: fut,
490+
}
491+
}
492+
493+
impl Future for Background {
494+
type Item = ();
495+
type Error = ();
496+
497+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
498+
self.inner.poll()
499+
}
500+
}
501+
}

tests/client.rs

+38
Original file line numberDiff line numberDiff line change
@@ -888,6 +888,44 @@ mod dispatch_impl {
888888
assert_eq!(closes.load(Ordering::Relaxed), 1);
889889
}
890890

891+
#[test]
892+
fn client_custom_executor() {
893+
let server = TcpListener::bind("127.0.0.1:0").unwrap();
894+
let addr = server.local_addr().unwrap();
895+
let mut core = Core::new().unwrap();
896+
let handle = core.handle();
897+
let closes = Arc::new(AtomicUsize::new(0));
898+
899+
let (tx1, rx1) = oneshot::channel();
900+
901+
thread::spawn(move || {
902+
let mut sock = server.accept().unwrap().0;
903+
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
904+
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
905+
let mut buf = [0; 4096];
906+
sock.read(&mut buf).expect("read 1");
907+
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").unwrap();
908+
let _ = tx1.send(());
909+
});
910+
911+
let uri = format!("http://{}/a", addr).parse().unwrap();
912+
913+
let client = Client::configure()
914+
.connector(DebugConnector(HttpConnector::new(1, &handle), closes.clone()))
915+
.executor(handle.clone());
916+
let res = client.get(uri).and_then(move |res| {
917+
assert_eq!(res.status(), hyper::StatusCode::Ok);
918+
res.body().concat2()
919+
});
920+
let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked")));
921+
922+
let timeout = Timeout::new(Duration::from_millis(200), &handle).unwrap();
923+
let rx = rx.and_then(move |_| timeout.map_err(|e| e.into()));
924+
core.run(res.join(rx).map(|r| r.0)).unwrap();
925+
926+
assert_eq!(closes.load(Ordering::Relaxed), 1);
927+
}
928+
891929
struct DebugConnector(HttpConnector, Arc<AtomicUsize>);
892930

893931
impl Service for DebugConnector {

0 commit comments

Comments
 (0)