Skip to content

Commit bc41c5a

Browse files
committed
[SPARK-34273][CORE] Do not reregister BlockManager when SparkContext is stopped
### What changes were proposed in this pull request? This PR aims to prevent `HeartbeatReceiver` asks `Executor` to re-register blocker manager when the SparkContext is already stopped. ### Why are the changes needed? Currently, `HeartbeatReceiver` blindly asks re-registration for the new heartbeat message. However, when SparkContext is stopped, we don't need to re-register new block manager. Re-registration causes unnecessary executors' logs and and a delay on job termination. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the newly added test case. Closes #31373 from dongjoon-hyun/SPARK-34273. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 78244ba commit bc41c5a

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
128128

129129
// Messages received from executors
130130
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorUpdates) =>
131+
var reregisterBlockManager = !sc.isStopped
131132
if (scheduler != null) {
132133
if (executorLastSeen.contains(executorId)) {
133134
executorLastSeen(executorId) = clock.getTimeMillis()
134135
eventLoopThread.submit(new Runnable {
135136
override def run(): Unit = Utils.tryLogNonFatalError {
136137
val unknownExecutor = !scheduler.executorHeartbeatReceived(
137138
executorId, accumUpdates, blockManagerId, executorUpdates)
138-
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
139+
reregisterBlockManager &= unknownExecutor
140+
val response = HeartbeatResponse(reregisterBlockManager)
139141
context.reply(response)
140142
}
141143
})
@@ -145,14 +147,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
145147
// not log warning here. Otherwise there may be a lot of noise especially if
146148
// we explicitly remove executors (SPARK-4134).
147149
logDebug(s"Received heartbeat from unknown executor $executorId")
148-
context.reply(HeartbeatResponse(reregisterBlockManager = true))
150+
context.reply(HeartbeatResponse(reregisterBlockManager))
149151
}
150152
} else {
151153
// Because Executor will sleep several seconds before sending the first "Heartbeat", this
152154
// case rarely happens. However, if it really happens, log it and ask the executor to
153155
// register itself again.
154156
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
155-
context.reply(HeartbeatResponse(reregisterBlockManager = true))
157+
context.reply(HeartbeatResponse(reregisterBlockManager))
156158
}
157159
}
158160

core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,24 @@ class HeartbeatReceiverSuite
219219
fakeSchedulerBackend.stop()
220220
}
221221

222+
test("SPARK-34273: Do not reregister BlockManager when SparkContext is stopped") {
223+
val blockManagerId = BlockManagerId(executorId1, "localhost", 12345)
224+
225+
heartbeatReceiverRef.askSync[Boolean](TaskSchedulerIsSet)
226+
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
227+
Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
228+
assert(response.reregisterBlockManager)
229+
230+
try {
231+
sc.stopped.set(true)
232+
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
233+
Heartbeat(executorId1, Array.empty, blockManagerId, mutable.Map.empty))
234+
assert(!response.reregisterBlockManager)
235+
} finally {
236+
sc.stopped.set(false)
237+
}
238+
}
239+
222240
/** Manually send a heartbeat and return the response. */
223241
private def triggerHeartbeat(
224242
executorId: String,

0 commit comments

Comments
 (0)