@@ -576,19 +576,14 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
576
576
}
577
577
578
578
test(" ask a message timeout on Future using RpcTimeout" ) {
579
- case class SleepyReply (msg : String )
579
+ case class NeverReply (msg : String )
580
580
581
581
val rpcEndpointRef = env.setupEndpoint(" ask-future" , new RpcEndpoint {
582
582
override val rpcEnv = env
583
583
584
584
override def receiveAndReply (context : RpcCallContext ): PartialFunction [Any , Unit ] = {
585
- case msg : String => {
586
- context.reply(msg)
587
- }
588
- case sr : SleepyReply => {
589
- Thread .sleep(50 )
590
- context.reply(sr.msg)
591
- }
585
+ case msg : String => context.reply(msg)
586
+ case _ : NeverReply =>
592
587
}
593
588
})
594
589
@@ -601,7 +596,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
601
596
assert(" hello" === reply1)
602
597
603
598
// Ask with a delayed response and wait for response immediately that should timeout
604
- val fut2 = rpcEndpointRef.ask[String ](SleepyReply (" doh" ), shortTimeout)
599
+ val fut2 = rpcEndpointRef.ask[String ](NeverReply (" doh" ), shortTimeout)
605
600
val reply2 =
606
601
intercept[RpcTimeoutException ] {
607
602
shortTimeout.awaitResult(fut2)
@@ -611,7 +606,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
611
606
assert(reply2.contains(shortTimeout.timeoutProp))
612
607
613
608
// Ask with delayed response and allow the Future to timeout before Await.result
614
- val fut3 = rpcEndpointRef.ask[String ](SleepyReply (" goodbye" ), shortTimeout)
609
+ val fut3 = rpcEndpointRef.ask[String ](NeverReply (" goodbye" ), shortTimeout)
615
610
616
611
// Allow future to complete with failure using plain Await.result, this will return
617
612
// once the future is complete to verify addMessageIfTimeout was invoked
0 commit comments