@@ -7,7 +7,7 @@ use std::rc::Rc;
77use std:: time:: { Duration , Instant } ;
88
99use futures:: { Future , Async , Poll } ;
10- use futures :: unsync :: oneshot ;
10+ use relay ;
1111
1212use http:: { KeepAlive , KA } ;
1313
@@ -17,8 +17,19 @@ pub struct Pool<T> {
1717
1818struct PoolInner < T > {
1919 enabled : bool ,
20+ // These are internal Conns sitting in the event loop in the KeepAlive
21+ // state, waiting to receive a new Request to send on the socket.
2022 idle : HashMap < Rc < String > , Vec < Entry < T > > > ,
21- parked : HashMap < Rc < String > , VecDeque < oneshot:: Sender < Entry < T > > > > ,
23+ // These are outstanding Checkouts that are waiting for a socket to be
24+ // able to send a Request one. This is used when "racing" for a new
25+ // connection.
26+ //
27+ // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
28+ // for the Pool to receive an idle Conn. When a Conn becomes idle,
29+ // this list is checked for any parked Checkouts, and tries to notify
30+ // them that the Conn could be used instead of waiting for a brand new
31+ // connection.
32+ parked : HashMap < Rc < String > , VecDeque < relay:: Sender < Entry < T > > > > ,
2233 timeout : Option < Duration > ,
2334}
2435
@@ -50,13 +61,20 @@ impl<T: Clone> Pool<T> {
5061 let mut entry = Some ( entry) ;
5162 if let Some ( parked) = inner. parked . get_mut ( & key) {
5263 while let Some ( tx) = parked. pop_front ( ) {
64+ if tx. is_canceled ( ) {
65+ trace ! ( "Pool::put removing canceled parked {:?}" , key) ;
66+ } else {
67+ tx. complete ( entry. take ( ) . unwrap ( ) ) ;
68+ }
69+ /*
5370 match tx.send(entry.take().unwrap()) {
5471 Ok(()) => break,
5572 Err(e) => {
5673 trace!("Pool::put removing canceled parked {:?}", key);
5774 entry = Some(e);
5875 }
5976 }
77+ */
6078 }
6179 remove_parked = parked. is_empty ( ) ;
6280 }
@@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
7492 }
7593 }
7694
95+
7796 pub fn pooled ( & self , key : Rc < String > , value : T ) -> Pooled < T > {
7897 trace ! ( "Pool::pooled {:?}" , key) ;
7998 Pooled {
@@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
102121 }
103122 }
104123
105- fn park ( & mut self , key : Rc < String > , tx : oneshot :: Sender < Entry < T > > ) {
124+ fn park ( & mut self , key : Rc < String > , tx : relay :: Sender < Entry < T > > ) {
106125 trace ! ( "Pool::park {:?}" , key) ;
107126 self . inner . borrow_mut ( )
108127 . parked . entry ( key)
@@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
111130 }
112131}
113132
133+ impl < T > Pool < T > {
134+ fn clean_parked ( & mut self , key : & Rc < String > ) {
135+ trace ! ( "Pool::clean_parked {:?}" , key) ;
136+ let mut inner = self . inner . borrow_mut ( ) ;
137+
138+ let mut remove_parked = false ;
139+ if let Some ( parked) = inner. parked . get_mut ( key) {
140+ parked. retain ( |tx| {
141+ !tx. is_canceled ( )
142+ } ) ;
143+ remove_parked = parked. is_empty ( ) ;
144+ }
145+ if remove_parked {
146+ inner. parked . remove ( key) ;
147+ }
148+ }
149+ }
150+
114151impl < T > Clone for Pool < T > {
115152 fn clone ( & self ) -> Pool < T > {
116153 Pool {
@@ -204,7 +241,7 @@ enum TimedKA {
204241pub struct Checkout < T > {
205242 key : Rc < String > ,
206243 pool : Pool < T > ,
207- parked : Option < oneshot :: Receiver < Entry < T > > > ,
244+ parked : Option < relay :: Receiver < Entry < T > > > ,
208245}
209246
210247impl < T : Clone > Future for Checkout < T > {
@@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
260297 Some ( entry) => Ok ( Async :: Ready ( self . pool . reuse ( self . key . clone ( ) , entry) ) ) ,
261298 None => {
262299 if self . parked . is_none ( ) {
263- let ( tx, mut rx) = oneshot :: channel ( ) ;
300+ let ( tx, mut rx) = relay :: channel ( ) ;
264301 let _ = rx. poll ( ) ; // park this task
265302 self . pool . park ( self . key . clone ( ) , tx) ;
266303 self . parked = Some ( rx) ;
@@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
271308 }
272309}
273310
311+ impl < T > Drop for Checkout < T > {
312+ fn drop ( & mut self ) {
313+ self . parked . take ( ) ;
314+ self . pool . clean_parked ( & self . key ) ;
315+ }
316+ }
317+
274318struct Expiration ( Option < Duration > ) ;
275319
276320impl Expiration {
@@ -364,4 +408,30 @@ mod tests {
364408 } ) ) . map ( |( entry, _) | entry) ;
365409 assert_eq ! ( * checkout. wait( ) . unwrap( ) , * pooled1) ;
366410 }
411+
412+ #[ test]
413+ fn test_pool_checkout_drop_cleans_up_parked ( ) {
414+ future:: lazy ( || {
415+ let pool = Pool :: new ( true , Some ( Duration :: from_secs ( 10 ) ) ) ;
416+ let key = Rc :: new ( "localhost:12345" . to_string ( ) ) ;
417+ let _pooled1 = pool. pooled ( key. clone ( ) , 41 ) ;
418+ let mut checkout1 = pool. checkout ( & key) ;
419+ let mut checkout2 = pool. checkout ( & key) ;
420+
421+ // first poll needed to get into Pool's parked
422+ checkout1. poll ( ) . unwrap ( ) ;
423+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 1 ) ;
424+ checkout2. poll ( ) . unwrap ( ) ;
425+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 2 ) ;
426+
427+ // on drop, clean up Pool
428+ drop ( checkout1) ;
429+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 1 ) ;
430+
431+ drop ( checkout2) ;
432+ assert ! ( pool. inner. borrow( ) . parked. get( & key) . is_none( ) ) ;
433+
434+ :: futures:: future:: ok :: < ( ) , ( ) > ( ( ) )
435+ } ) . wait ( ) . unwrap ( ) ;
436+ }
367437}
0 commit comments