Skip to content

Commit 3b91fc6

Browse files
committed
fix(client): cleanup dropped pending Checkouts from Pool
Closes #1315
1 parent 971864c commit 3b91fc6

File tree

3 files changed

+77
-5
lines changed

3 files changed

+77
-5
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ language-tags = "0.2"
2828
log = "0.3"
2929
mime = "0.3.2"
3030
percent-encoding = "1.0"
31+
relay = "0.1"
3132
time = "0.1"
3233
tokio-core = "0.1.6"
3334
tokio-proto = "0.1"

src/client/pool.rs

+75-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::rc::Rc;
77
use std::time::{Duration, Instant};
88

99
use futures::{Future, Async, Poll};
10-
use futures::unsync::oneshot;
10+
use relay;
1111

1212
use http::{KeepAlive, KA};
1313

@@ -17,8 +17,19 @@ pub struct Pool<T> {
1717

1818
struct PoolInner<T> {
1919
enabled: bool,
20+
// These are internal Conns sitting in the event loop in the KeepAlive
21+
// state, waiting to receive a new Request to send on the socket.
2022
idle: HashMap<Rc<String>, Vec<Entry<T>>>,
21-
parked: HashMap<Rc<String>, VecDeque<oneshot::Sender<Entry<T>>>>,
23+
// These are outstanding Checkouts that are waiting for a socket to be
24+
// able to send a Request one. This is used when "racing" for a new
25+
// connection.
26+
//
27+
// The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
28+
// for the Pool to receive an idle Conn. When a Conn becomes idle,
29+
// this list is checked for any parked Checkouts, and tries to notify
30+
// them that the Conn could be used instead of waiting for a brand new
31+
// connection.
32+
parked: HashMap<Rc<String>, VecDeque<relay::Sender<Entry<T>>>>,
2233
timeout: Option<Duration>,
2334
}
2435

@@ -50,13 +61,20 @@ impl<T: Clone> Pool<T> {
5061
let mut entry = Some(entry);
5162
if let Some(parked) = inner.parked.get_mut(&key) {
5263
while let Some(tx) = parked.pop_front() {
64+
if tx.is_canceled() {
65+
trace!("Pool::put removing canceled parked {:?}", key);
66+
} else {
67+
tx.complete(entry.take().unwrap());
68+
}
69+
/*
5370
match tx.send(entry.take().unwrap()) {
5471
Ok(()) => break,
5572
Err(e) => {
5673
trace!("Pool::put removing canceled parked {:?}", key);
5774
entry = Some(e);
5875
}
5976
}
77+
*/
6078
}
6179
remove_parked = parked.is_empty();
6280
}
@@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
7492
}
7593
}
7694

95+
7796
pub fn pooled(&self, key: Rc<String>, value: T) -> Pooled<T> {
7897
trace!("Pool::pooled {:?}", key);
7998
Pooled {
@@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
102121
}
103122
}
104123

105-
fn park(&mut self, key: Rc<String>, tx: oneshot::Sender<Entry<T>>) {
124+
fn park(&mut self, key: Rc<String>, tx: relay::Sender<Entry<T>>) {
106125
trace!("Pool::park {:?}", key);
107126
self.inner.borrow_mut()
108127
.parked.entry(key)
@@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
111130
}
112131
}
113132

133+
impl<T> Pool<T> {
134+
fn clean_parked(&mut self, key: &Rc<String>) {
135+
trace!("Pool::clean_parked {:?}", key);
136+
let mut inner = self.inner.borrow_mut();
137+
138+
let mut remove_parked = false;
139+
if let Some(parked) = inner.parked.get_mut(key) {
140+
parked.retain(|tx| {
141+
!tx.is_canceled()
142+
});
143+
remove_parked = parked.is_empty();
144+
}
145+
if remove_parked {
146+
inner.parked.remove(key);
147+
}
148+
}
149+
}
150+
114151
impl<T> Clone for Pool<T> {
115152
fn clone(&self) -> Pool<T> {
116153
Pool {
@@ -204,7 +241,7 @@ enum TimedKA {
204241
pub struct Checkout<T> {
205242
key: Rc<String>,
206243
pool: Pool<T>,
207-
parked: Option<oneshot::Receiver<Entry<T>>>,
244+
parked: Option<relay::Receiver<Entry<T>>>,
208245
}
209246

210247
impl<T: Clone> Future for Checkout<T> {
@@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
260297
Some(entry) => Ok(Async::Ready(self.pool.reuse(self.key.clone(), entry))),
261298
None => {
262299
if self.parked.is_none() {
263-
let (tx, mut rx) = oneshot::channel();
300+
let (tx, mut rx) = relay::channel();
264301
let _ = rx.poll(); // park this task
265302
self.pool.park(self.key.clone(), tx);
266303
self.parked = Some(rx);
@@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
271308
}
272309
}
273310

311+
impl<T> Drop for Checkout<T> {
312+
fn drop(&mut self) {
313+
self.parked.take();
314+
self.pool.clean_parked(&self.key);
315+
}
316+
}
317+
274318
struct Expiration(Option<Duration>);
275319

276320
impl Expiration {
@@ -364,4 +408,30 @@ mod tests {
364408
})).map(|(entry, _)| entry);
365409
assert_eq!(*checkout.wait().unwrap(), *pooled1);
366410
}
411+
412+
#[test]
413+
fn test_pool_checkout_drop_cleans_up_parked() {
414+
future::lazy(|| {
415+
let pool = Pool::new(true, Some(Duration::from_secs(10)));
416+
let key = Rc::new("localhost:12345".to_string());
417+
let _pooled1 = pool.pooled(key.clone(), 41);
418+
let mut checkout1 = pool.checkout(&key);
419+
let mut checkout2 = pool.checkout(&key);
420+
421+
// first poll needed to get into Pool's parked
422+
checkout1.poll().unwrap();
423+
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
424+
checkout2.poll().unwrap();
425+
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 2);
426+
427+
// on drop, clean up Pool
428+
drop(checkout1);
429+
assert_eq!(pool.inner.borrow().parked.get(&key).unwrap().len(), 1);
430+
431+
drop(checkout2);
432+
assert!(pool.inner.borrow().parked.get(&key).is_none());
433+
434+
::futures::future::ok::<(), ()>(())
435+
}).wait().unwrap();
436+
}
367437
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern crate language_tags;
2626
#[macro_use] extern crate log;
2727
pub extern crate mime;
2828
#[macro_use] extern crate percent_encoding;
29+
extern crate relay;
2930
extern crate time;
3031
extern crate tokio_core as tokio;
3132
#[macro_use] extern crate tokio_io;

0 commit comments

Comments
 (0)