File tree Expand file tree Collapse file tree 1 file changed +24
-0
lines changed
core/src/test/scala/org/apache/spark/network Expand file tree Collapse file tree 1 file changed +24
-0
lines changed Original file line number Diff line number Diff line change @@ -223,7 +223,31 @@ class ConnectionManagerSuite extends FunSuite {
223223 managerServer.stop()
224224 }
225225
226+ test(" Ack error message" ) {
227+ val conf = new SparkConf
228+ conf.set(" spark.authenticate" , " false" )
229+ val securityManager = new SecurityManager (conf)
230+ val manager = new ConnectionManager (0 , conf, securityManager)
231+ val managerServer = new ConnectionManager (0 , conf, securityManager)
232+ managerServer.onReceiveMessage((msg : Message , id : ConnectionManagerId ) => {
233+ throw new Exception
234+ })
226235
236+ val size = 10 * 1024 * 1024
237+ val buffer = ByteBuffer .allocate(size).put(Array .tabulate[Byte ](size)(x => x.toByte))
238+ buffer.flip
239+ val bufferMessage = Message .createBufferMessage(buffer)
240+
241+ val future = manager.sendMessageReliably(managerServer.id, bufferMessage)
242+
243+ val message = Await .result(future, 1 second)
244+ assert(message.isDefined)
245+ assert(message.get.hasError)
246+
247+ manager.stop()
248+ managerServer.stop()
249+
250+ }
227251
228252}
229253
You can’t perform that action at this time.
0 commit comments