Skip to content

Commit 95ae4db

Browse files
committed
[SPARK-4006] In long running contexts, we encountered the situation of double register without a remove in between.

The cause for that is unknown, and assumed a temp network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. This is just like #2854 except it's on master.

Author: Tal Sliwowicz <tal.s@taboola.com>

Closes #2886 from tsliwowicz/master-block-mgr-removal and squashes the following commits:
094d508 [Tal Sliwowicz] some more white space change undone
41a2217 [Tal Sliwowicz] some more whitspaces change undone
7bcfc3d [Tal Sliwowicz] whitspaces fix
df9d98f [Tal Sliwowicz] Code review comments fixed
f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue.

(cherry picked from commit 6b48522)

Conflicts: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
1 parent 81d69f0 commit 95ae4db

File tree

1 file changed

+11
-20
lines changed

1 file changed

+11
-20
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -145,17 +145,12 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
145145

146146
private def removeBlockManager(blockManagerId: BlockManagerId) {
147147
val info = blockManagerInfo(blockManagerId)
148-
148+
149149
// Remove the block manager from blockManagerIdByExecutor.
150150
blockManagerIdByExecutor -= blockManagerId.executorId
151-
152-
logInfo("removed executorId %s from blockManagerIdByExecutor".format(blockManagerId.executorId))
153151

154152
// Remove it from blockManagerInfo and remove all the blocks.
155153
blockManagerInfo.remove(blockManagerId)
156-
157-
logInfo("removed blockManagerId %s from blockManagerInfo".format(blockManagerId))
158-
159154
val iterator = info.blocks.keySet.iterator
160155
while (iterator.hasNext) {
161156
val blockId = iterator.next
@@ -165,8 +160,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
165160
blockLocations.remove(locations)
166161
}
167162
}
168-
169-
logInfo("done with remove "+blockManagerId)
163+
logInfo(s"Removing block manager $blockManagerId")
170164
}
171165

172166
private def expireDeadHosts() {
@@ -187,7 +181,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
187181
private def removeExecutor(execId: String) {
188182
logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
189183
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
190-
logInfo("removed executor " + execId + " from BlockManagerMaster.")
191184
}
192185

193186
private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
@@ -231,24 +224,22 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
231224
}
232225

233226
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
234-
logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id))
235-
236227
if (!blockManagerInfo.contains(id)) {
237228
blockManagerIdByExecutor.get(id.executorId) match {
238-
case Some(manager) =>
239-
// A block manager of the same executor already exists so remove it (assumed dead).
240-
logError("Got two different block manager registrations on same executor - will remove, new Id " + id+", orig id - "+manager)
241-
removeExecutor(id.executorId)
229+
case Some(oldId) =>
230+
// A block manager of the same executor already exists, so remove it (assumed dead)
231+
logError("Got two different block manager registrations on same executor - "
232+
+ s" will replace old one $oldId with new one $id")
233+
removeExecutor(id.executorId)
242234
case None =>
243-
logInfo("about to register new id "+id)
244235
}
236+
logInfo("Registering block manager %s with %s RAM, %s".format(
237+
id.hostPort, Utils.bytesToString(maxMemSize), id))
245238

246239
blockManagerIdByExecutor(id.executorId) = id
247-
logInfo("Added %s to blockManagerIdByExecutor".format(id.executorId))
248240

249-
val info = new BlockManagerMasterActor.BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
250-
blockManagerInfo(id) = info
251-
logInfo("Added %s, %s to blockManagerInfo".format(id, info))
241+
blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
242+
id, System.currentTimeMillis(), maxMemSize, slaveActor)
252243
}
253244
}
254245

0 commit comments

Comments
 (0)