[SPARK-29950][k8s] Blacklist deleted executors in K8S with dynamic allocation. #26586
Conversation
The issue here is that when Spark is downscaling the application and deletes a few pod requests that aren't needed anymore, it may actually race with the K8S scheduler, which may already be bringing up those executors. They can have enough time to connect back to the driver and register, only to be deleted soon after. This wastes resources and causes misleading entries in the driver log.
The change (ab)uses the blacklisting mechanism to consider the deleted excess pods as blacklisted, so that if they try to connect back, the driver will deny the registration.
It also changes the executor registration slightly, since even with the above change there were misleading logs. That was because the executor registration message was an RPC that always succeeded (bar network issues), so the executor would always try to send an unregistration message to the driver, which would then log several messages about not knowing anything about the executor. The change makes the registration RPC succeed or fail directly, instead of using the separate failure message that would lead to this issue.
Note that the last change required some changes in a standalone test suite related to dynamic allocation, since it relied on the driver not throwing exceptions when a duplicate executor registration happened.
Tested with existing unit tests, and on a live cluster with dynamic allocation on.
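As a rough illustration of the approach described above (an editorial sketch only; the identifiers DownscaleSketch, onExecutorsDeleted and registerExecutor are made up and are not the actual Spark code), the allocator remembers which executor IDs it has asked K8S to delete, and the registration path fails the RPC for those IDs instead of accepting them and cleaning up later:

```scala
import scala.util.Try

// Sketch only: simplified stand-ins for the driver-side allocator and the
// driver's registration handling. None of these identifiers are the real ones.
object DownscaleSketch {

  // Executors deleted by this allocator but whose pods may still come up in K8S.
  @volatile private var deletedExecutorIds = Set.empty[Long]

  def onExecutorsDeleted(ids: Seq[Long]): Unit = synchronized {
    deletedExecutorIds ++= ids
    // (the real allocator would also issue the pod deletions to the API server here)
  }

  // Registration fails the RPC directly for deleted executors, instead of
  // replying success and sending a separate failure message afterwards.
  def registerExecutor(executorId: Long): Boolean = {
    if (deletedExecutorIds.contains(executorId)) {
      throw new IllegalStateException(s"Executor is blacklisted: $executorId")
    }
    true
  }

  def main(args: Array[String]): Unit = {
    onExecutorsDeleted(Seq(7L))
    println(registerExecutor(3L))        // true
    println(Try(registerExecutor(7L)))   // Failure(java.lang.IllegalStateException: ...)
  }
}
```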
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #114038 has finished for PR 26586 at commit
test this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #114743 has finished for PR 26586 at commit
test this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #114817 has finished for PR 26586 at commit
couple small nits, I'm not a k8s expert, but overall logic makes sense
if (snapshots.nonEmpty) {
  logDebug(s"Pod allocation status: $currentRunningCount running, " +
    s"${currentPendingExecutors.size} pending, " +
    s"${newlyCreatedExecutors.size} unacknowledged.")

val existingExecs = snapshots.last.executorPods.keySet
I think you could use lastSnapshot instead of snapshots.last
// Executors that have been deleted by this allocator but not yet detected as deleted in
// a snapshot from the API server. This is used to deny registration from these executors
// if they happen to come up before the deletion takes effect.
@volatile private var excessExecutors = Set.empty[Long]
nit: perhaps deletedExecutorIds
test this please
Kubernetes integration test starting
Kubernetes integration test status success
Test build #115055 has finished for PR 26586 at commit
changes LGTM pending Jenkins
Kubernetes integration test starting
retest this please
Test build #115125 has finished for PR 26586 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #115131 has finished for PR 26586 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #115138 has finished for PR 26586 at commit
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #115267 has finished for PR 26586 at commit
K8s integration failure is consistent, but it seems to be irrelevant to this PR.
Yeah, but I'd be more comfortable if I saw that same failure in another PR, and I don't remember seeing it. (These integration tests are also becoming pretty hard to run locally with all the external dependencies...)
Ok, the following is a different PR and fails the same test: (Also seems to have run around the same time as the run that failed for this one.)
case (_, PodDeleted(_)) => false
case _ => true
}
currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
You could call replaceSnapshot() here.
I know it is unrelated to this change, but it is strange to me that notifySubscribers clears the snapshotsBuffer but does not set currentSnapshot to an empty ExecutorPodsSnapshot(). This way, a notifySubscribers call followed by an updatePod could keep some pods which would then be notified again with the next notifySubscribers.
replaceSnapshot takes a seq, nonDeleted is a map.
As for not clearing the current snapshot, that's because snapshots are cumulative. Each update from the k8s server just adds to the previous snapshot (until a periodic full sync replaces it with replaceSnapshot).
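A tiny sketch of that cumulative behavior, using made-up types rather than the real ExecutorPodsSnapshot classes:

```scala
// Illustrative types only, not the real ExecutorPodsSnapshot/store classes.
case class PodState(name: String, phase: String)

case class Snapshot(pods: Map[Long, PodState]) {
  // Incremental update from a watch event: merges into the previous snapshot.
  def withUpdate(id: Long, state: PodState): Snapshot = Snapshot(pods + (id -> state))
}

object SnapshotSketch {
  def main(args: Array[String]): Unit = {
    var current = Snapshot(Map.empty)
    // Watch events accumulate on top of whatever the snapshot already holds.
    current = current.withUpdate(1L, PodState("exec-1", "Pending"))
    current = current.withUpdate(1L, PodState("exec-1", "Running"))
    current = current.withUpdate(2L, PodState("exec-2", "Running"))
    println(current.pods.keySet)   // Set(1, 2)
    // Periodic full sync (the replaceSnapshot-style path): the fresh listing wins.
    current = Snapshot(Map(2L -> PodState("exec-2", "Running")))
    println(current.pods.keySet)   // Set(2)
  }
}
```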
scheduler.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
intercept[Exception] {
What about saving the intercepted exception into a val and checking its content with asserts? Like:
assert(exception.getCause.getMessage === "Executor is blacklisted: one")
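Spelled out, the suggestion would look roughly like the following (ScalaTest; registerBlacklistedExecutor is a hypothetical stand-in for the registration call the suite exercises):

```scala
import org.scalatest.Assertions._

object InterceptSketch {
  // Hypothetical stand-in for the registration call exercised by the suite.
  def registerBlacklistedExecutor(): Unit =
    throw new IllegalStateException("Executor is blacklisted: one")

  def main(args: Array[String]): Unit = {
    // intercept returns the thrown exception, so its contents can be asserted on.
    val exception = intercept[IllegalStateException] {
      registerBlacklistedExecutor()
    }
    assert(exception.getMessage === "Executor is blacklisted: one")
  }
}
```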
I very much dislike checking error messages; they are not part of the API contract. But I can try to check for a more specific exception.
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #115582 has finished for PR 26586 at commit
general comment, but otherwise this lgtm. would love to see this change in
// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
Is there a reason we would rather sendFailure(_) instead of exiting the executor with a RegisterExecutorFailed message?
Explained in the PR description.
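For readers following the thread: the practical difference is that sendFailure fails the executor's ask itself, instead of replying success and following up with a separate RegisterExecutorFailed message. A minimal sketch with plain Scala futures (a Promise stands in for the RPC call context; these are not Spark's RPC classes):

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RegistrationReplySketch {
  // A Promise stands in for the driver's RPC call context: the handler either
  // replies with success or fails the call outright.
  def handleRegistration(executorId: String, blacklisted: Boolean): Future[Boolean] = {
    val context = Promise[Boolean]()
    if (blacklisted) {
      context.failure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
    } else {
      context.success(true)
    }
    context.future
  }

  def main(args: Array[String]): Unit = {
    // The executor's registration ask fails directly, so it never "registers"
    // and then has to send a separate unregister message that the driver
    // would log confusing warnings about.
    Try(Await.result(handleRegistration("one", blacklisted = true), 1.second)) match {
      case Success(_) => println("registered")
      case Failure(e) => println(s"registration failed: ${e.getMessage}")
    }
  }
}
```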
retest this please
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #116864 has finished for PR 26586 at commit
Merging to master.
wasn't there a failure in the k8s tests? was this not a by-product of this PR?
That's the same "Launcher client dependencies" test that seems super flaky, and has failed with the same error in other PRs before this one was merged.