[SPARK-18278] [Scheduler] Support native submission of spark jobs to a kubernetes cluster #16061
Conversation
* Use images with spark pre-installed
* Simplify staging for client.jar
* Remove some tarball-uri code. Fix kube client URI in scheduler backend. Number of executors defaults to 1
* Tweak client again; works across my testing environments
* Use executor.sh shim
* Allow configuration of service account name for driver pod
* Spark image as a configuration setting instead of env var
* Namespace from spark.kubernetes.namespace
* Configure client with namespace; smooths out cases when not logged in as admin
* Assume a downloaded jar in /opt/spark/kubernetes to avoid dropping protections on /opt
* Add support for dynamic executors
* Fill in some sane logic for doKillExecutors
* doRequestTotalExecutors signals graceful executor shutdown, and favors idle executors
Test build #69335 has finished for PR 16061 at commit
@@ -596,6 +599,26 @@ object SparkSubmit extends CommandLineUtils {
}
}

if (isKubernetesCluster) {
What if in Kubernetes and client mode?
We should throw an error for now if in client mode with Kubernetes. There are still open questions about how we can manage the networking there, so we can revisit the support later.
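For illustration, a minimal sketch of what that guard could look like, following the pattern of the existing cluster-manager checks in SparkSubmit; the `KUBERNETES` and `CLIENT` constants and the exact message are assumptions, not taken from this diff:

```scala
// Sketch only: reject client mode with Kubernetes for now, mirroring the
// existing (clusterManager, deployMode) validation in SparkSubmit.
(clusterManager, deployMode) match {
  case (KUBERNETES, CLIENT) =>
    printErrorAndExit("Client mode is currently not supported with Kubernetes.")
  case _ => // other combinations fall through to the existing checks
}
```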
Hey Guys, really excited about this work by the way. :)
Just wondering if client mode from within a kubernetes cluster will be supported? Not looking to add work, just curious.
I'm wondering whether we could check if the client IP is within the kubernetes container CIDR (this info should be available via the kubernetes API), instead of just blocking client mode altogether. This would support Zeppelin/Jupyter instances running within kube connecting to Spark, which is a big use case for us.
We could take the node list and check whether the client IP is in the v1.NodeSpec.PodCIDR of any node. Although as k8s clusters get large (nodes > 1000), this may become a costly operation.
This would allow for client mode support, provided the client is on one of the kube nodes.
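A rough sketch of that check, assuming the fabric8 Kubernetes client used elsewhere in this PR and Apache Commons Net's SubnetUtils for the CIDR test; the helper name and wiring are illustrative, not part of this diff:

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.net.util.SubnetUtils
import scala.collection.JavaConverters._

// Sketch only: true if clientIp falls inside any node's pod CIDR.
// Listing every node is O(number of nodes), which is the cost concern above.
def clientIsInPodCidr(client: DefaultKubernetesClient, clientIp: String): Boolean = {
  client.nodes().list().getItems.asScala.exists { node =>
    Option(node.getSpec.getPodCIDR).exists { cidr =>
      new SubnetUtils(cidr).getInfo.isInRange(clientIp)
    }
  }
}
```

For large clusters the node list could be cached or filtered by label rather than fetched on every submission.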
BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@) | ||
# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@) | ||
|
||
BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@) |
?
This should be reverted.
logInfo(s"Adding $delta new executors") | ||
createExecutorPods(delta) | ||
} else if (delta < 0) { | ||
val d = -delta |
This shouldn't happen, assert instead
Delta (as currently computed) can become negative whenever the requested total starts to decrease.
However, this logic warrants a total overhaul, since it was designed around the assumption that doRequestTotalExecutors
is symmetric, in the sense of having to handle a decreasing total analogously to an increasing total. But the responsibilities for executor shutdown vs. spin-up aren't symmetric. I might file a small doc PR to clarify that point.
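For illustration only, a rough sketch of the asymmetric shape this logic could take; `requestedTotal` and `currentExecutorCount` are placeholder names, and this is not the PR's final code:

```scala
// Sketch: only scale up here, and leave scale-down to doKillExecutors and
// idle-executor reaping rather than mirroring the scale-up path.
val delta = requestedTotal - currentExecutorCount
if (delta > 0) {
  logInfo(s"Adding $delta new executors")
  createExecutorPods(delta)
}
// A negative delta simply means the requested total shrank; it is
// intentionally not handled symmetrically here.
```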
We also need to add a method to find the not-ready pods left over from the earlier scaling event before we decide to create new ones. We had a discussion about this here. I think relying on people to set resource requests/limits, or to write admission controllers correctly, is one part of the solution, but we should also have an additional safeguard to ensure we're not flooding the system with pod creation requests.
I had started in on keeping track of new pods until they stop saying "pending"; however, I think this may be a good use case for a PodWatcher.
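A sketch of what a fabric8 watch on the executor pods could look like; the "spark-executor" label and the `client`/`nameSpace` fields are assumptions based on the rest of this diff, not the PR's actual code:

```scala
import java.util.concurrent.ConcurrentHashMap

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

// Sketch: track executor pods that are still Pending via a watch instead of polling.
val pendingPods = ConcurrentHashMap.newKeySet[String]()

val watch = client.pods()
  .inNamespace(nameSpace)
  .withLabel("spark-executor")
  .watch(new Watcher[Pod] {
    override def eventReceived(action: Watcher.Action, pod: Pod): Unit = {
      val name = pod.getMetadata.getName
      if (pod.getStatus.getPhase == "Pending") pendingPods.add(name)
      else pendingPods.remove(name)
    }
    override def onClose(cause: KubernetesClientException): Unit = {}
  })

// Before creating more executor pods, pendingPods.size gives a cheap count of
// pods from earlier scaling events that have not become ready yet.
```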
import scala.concurrent.Future

private[spark] class KubernetesClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
Fix the formatting to conform with Spark style
.withName(svcName)
.create(svc)

// try {
Remove
This is pretty cool.
One thing - can we submit a separate PR to move all resource managers into resource-managers/yarn?
@erikerlandson For the RAT failure, you may either add the Apache license header to newly added files or add the file to
@rxin, when you say "move all resource managers", does that mean "move scheduler back-ends for mesos, yarn, etc., into some
Another external scheduler backend I'm aware of is Two Sigma's scheduler backend for the system they've created called Cook. See CoarseCookSchedulerBackend.scala
* ./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package
* Ensure that you are pointing to a k8s cluster (kubectl config current-context), which you want to use with spark.
* Launch a spark-submit job:
* `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`
Do we need to prepare an official image for this?
There will need to be some official Apache Spark repository for images, which I presume will be up to Apache Spark to create. The exact nature of the images to be produced is still being discussed.
I did create a "semi-official" org up on Docker Hub called "k8s4spark":
https://hub.docker.com/u/k8s4spark/dashboard/
I haven't actually pushed any images to it, but we could start using it as an interim repo if people think that would be useful.
# Steps to compile

* Clone the fork of spark: https://github.com/foxish/spark/ and switch to the k8s-support branch.
I think this is not correct now?
Cross-posting a link to the proposal here.
.addNewContainer().withName("spark-executor").withImage(sparkImage)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/executor.sh")
Another approach that we could take is to put the command in the Dockerfile and supply the container with environment variables to configure the runtime behavior. A few benefits to this:
- Transparency: Instead of needing to inspect the shim scripts to discover the specific execution behavior, the behavior is well-defined in the Dockerfile.
- Immutability: The run command does not need to be assembled by the client code every time.
- Decoupling configuration from logic: The Docker containers are only configured with properties, as opposed to needing to specify both logic (running the shim script) and configuration.
I think if we can just execute the container and pass config via something along the lines of SPARK_MASTER_OPTS (is there a SPARK_DRIVER_OPTS?), that could also implicitly handle the case of running a custom user container. It might shift the documentation of conventions toward which env vars to expect, but if we can use standard Spark env vars, that would be idiomatic.
That makes sense - essentially the Dockerfile has this line:
CMD exec ${JAVA_HOME}/bin/java -Xmx$SPARK_EXECUTOR_MEMORY org.apache.spark... --executor-id $SPARK_EXECUTOR_ID ...
etc.
And then we set these environment variables on the container.
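For illustration, a sketch of the client-side counterpart using the fabric8 ContainerBuilder already used in this diff; the variable names and the exact set of env vars are assumptions, not the PR's code:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Sketch: configure the executor container purely via environment variables
// and let the image's CMD assemble the actual java command line.
def buildExecutorContainer(sparkImage: String,
                           executorMemory: String,
                           executorNum: Int,
                           driverURL: String): Container = {
  new ContainerBuilder()
    .withName("spark-executor")
    .withImage(sparkImage)
    .withImagePullPolicy("IfNotPresent")
    .addNewEnv().withName("SPARK_EXECUTOR_MEMORY").withValue(executorMemory).endEnv()
    .addNewEnv().withName("SPARK_EXECUTOR_ID").withValue(executorNum.toString).endEnv()
    .addNewEnv().withName("SPARK_DRIVER_URL").withValue(driverURL).endEnv()
    .build()
}
```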
IIUC, you don't even need to throw in the -Xmx mem flags; the driver, executor, etc. are supposed to honor those (and the *_CORES), aren't they? We've been using spark-class to run the executor backends, and I think just spark-submit (client mode) for the driver. Although I think it all eventually bakes down to a JVM call.
spark-class wasn't built to handle starting executors, so it won't pick up memory flags for them. We have to execute the Java binary directly. I believe spark-class was built for running long-lived daemons like the history server and standalone components.
As for the driver - we can run spark-submit for now, but when we move to supporting uploading files from the submitter's local disk, we're going to need an in-between process in the pod to upload the local resources to it.
Actually, I think I was mistaken here - spark-class eventually calls into this class, which then forks the actual process running the passed-in class. We can probably then indeed take advantage of SPARK_JAVA_OPTS, SPARK_EXECUTOR_MEMORY, etc. Arguments like executor-id, which are passed as command line arguments and not JVM options, should probably still be set as environment variables on the container, though, with the Dockerfile handling how the command line arguments are formatted and sent to spark-class.
submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend", | ||
"--driver-url", s"$driverURL", | ||
"--executor-id", s"$executorNum", | ||
"--hostname", "localhost", |
Should we use the HOSTNAME environment variable here?
I'm not sure how that plays out in a pod context. "localhost" has been working, but it's worth testing alternatives.
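If we do test alternatives, one low-risk sketch would be to prefer HOSTNAME when the pod exposes it and keep the current behavior as the fallback (illustrative only):

```scala
// Sketch: use the pod's HOSTNAME environment variable when present,
// falling back to "localhost" as today.
val executorHostname = sys.env.getOrElse("HOSTNAME", "localhost")
submitArgs ++= Vector("--hostname", executorHostname)
```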
@@ -98,7 +98,7 @@ jersey-client-2.22.2.jar
jersey-common-2.22.2.jar
jersey-container-servlet-2.22.2.jar
jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-guava-2.22.2.jarshaded-proto
Any particular reason why this changed? Would be curious to trace how this dependency changed.
var executorID = 0

val sparkImage = conf.get("spark.kubernetes.sparkImage")
val clientJarUri = conf.get("spark.executor.jar")
Is this used anywhere in this class?
"--driver-url", s"$driverURL", | ||
"--executor-id", s"$executorNum", | ||
"--hostname", "localhost", | ||
"--app-id", "1", // TODO: change app-id per application and pass from driver. |
We can fill this in with the applicationId() method defined in SchedulerBackend.
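For example, a sketch of the argument list with that change; applicationId() comes from SchedulerBackend, and the rest mirrors the snippet above:

```scala
// Sketch: pass the real application id instead of the hard-coded "1".
submitArgs ++= Vector(
  "--driver-url", s"$driverURL",
  "--executor-id", s"$executorNum",
  "--hostname", "localhost",
  "--app-id", applicationId())
```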
}

def stop(): Unit = {
client.pods().inNamespace(nameSpace).withName(driverName).delete()
I think if we do this here in cluster mode, we will shut ourselves down and risk not actually finishing the stop() method. It's also possible for the SparkContext object and its associated components like the scheduler to be stopped while the JVM is still doing other things afterwards. Therefore I don't think we should be deleting the driver pod here.
Since we have at least four people with an interest in working on the code, I have a workflow proposal: we all submit candidate updates to #16061 in the form of PRs against erikerlandson:spark-18278-k8s-native (recursive PRs against a PR!). I did this with Anirudh against his
At the risk of redundant announcements, I am going to present this spark-on-kube topic at the Kubernetes SIG-Apps next Monday (Dec 5). It will be similar to my earlier OpenShift briefing but the focus will be on kube specifically, and I'll update it to cover all the latest developments.
@erikerlandson are there any new commits to this branch? Or is it at the same level as @foxish's k8s-support? In case I need to rebase...
I would think that it would be easier if we continue to keep PRs and issues in foxish/spark since we have the previous PRs and issues over there. It should be easy enough to pick those commits up and update this branch once we review and merge them. It doesn't matter where it lives really, except that it seems like most of it is already in one place.
@iyanuobidele the current head of this branch should be equivalent, but it can't hurt to rebase just in case.
@foxish yesterday it occurred to me that it would've been sensible to set up a GH organization for this little consortium and run the PR from there. Presumably we still could. The drawback to making any change now is losing the dialog that's already happened here. The problem I'm interested in solving even more is ensuring that everybody who contributes to this PR gets logged somehow in the upstream commit history when this PR eventually gets merged. I don't think that would happen in the case of a typical squash-merge, although I assume it would with a rebase merge. @rxin do you have any thoughts on that?
PS: I created a GH org https://github.com/apache-spark-on-k8s in the event that there is interest.
We should acknowledge the people who have contributed to this in the merged commit, but I don't think it'd make a lot of sense to do a merge rather than a squash. The initial Spark SQL commit was a single commit even though it was a much larger project: #146
## What changes were proposed in this pull request?
* Moves yarn and mesos scheduler backends to resource-managers/ sub-directory (in preparation for https://issues.apache.org/jira/browse/SPARK-18278)
* Corresponding change in top-level pom.xml. Ref: apache#16061 (comment)

## How was this patch tested?
* Manual tests

/cc rxin
Author: Anirudh <ramanathana@google.com>
Closes apache#16092 from foxish/fix-scheduler-structure-2.
* Clone the fork of spark: https://github.com/foxish/spark/ and switch to the k8s-support branch.
* Build the project
* ./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package
I think hadoop-2.4 is gone in master.
<parent>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-parent_2.11</artifactId>
  <version>2.1.0-SNAPSHOT</version>
2.1.0-SNAPSHOT???
  <relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
${scala.binary.version}
Work on this feature has moved to https://github.com/apache-spark-on-k8s/spark. The exact diff we are working with is this: apache-spark-on-k8s#1. Feel free to provide feedback on the fork's PR. @erikerlandson can you close this PR?
+1 I think we should close this PR to avoid confusion.
The diff showing our progress so far has moved to apache-spark-on-k8s#200 for anyone following along.
What changes were proposed in this pull request?
Add support for native submission of spark jobs to a kubernetes cluster, as a first-class scheduler back-end.
How was this patch tested?
Currently, testing has been mostly informal. Integration tests will be added before completion.
Notes
The initial branch on this PR is intended to serve as an "MVP" base for integrating various contributions to a "k8s-native" effort being conducted by several interested community members, including:
* @foxish, @erikerlandson, @iyanuobidele: https://github.com/foxish/spark/tree/k8s-support/kubernetes
* @mccheah: foxish#7
Goals for integrating features include:
Instructions
Current instructions for building at the time of this submission are here:
https://github.com/foxish/spark/tree/k8s-support/kubernetes