Skip to content

[SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped #31373

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
var reregisterBlockManager = !sc.isStopped
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds like a race condition that happens during SparkContext shutdown. So it's also possible that the SparkContext is stopped right after we sent the HeartbeatResponse. In that case, IIUC, the issue will still exist.

Does the current behavior cause any real issue?

Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Jan 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank. you for review, @Ngone51 . That's true. In the production environment, I hit those intermediate status. And, this will help us simplify the situation.

  1. The case you mentioned, Send HeartbeatResponse and sc.stop invoked, is a normal situation. The users don't complain about this.
  2. The case in this PR, sc.stop invoked and Spark works inefficiently by sending HeartbeatResponse(true) is a problem. The users complain about this.

For the following, yes. We are After the apps sc.stop takes a longer time than we expect.

Does the current behavior cause any real issue?

if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId, executorUpdates)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
reregisterBlockManager &= unknownExecutor
val response = HeartbeatResponse(reregisterBlockManager)
context.reply(response)
}
})
Expand All @@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
// not log warning here. Otherwise there may be a lot of noise especially if
// we explicitly remove executors (SPARK-4134).
logDebug(s"Received heartbeat from unknown executor $executorId")
context.reply(HeartbeatResponse(reregisterBlockManager = true))
context.reply(HeartbeatResponse(reregisterBlockManager))
}
} else {
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
// case rarely happens. However, if it really happens, log it and ask the executor to
// register itself again.
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
context.reply(HeartbeatResponse(reregisterBlockManager = true))
context.reply(HeartbeatResponse(reregisterBlockManager))
}
}

Expand Down
18 changes: 18 additions & 0 deletions core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,24 @@ class HeartbeatReceiverSuite
fakeSchedulerBackend.stop()
}

test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)

heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
assert(response.reregisterBlockManager)

try {
sc.stopped.set(true)
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
assert(!response.reregisterBlockManager)
} finally {
sc.stopped.set(false)
}
}

/** Manually send a heartbeat and return the response. */
private def triggerHeartbeat(
executorId: String,
Expand Down