@@ -152,7 +152,10 @@ mod tests {
152
152
use event_loop:: SENDER ;
153
153
use protocol:: { ThriftMessage , ThriftMessageType } ;
154
154
use binary_protocol:: BinaryDeserializer ;
155
+ use std:: sync:: mpsc:: channel;
155
156
use util;
157
+ use std:: thread;
158
+ use std:: time:: Duration ;
156
159
157
160
#[ test]
158
161
fn should_create_server_dispatcher ( ) {
@@ -164,19 +167,32 @@ mod tests {
164
167
fn should_start_server ( ) {
165
168
let addr: SocketAddr = "127.0.0.1:5955" . parse ( ) . unwrap ( ) ;
166
169
let ( handle_server, server) = Dispatcher :: spawn ( Role :: Server ( addr. clone ( ) ) ) . unwrap ( ) ;
170
+ thread:: sleep ( Duration :: from_millis ( 30 ) ) ;
167
171
let ( handle_client, client) = Dispatcher :: spawn ( Role :: Client ( addr. clone ( ) ) ) . unwrap ( ) ;
168
172
169
173
let buf = util:: create_empty_thrift_message ( "foobar123" , ThriftMessageType :: Call ) ;
170
174
171
175
let ( res, future) = Future :: < ( ThriftMessage , BinaryDeserializer < Cursor < Vec < u8 > > > ) > :: channel ( ) ;
172
176
client. send ( Incoming :: Call ( "foobar123" . to_string ( ) , buf, res) ) . unwrap ( ) ;
173
177
178
+ let ( res_tx, res_rx) = channel ( ) ;
179
+ let cloned = res_tx. clone ( ) ;
174
180
future. and_then ( move |( msg, de) | {
175
181
println ! ( "[test]: Received: {:?}" , msg) ;
176
182
SENDER . clone ( ) . send ( Message :: Shutdown ) ;
183
+ res_tx. send ( 0 ) ;
177
184
Async :: Ok ( ( ) )
178
185
} ) ;
179
186
187
+ thread:: spawn ( move || -> Result < ( ) , ( ) > {
188
+ thread:: sleep ( Duration :: from_millis ( 3000 ) ) ;
189
+ SENDER . clone ( ) . send ( Message :: Shutdown ) ;
190
+ panic ! ( "Test timeout was hit. This means the Reactor did not shutdown and a response was not received." ) ;
191
+ cloned. send ( 1 ) ;
192
+ } ) ;
193
+
180
194
Reactor :: run ( ) . join ( ) ;
195
+
196
+ assert_eq ! ( res_rx. recv( ) . unwrap( ) , 0 ) ;
181
197
}
182
198
}
0 commit comments