Commit b5b34af
[SPARK-3736] Workers reconnect when disassociated from the master.
Before, if the master node was killed and restarted, the worker nodes would not attempt to reconnect to it, so restarting the Master also required restarting every worker. Now, when a worker is disconnected from the Master, it continuously pings the master node attempting to reconnect. Once the master restarts, it detects one of the registration requests from its former workers, and the cluster re-enters a healthy state.

In addition, when the master stopped receiving heartbeats from a worker, it removed that worker, but it then ignored any heartbeats that worker kept sending. Now, a master that receives a heartbeat from a worker that had been disconnected asks that worker to re-attempt registration; the worker then sends a RegisterWorker request and is re-connected accordingly.

Re-connection attempts are submitted per worker every N seconds, where N is configured by the property spark.worker.reconnect.interval (currently defaulting to 60 seconds).
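For context, a minimal sketch of how the new property is consumed (this mirrors the Worker.scala change below; the -D flag shown is only an assumption about how an operator might pass it to the worker JVM):

    // The Worker reads the interval from its own SparkConf, in seconds
    // (default 60), e.g. when the worker JVM is started with
    // -Dspark.worker.reconnect.interval=30.
    import org.apache.spark.SparkConf

    val conf = new SparkConf()  // picks up -Dspark.* system properties
    val reconnectIntervalMillis =
      conf.getLong("spark.worker.reconnect.interval", 60) * 1000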
1 parent 293a0b5 · commit b5b34af

File tree

3 files changed (+27, -1 lines)


core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 2 additions & 0 deletions
@@ -71,6 +71,8 @@ private[deploy] object DeployMessages {
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage
 
+  case class ReconnectWorker(masterUrl: String) extends DeployMessage
+
   case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
 
   case class LaunchExecutor(

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 5 additions & 1 deletion
@@ -341,7 +341,11 @@ private[spark] class Master(
         case Some(workerInfo) =>
           workerInfo.lastHeartbeat = System.currentTimeMillis()
         case None =>
-          logWarning("Got heartbeat from unregistered worker " + workerId)
+          if (workers.map(_.id).contains(workerId)) {
+            logWarning(s"Got heartbeat from unregistered worker $workerId." +
+              " Asking it to re-register.")
+            sender ! ReconnectWorker(masterUrl)
+          }
       }
     }
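The guard above distinguishes a worker id that is no longer registered from one the master has never seen. A distilled, self-contained sketch of the same decision (hypothetical names, not Spark's actual fields):

    // A worker id that is absent from the registered lookup but present
    // in the set of previously seen workers is asked to re-register;
    // a completely unknown id is still ignored.
    object HeartbeatDecisionSketch {
      sealed trait Reply
      case object UpdateLastHeartbeat extends Reply
      case object AskToReRegister extends Reply  // i.e. sender ! ReconnectWorker(masterUrl)
      case object Ignore extends Reply

      def onHeartbeat(workerId: String,
                      registered: Set[String],   // cf. the registered-worker lookup above
                      everSeen: Set[String]      // cf. workers.map(_.id)
                     ): Reply =
        if (registered.contains(workerId)) UpdateLastHeartbeat
        else if (everSeen.contains(workerId)) AskToReRegister
        else Ignore
    }

In other words, only workers the master has previously tracked are invited back; stray heartbeats from unknown senders go unanswered.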

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 20 additions & 0 deletions
@@ -67,6 +67,8 @@ private[spark] class Worker(
   val REGISTRATION_TIMEOUT = 20.seconds
   val REGISTRATION_RETRIES = 3
 
+  val RECONNECT_ATTEMPT_INTERVAL_MILLIS = conf.getLong("spark.worker.reconnect.interval", 60) * 1000
+
   val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
   // How often worker will clean up old app folders
   val CLEANUP_INTERVAL_MILLIS = conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
@@ -94,6 +96,7 @@ private[spark] class Worker(
   val finishedExecutors = new HashMap[String, ExecutorRunner]
   val drivers = new HashMap[String, DriverRunner]
   val finishedDrivers = new HashMap[String, DriverRunner]
+  var scheduledReconnectMessage: Option[Cancellable] = None
 
   val publicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
@@ -197,6 +200,8 @@ private[spark] class Worker(
         context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
           CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
       }
+      scheduledReconnectMessage.foreach(_.cancel())
+      scheduledReconnectMessage = None
 
     case SendHeartbeat =>
       if (connected) { master ! Heartbeat(workerId) }
@@ -243,6 +248,10 @@ private[spark] class Worker(
         System.exit(1)
       }
 
+    case ReconnectWorker(masterUrl) =>
+      logWarning(s"Master with url $masterUrl requested this worker to reconnect.")
+      scheduleAttemptsToReconnectToMaster()
+
     case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
@@ -365,6 +374,16 @@ private[spark] class Worker(
   def masterDisconnected() {
     logError("Connection to master failed! Waiting for master to reconnect...")
     connected = false
+    scheduleAttemptsToReconnectToMaster()
+  }
+
+  def scheduleAttemptsToReconnectToMaster() {
+    if (!scheduledReconnectMessage.isDefined) {
+      scheduledReconnectMessage = Some(context.system.scheduler.schedule(
+          Duration.Zero, RECONNECT_ATTEMPT_INTERVAL_MILLIS millis) {
+        tryRegisterAllMasters()
+      })
+    }
   }
 
   def generateWorkerId(): String = {
@@ -374,6 +393,7 @@ private[spark] class Worker(
   override def postStop() {
     metricsSystem.report()
     registrationRetryTimer.foreach(_.cancel())
+    scheduledReconnectMessage.foreach(_.cancel())
     executors.values.foreach(_.kill())
     drivers.values.foreach(_.kill())
     webUi.stop()
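The Worker changes combine into one pattern: keep at most one periodic re-registration timer, cancel it when registration succeeds, and cancel it again on shutdown in postStop(). A minimal, self-contained sketch of that pattern (a hypothetical ReconnectingWorker actor, assuming akka-actor on the classpath, as the real Worker had at the time):

    import akka.actor._
    import scala.concurrent.duration._

    class ReconnectingWorker(registerWithMasters: () => Unit,
                             intervalMillis: Long) extends Actor {
      import context.dispatcher  // ExecutionContext for the scheduler

      private var reconnectLoop: Option[Cancellable] = None

      // Idempotent: repeated disconnects or ReconnectWorker messages
      // never stack a second timer on top of a running one.
      private def scheduleReconnectAttempts(): Unit =
        if (reconnectLoop.isEmpty) {
          reconnectLoop = Some(context.system.scheduler.schedule(
            Duration.Zero, intervalMillis.millis)(registerWithMasters()))
        }

      def receive = {
        case "disconnected" => scheduleReconnectAttempts()
        case "registered" =>               // master accepted us again
          reconnectLoop.foreach(_.cancel())
          reconnectLoop = None
      }

      // Mirrors the postStop() change above: never leak the timer.
      override def postStop(): Unit = reconnectLoop.foreach(_.cancel())
    }

Keeping the timer in an Option[Cancellable] is what makes the flow safe to re-enter: masterDisconnected(), the ReconnectWorker handler, and a successful registration can fire in any order without ever leaving two loops running.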
