Skip to content

Commit f48bce9

Browse files
committed
In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue.
However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister.
1 parent d1966f3 commit f48bce9

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,19 @@
1818
package org.apache.spark.storage
1919

2020
import java.util.{HashMap => JHashMap}
21-
2221
import scala.collection.mutable
2322
import scala.collection.JavaConversions._
2423
import scala.concurrent.Future
2524
import scala.concurrent.duration._
26-
2725
import akka.actor.{Actor, ActorRef, Cancellable}
2826
import akka.pattern.ask
29-
3027
import org.apache.spark.{Logging, SparkConf, SparkException}
3128
import org.apache.spark.annotation.DeveloperApi
3229
import org.apache.spark.scheduler._
3330
import org.apache.spark.storage.BlockManagerMessages._
3431
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
3532

33+
3634
/**
3735
* BlockManagerMasterActor is an actor on the master node to track statuses of
3836
* all slaves' block managers.
@@ -187,12 +185,13 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
187185

188186
private def removeBlockManager(blockManagerId: BlockManagerId) {
189187
val info = blockManagerInfo(blockManagerId)
190-
188+
191189
// Remove the block manager from blockManagerIdByExecutor.
192190
blockManagerIdByExecutor -= blockManagerId.executorId
193-
191+
194192
// Remove it from blockManagerInfo and remove all the blocks.
195193
blockManagerInfo.remove(blockManagerId)
194+
196195
val iterator = info.blocks.keySet.iterator
197196
while (iterator.hasNext) {
198197
val blockId = iterator.next
@@ -203,6 +202,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
203202
}
204203
}
205204
listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
205+
logInfo("removed " + blockManagerId)
206206
}
207207

208208
private def expireDeadHosts() {
@@ -325,22 +325,23 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
325325

326326
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
327327
val time = System.currentTimeMillis()
328+
328329
if (!blockManagerInfo.contains(id)) {
329330
blockManagerIdByExecutor.get(id.executorId) match {
330331
case Some(manager) =>
331-
// A block manager of the same executor already exists.
332-
// This should never happen. Let's just quit.
333-
logError("Got two different block manager registrations on " + id.executorId)
334-
System.exit(1)
332+
// A block manager of the same executor already exists so remove it (assumed dead).
333+
logError("Got two different block manager registrations on same executor - "
334+
+ " will remove, new Id " + id + ", orig id - " + manager)
335+
removeExecutor(id.executorId)
335336
case None =>
336-
blockManagerIdByExecutor(id.executorId) = id
337337
}
338-
339-
logInfo("Registering block manager %s with %s RAM".format(
340-
id.hostPort, Utils.bytesToString(maxMemSize)))
341-
342-
blockManagerInfo(id) =
343-
new BlockManagerInfo(id, time, maxMemSize, slaveActor)
338+
logInfo("Registering block manager %s with %s RAM, %s".format(
339+
id.hostPort, Utils.bytesToString(maxMemSize), id))
340+
341+
blockManagerIdByExecutor(id.executorId) = id
342+
343+
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(),
344+
maxMemSize, slaveActor)
344345
}
345346
listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
346347
}

0 commit comments

Comments
 (0)