@@ -7,7 +7,7 @@ use std::rc::Rc;
7
7
use std:: time:: { Duration , Instant } ;
8
8
9
9
use futures:: { Future , Async , Poll } ;
10
- use futures :: unsync :: oneshot ;
10
+ use relay ;
11
11
12
12
use http:: { KeepAlive , KA } ;
13
13
@@ -17,8 +17,19 @@ pub struct Pool<T> {
17
17
18
18
struct PoolInner < T > {
19
19
enabled : bool ,
20
+ // These are internal Conns sitting in the event loop in the KeepAlive
21
+ // state, waiting to receive a new Request to send on the socket.
20
22
idle : HashMap < Rc < String > , Vec < Entry < T > > > ,
21
- parked : HashMap < Rc < String > , VecDeque < oneshot:: Sender < Entry < T > > > > ,
23
+ // These are outstanding Checkouts that are waiting for a socket to be
24
+ // able to send a Request one. This is used when "racing" for a new
25
+ // connection.
26
+ //
27
+ // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
28
+ // for the Pool to receive an idle Conn. When a Conn becomes idle,
29
+ // this list is checked for any parked Checkouts, and tries to notify
30
+ // them that the Conn could be used instead of waiting for a brand new
31
+ // connection.
32
+ parked : HashMap < Rc < String > , VecDeque < relay:: Sender < Entry < T > > > > ,
22
33
timeout : Option < Duration > ,
23
34
}
24
35
@@ -50,13 +61,20 @@ impl<T: Clone> Pool<T> {
50
61
let mut entry = Some ( entry) ;
51
62
if let Some ( parked) = inner. parked . get_mut ( & key) {
52
63
while let Some ( tx) = parked. pop_front ( ) {
64
+ if tx. is_canceled ( ) {
65
+ trace ! ( "Pool::put removing canceled parked {:?}" , key) ;
66
+ } else {
67
+ tx. complete ( entry. take ( ) . unwrap ( ) ) ;
68
+ }
69
+ /*
53
70
match tx.send(entry.take().unwrap()) {
54
71
Ok(()) => break,
55
72
Err(e) => {
56
73
trace!("Pool::put removing canceled parked {:?}", key);
57
74
entry = Some(e);
58
75
}
59
76
}
77
+ */
60
78
}
61
79
remove_parked = parked. is_empty ( ) ;
62
80
}
@@ -74,6 +92,7 @@ impl<T: Clone> Pool<T> {
74
92
}
75
93
}
76
94
95
+
77
96
pub fn pooled ( & self , key : Rc < String > , value : T ) -> Pooled < T > {
78
97
trace ! ( "Pool::pooled {:?}" , key) ;
79
98
Pooled {
@@ -102,7 +121,7 @@ impl<T: Clone> Pool<T> {
102
121
}
103
122
}
104
123
105
- fn park ( & mut self , key : Rc < String > , tx : oneshot :: Sender < Entry < T > > ) {
124
+ fn park ( & mut self , key : Rc < String > , tx : relay :: Sender < Entry < T > > ) {
106
125
trace ! ( "Pool::park {:?}" , key) ;
107
126
self . inner . borrow_mut ( )
108
127
. parked . entry ( key)
@@ -111,6 +130,24 @@ impl<T: Clone> Pool<T> {
111
130
}
112
131
}
113
132
133
+ impl < T > Pool < T > {
134
+ fn clean_parked ( & mut self , key : & Rc < String > ) {
135
+ trace ! ( "Pool::clean_parked {:?}" , key) ;
136
+ let mut inner = self . inner . borrow_mut ( ) ;
137
+
138
+ let mut remove_parked = false ;
139
+ if let Some ( parked) = inner. parked . get_mut ( key) {
140
+ parked. retain ( |tx| {
141
+ !tx. is_canceled ( )
142
+ } ) ;
143
+ remove_parked = parked. is_empty ( ) ;
144
+ }
145
+ if remove_parked {
146
+ inner. parked . remove ( key) ;
147
+ }
148
+ }
149
+ }
150
+
114
151
impl < T > Clone for Pool < T > {
115
152
fn clone ( & self ) -> Pool < T > {
116
153
Pool {
@@ -204,7 +241,7 @@ enum TimedKA {
204
241
pub struct Checkout < T > {
205
242
key : Rc < String > ,
206
243
pool : Pool < T > ,
207
- parked : Option < oneshot :: Receiver < Entry < T > > > ,
244
+ parked : Option < relay :: Receiver < Entry < T > > > ,
208
245
}
209
246
210
247
impl < T : Clone > Future for Checkout < T > {
@@ -260,7 +297,7 @@ impl<T: Clone> Future for Checkout<T> {
260
297
Some ( entry) => Ok ( Async :: Ready ( self . pool . reuse ( self . key . clone ( ) , entry) ) ) ,
261
298
None => {
262
299
if self . parked . is_none ( ) {
263
- let ( tx, mut rx) = oneshot :: channel ( ) ;
300
+ let ( tx, mut rx) = relay :: channel ( ) ;
264
301
let _ = rx. poll ( ) ; // park this task
265
302
self . pool . park ( self . key . clone ( ) , tx) ;
266
303
self . parked = Some ( rx) ;
@@ -271,6 +308,13 @@ impl<T: Clone> Future for Checkout<T> {
271
308
}
272
309
}
273
310
311
+ impl < T > Drop for Checkout < T > {
312
+ fn drop ( & mut self ) {
313
+ self . parked . take ( ) ;
314
+ self . pool . clean_parked ( & self . key ) ;
315
+ }
316
+ }
317
+
274
318
struct Expiration ( Option < Duration > ) ;
275
319
276
320
impl Expiration {
@@ -364,4 +408,30 @@ mod tests {
364
408
} ) ) . map ( |( entry, _) | entry) ;
365
409
assert_eq ! ( * checkout. wait( ) . unwrap( ) , * pooled1) ;
366
410
}
411
+
412
+ #[ test]
413
+ fn test_pool_checkout_drop_cleans_up_parked ( ) {
414
+ future:: lazy ( || {
415
+ let pool = Pool :: new ( true , Some ( Duration :: from_secs ( 10 ) ) ) ;
416
+ let key = Rc :: new ( "localhost:12345" . to_string ( ) ) ;
417
+ let _pooled1 = pool. pooled ( key. clone ( ) , 41 ) ;
418
+ let mut checkout1 = pool. checkout ( & key) ;
419
+ let mut checkout2 = pool. checkout ( & key) ;
420
+
421
+ // first poll needed to get into Pool's parked
422
+ checkout1. poll ( ) . unwrap ( ) ;
423
+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 1 ) ;
424
+ checkout2. poll ( ) . unwrap ( ) ;
425
+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 2 ) ;
426
+
427
+ // on drop, clean up Pool
428
+ drop ( checkout1) ;
429
+ assert_eq ! ( pool. inner. borrow( ) . parked. get( & key) . unwrap( ) . len( ) , 1 ) ;
430
+
431
+ drop ( checkout2) ;
432
+ assert ! ( pool. inner. borrow( ) . parked. get( & key) . is_none( ) ) ;
433
+
434
+ :: futures:: future:: ok :: < ( ) , ( ) > ( ( ) )
435
+ } ) . wait ( ) . unwrap ( ) ;
436
+ }
367
437
}
0 commit comments