[SPARK-29950][k8s] Blacklist deleted executors in K8S with dynamic allocation. #26586

Closed
wants to merge 4 commits

Conversation

vanzin
Contributor

@vanzin vanzin commented Nov 19, 2019

The issue here is that when Spark is downscaling the application and deletes
a few pod requests that aren't needed anymore, it may actually race with the
K8S scheduler, which may be bringing up those executors. So they may have enough
time to connect back to the driver and register, only to be deleted soon after.
This wastes resources and causes misleading entries in the driver log.

The change (ab)uses the blacklisting mechanism to consider the deleted excess
pods as blacklisted, so that if they try to connect back, the driver will deny
it.
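
Roughly, the deny-registration part boils down to something like the following minimal sketch (the registry object and its names are made up for illustration; the actual change routes this through the scheduler backend's blacklist handling):

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only, not the actual Spark code: a driver-side record
// of executor IDs whose pods this allocator has deleted but which may still
// manage to come up and try to register.
object DeletedExecutorRegistry {
  private val deletedExecutorIds = ConcurrentHashMap.newKeySet[Long]()

  // Called right after the allocator asks the API server to delete the
  // excess pod requests.
  def markDeleted(ids: Seq[Long]): Unit =
    ids.foreach(id => deletedExecutorIds.add(id))

  // Consulted when an executor tries to register: deny the registration if
  // its pod was already deleted by the allocator.
  def shouldDeny(executorId: Long): Boolean =
    deletedExecutorIds.contains(executorId)
}
```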

It also changes the executor registration slightly, since even with the above
change there were misleading logs. That was because the executor registration
message was an RPC that always succeeded (bar network issues), so the executor
would always try to send an unregistration message to the driver, which would
then log several messages about not knowing anything about the executor. The
change makes the registration RPC succeed or fail directly, instead of using
the separate failure message that would lead to this issue.
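
Paraphrased, the shape of that change is roughly the following (a sketch only; CallContext stands in for Spark's RpcCallContext, and the two predicates are assumptions made for the example):

```scala
// Not the actual CoarseGrainedSchedulerBackend code; simplified stand-ins.
trait CallContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

final case class RegisterExecutor(executorId: String)

class DriverEndpointSketch(
    isBlacklisted: String => Boolean,
    alreadyRegistered: String => Boolean) {

  // Previously the RPC always replied with success and a separate
  // RegisterExecutorFailed message was sent afterwards; here the failure is
  // reported through the RPC itself, so the executor never considers itself
  // registered and never sends an unregistration message on shutdown.
  def handleRegistration(msg: RegisterExecutor, context: CallContext): Unit = {
    if (isBlacklisted(msg.executorId)) {
      context.sendFailure(
        new IllegalStateException(s"Executor is blacklisted: ${msg.executorId}"))
    } else if (alreadyRegistered(msg.executorId)) {
      context.sendFailure(
        new IllegalStateException(s"Duplicate executor ID: ${msg.executorId}"))
    } else {
      // ... record the executor, then acknowledge success directly.
      context.reply(true)
    }
  }
}
```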

Note the last change required some changes in a standalone test suite related
to dynamic allocation, since it relied on the driver not throwing exceptions
when a duplicate executor registration happened.

Tested with existing unit tests, and on a live cluster with dynamic allocation enabled.

@SparkQA

SparkQA commented Nov 19, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/18899/

@SparkQA

SparkQA commented Nov 19, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/18899/

@SparkQA

SparkQA commented Nov 19, 2019

Test build #114038 has finished for PR 26586 at commit a14c4ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shaneknapp
Contributor

test this please

@SparkQA

SparkQA commented Dec 3, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19565/

@SparkQA

SparkQA commented Dec 3, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19565/

@SparkQA

SparkQA commented Dec 3, 2019

Test build #114743 has finished for PR 26586 at commit a14c4ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ifilonenko
Contributor

test this please

@SparkQA

SparkQA commented Dec 4, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19640/

@SparkQA

SparkQA commented Dec 4, 2019

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19640/

@SparkQA

SparkQA commented Dec 4, 2019

Test build #114817 has finished for PR 26586 at commit a14c4ab.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tgravescs tgravescs left a comment

A couple of small nits; I'm not a k8s expert, but overall the logic makes sense.

if (snapshots.nonEmpty) {
  logDebug(s"Pod allocation status: $currentRunningCount running, " +
    s"${currentPendingExecutors.size} pending, " +
    s"${newlyCreatedExecutors.size} unacknowledged.")

  val existingExecs = snapshots.last.executorPods.keySet
Contributor

I think you could use lastSnapshot instead of snapshots.last

// Executors that have been deleted by this allocator but not yet detected as deleted in
// a snapshot from the API server. This is used to deny registration from these executors
// if they happen to come up before the deletion takes effect.
@volatile private var excessExecutors = Set.empty[Long]
Contributor

nit: perhaps deletedExecutorIds

@tgravescs
Contributor

test this please

@SparkQA

SparkQA commented Dec 9, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19873/

@SparkQA

SparkQA commented Dec 10, 2019

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19873/

@SparkQA

SparkQA commented Dec 10, 2019

Test build #115055 has finished for PR 26586 at commit a14c4ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tgravescs tgravescs left a comment

changes LGTM pending Jenkins

@SparkQA

SparkQA commented Dec 10, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19938/

@vanzin
Contributor Author

vanzin commented Dec 10, 2019

retest this please

@SparkQA

SparkQA commented Dec 10, 2019

Test build #115125 has finished for PR 26586 at commit 6c8d4cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 10, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19944/

@SparkQA

SparkQA commented Dec 10, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19944/

@SparkQA

SparkQA commented Dec 11, 2019

Test build #115131 has finished for PR 26586 at commit 6c8d4cd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor Author

vanzin commented Dec 11, 2019

retest this please

@SparkQA

SparkQA commented Dec 11, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19950/

@SparkQA

SparkQA commented Dec 11, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/19950/

@SparkQA

SparkQA commented Dec 11, 2019

Test build #115138 has finished for PR 26586 at commit 6c8d4cd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor Author

vanzin commented Dec 12, 2019

retest this please

@SparkQA

SparkQA commented Dec 13, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/20075/

@SparkQA

SparkQA commented Dec 13, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/20075/

@SparkQA

SparkQA commented Dec 13, 2019

Test build #115267 has finished for PR 26586 at commit 6c8d4cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

dongjoon-hyun commented Dec 13, 2019

K8s integration failure is consistent, but it seems to be irrelevant to this PR.

- Launcher client dependencies *** FAILED ***
  The code passed to eventually never returned normally. Attempted 29 times over 2.006781855433333 minutes. 
Last failure message: Failed to create bucket spark.. (DepsTestsSuite.scala:213)
- Launcher client dependencies *** FAILED ***
  java.lang.AssertionError: assertion failed: 

@vanzin
Contributor Author

vanzin commented Dec 13, 2019

Yeah, but I'd be more comfortable if I saw that same failure in another PR, and I don't remember seeing it. (These integration tests are also becoming pretty hard to run locally with all the external dependencies...)

@vanzin
Contributor Author

vanzin commented Dec 13, 2019

Ok, the following is a different PR and fails the same test:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/20076/artifact/resource-managers/kubernetes/integration-tests/target/integration-tests.log

(Also seems to have run around the same time as the run that failed for this one.)

  case (_, PodDeleted(_)) => false
  case _ => true
}
currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
Contributor

You could call replaceSnapshot() here.

Contributor

I know it is unrelated to this change, but it is strange to me that notifySubscribers clears the snapshotsBuffer but does not set currentSnapshot to an empty ExecutorPodsSnapshot().
This way, a notifySubscribers call followed by an updatePod could keep some pods that would be notified again with the next notifySubscribers.

Contributor Author

replaceSnapshot takes a seq, nonDeleted is a map.

As for not clearing the current snapshot, that's because snapshots are cumulative. Each update from the k8s server just adds to the previous snapshot (until a periodic full sync replaces it with replaceSnapshot).
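
A rough sketch of that cumulative behavior (simplified stand-ins, not the actual ExecutorPodsSnapshotsStore code):

```scala
// PodInfo and the keying by executor ID are simplifications for the sketch.
final case class PodInfo(execId: Long, phase: String)

class SnapshotStoreSketch {
  // Current view of the executor pods, keyed by executor ID.
  private var current = Map.empty[Long, PodInfo]

  // A watch update from the API server only adds to / overwrites entries in
  // the existing snapshot; it never removes anything by itself.
  def updatePod(pod: PodInfo): Unit = synchronized {
    current += (pod.execId -> pod)
  }

  // The periodic full resync replaces the view wholesale, which is when pods
  // deleted on the server side actually disappear from it.
  def replaceSnapshot(pods: Seq[PodInfo]): Unit = synchronized {
    current = pods.map(p => p.execId -> p).toMap
  }

  def snapshot: Map[Long, PodInfo] = synchronized { current }
}
```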

scheduler.driverEndpoint.ask[Boolean](message)
eventually(timeout(10.seconds), interval(100.millis)) {
verify(endpointRef).send(RegisterExecutorFailed(any()))
intercept[Exception] {
Contributor

@attilapiros attilapiros Dec 19, 2019

What about saving the intercepted exception into a val and checking its content with asserts?

Like:

        assert(exception.getCause.getMessage === "Executor is blacklisted: one")

Contributor Author

I very much dislike checking error messages; they are not part of the API contract. But I can try to check for a more specific exception.
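
For illustration, the two styles side by side (a hypothetical ScalaTest sketch; registerBlacklistedExecutor() and the exception type are assumptions, not the real test code):

```scala
import org.scalatest.funsuite.AnyFunSuite

class RegistrationAssertionStylesSuite extends AnyFunSuite {

  // Hypothetical stand-in for the driverEndpoint.ask call failing for a
  // blacklisted executor.
  private def registerBlacklistedExecutor(): Boolean =
    throw new IllegalStateException("Executor is blacklisted: one")

  test("assert on the message text") {
    val e = intercept[IllegalStateException] { registerBlacklistedExecutor() }
    assert(e.getMessage === "Executor is blacklisted: one")
  }

  test("assert only on a specific exception type") {
    // The message text is not part of the API contract, so only the type of
    // the exception is checked.
    intercept[IllegalStateException] { registerBlacklistedExecutor() }
  }
}
```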

@SparkQA

SparkQA commented Dec 19, 2019

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/20382/

@SparkQA

SparkQA commented Dec 19, 2019

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/20382/

@SparkQA

SparkQA commented Dec 19, 2019

Test build #115582 has finished for PR 26586 at commit e6f8814.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@ifilonenko ifilonenko left a comment

General comment below, but otherwise this LGTM. Would love to see this change in.

// If the cluster manager gives us an executor on a blacklisted node (because it
// already started allocating those resources before we informed it of our blacklist,
// or if it ignored our blacklist), then we reject that executor immediately.
logInfo(s"Rejecting $executorId as it has been blacklisted.")
executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
context.reply(true)
context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
Contributor

Is there a reason we would rather sendFailure(_) instead of exiting the executor with a RegisterExecutorFailed message?

Contributor Author

Explained in the PR description.

@vanzin
Contributor Author

vanzin commented Jan 16, 2020

retest this please

@SparkQA

SparkQA commented Jan 16, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/21635/

@SparkQA

SparkQA commented Jan 16, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/21635/

@SparkQA

SparkQA commented Jan 16, 2020

Test build #116864 has finished for PR 26586 at commit e6f8814.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor Author

vanzin commented Jan 16, 2020

Merging to master.

@vanzin vanzin closed this in dca8380 Jan 16, 2020
@ifilonenko
Contributor

Wasn't there a failure in the k8s tests? Was this not a by-product of this PR?

@vanzin
Contributor Author

vanzin commented Jan 18, 2020

That's the same "Launcher client dependencies" test that seems super flaky, and has failed with the same error in other PRs before this one was merged.
