1818package org .apache .spark .deploy .master
1919
2020import java .util .Date
21- import java .util .concurrent .ConcurrentLinkedQueue
21+ import java .util .concurrent .{ ConcurrentLinkedQueue , CountDownLatch , TimeUnit }
2222import java .util .concurrent .atomic .AtomicInteger
2323
2424import scala .collection .JavaConverters ._
@@ -97,15 +97,29 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
9797 }
9898}
9999
/**
 * A [[MockWorker]] that reports every executor launch as failed.
 *
 * For each `LaunchExecutor` message it checks that the application is still present in
 * `master.idToApp`, increments [[failedCnt]], and replies to the master with an
 * `ExecutorStateChanged(..., ExecutorState.FAILED, ...)` message. Any other message is
 * delegated to [[MockWorker]].
 *
 * @param master the `Master` under test; the whole instance (not just its endpoint ref) is
 *               taken so that `master.idToApp` can be inspected while handling a message
 * @param conf   configuration forwarded to [[MockWorker]]
 */
class MockExecutorLaunchFailWorker(master: Master, conf: SparkConf = new SparkConf)
  extends MockWorker(master.self, conf) {

  /** Released when the first `LaunchExecutor` message arrives, so a test can block on it. */
  val launchExecutorReceived = new CountDownLatch(1)

  /**
   * Number of `LaunchExecutor` messages answered with a failure so far.
   *
   * Updated in `receive` and polled from the test body. The latch handshake above implies
   * those run on different threads, hence `@volatile` so the poller is guaranteed to observe
   * the latest value.
   * NOTE(review): `+=` is not atomic; this assumes messages are delivered to `receive` one at
   * a time - confirm against MockWorker's endpoint type.
   */
  @volatile var failedCnt = 0

  override def receive: PartialFunction[Any, Unit] = {
    case LaunchExecutor(_, appId, execId, _, _, _, _) =>
      // countDown() is a no-op once the count has reached zero, so no "first message only"
      // guard is needed around it.
      launchExecutorReceived.countDown()
      verifyMasterState(appId)
      failedCnt += 1
      master.self.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))

    case otherMsg => super.receive(otherMsg)
  }

  /**
   * Asserts that `appId` is currently registered with the master.
   *
   * The app is registered with Master once the Driver is set up. The check is done here,
   * before the failure is reported back, to avoid a timing issue: it guarantees the
   * verification runs before Master changes the status of the app to failed.
   */
  private def verifyMasterState(appId: String): Unit = {
    assert(master.idToApp.contains(appId), s"app $appId is not registered with Master")
  }
}
110124
111125class MasterSuite extends SparkFunSuite
@@ -546,7 +560,6 @@ class MasterSuite extends SparkFunSuite
546560 PrivateMethod [Array [Int ]](Symbol (" scheduleExecutorsOnWorkers" ))
547561 private val _drivers = PrivateMethod [HashSet [DriverInfo ]](Symbol (" drivers" ))
548562 private val _state = PrivateMethod [RecoveryState .Value ](Symbol (" state" ))
549- private val _completedApps = PrivateMethod [ArrayBuffer [ApplicationInfo ]](Symbol (" completedApps" ))
550563
551564 private val workerInfo = makeWorkerInfo(4096 , 10 )
552565 private val workerInfos = Array (workerInfo, workerInfo, workerInfo)
@@ -663,7 +676,7 @@ class MasterSuite extends SparkFunSuite
663676 val master = makeAliveMaster()
664677 var worker : MockExecutorLaunchFailWorker = null
665678 try {
666- worker = new MockExecutorLaunchFailWorker (master.self )
679+ worker = new MockExecutorLaunchFailWorker (master)
667680 worker.rpcEnv.setupEndpoint(" worker" , worker)
668681 val workerRegMsg = RegisterWorker (
669682 worker.id,
@@ -678,19 +691,11 @@ class MasterSuite extends SparkFunSuite
678691 val driver = DeployTestUtils .createDriverDesc()
679692 // mimic DriverClient to send RequestSubmitDriver to master
680693 master.self.askSync[SubmitDriverResponse ](RequestSubmitDriver (driver))
681- var appId : String = null
682- eventually(timeout(10 .seconds)) {
683- // an app would be registered with Master once Driver set up
684- assert(worker.apps.nonEmpty)
685- appId = worker.apps.head._1
686-
687- // we found the case where the test was too fast which all steps were done within
688- // an interval - in this case, we have to check either app is available in master
689- // or marked as completed. See SPARK-30348 for details.
690- val completedApps = master.invokePrivate(_completedApps())
691- assert(master.idToApp.contains(appId) || completedApps.exists(_.id == appId))
692- }
693694
695+ // LaunchExecutor message should have been received in worker side
696+ assert(worker.launchExecutorReceived.await(10 , TimeUnit .SECONDS ))
697+
698+ val appId : String = worker.apps.head._1
694699 eventually(timeout(10 .seconds)) {
695700 // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
696701 assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES ))
0 commit comments