[SPARK-18278] [Scheduler] Support native submission of spark jobs to a kubernetes cluster #16061


Closed

Conversation

erikerlandson
Contributor

What changes were proposed in this pull request?

Add support for native submission of spark jobs to a kubernetes cluster, as a first-class scheduler back-end.

How was this patch tested?

Currently, testing has been mostly informal. Integration tests will be added before completion.

Notes

The initial branch on this PR is intended to serve as an "MVP" base for integrating various contributions to a "k8s-native" effort being conducted by several interested community members, including:

@foxish, @erikerlandson, @iyanuobidele
https://github.com/foxish/spark/tree/k8s-support/kubernetes

@mccheah
foxish#7

Goals for integrating features include:

  • build of docker image (or images) for spark
  • integration tests
  • publishing of third-party resources
  • resource constraints
  • external shuffle service via daemon-sets
  • possible hooks for control of executor scale by kubernetes control plane

Instructions

Current instructions for building at the time of this submission are here:
https://github.com/foxish/spark/tree/k8s-support/kubernetes

foxish and others added 14 commits November 29, 2016 08:46
* Use images with spark pre-installed

* simplify staging for client.jar

* Remove some tarball-uri code.  Fix kube client URI in scheduler backend.  Number executors default to 1

* tweak client again, works across my testing environments

* use executor.sh shim

* allow configuration of service account name for driver pod

* spark image as a configuration setting instead of env var

* namespace from spark.kubernetes.namespace

* configure client with namespace; smooths out cases when not logged in as admin

* Assume a download jar to /opt/spark/kubernetes to avoid dropping protections on /opt
* Add support for dynamic executors

* fill in some sane logic for doKillExecutors

* doRequestTotalExecutors signals graceful executor shutdown, and favors idle executors
@SparkQA

SparkQA commented Nov 29, 2016

Test build #69335 has finished for PR 16061 at commit 8584913.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -596,6 +599,26 @@ object SparkSubmit extends CommandLineUtils {
}
}

if (isKubernetesCluster) {
Contributor

What if in Kubernetes and client mode?

Contributor

We should throw an error for now if in client mode with Kubernetes. There's still open questions about how we can manage the networking there, so we can revisit the support later.


Hey guys, really excited about this work, by the way. :)

Just wondering if client mode from within a kubernetes cluster will be supported? Not looking to add work, just curious.

I'm wondering whether we could check if the client IP is within the kubernetes container CIDR (this info should be available via the kubernetes API), instead of just blocking client mode altogether. This would support Zeppelin/Jupyter instances running within kube connecting to Spark, which is a big use case for us.

We could take the node list and check whether the client IP falls within the v1.NodeSpec.PodCIDR of any node, although as k8s clusters get large (nodes > 1000) this may become a costly operation.

This would allow for client mode support, provided the client is on one of the kube nodes.
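
For illustration, roughly what I have in mind, sketched against the fabric8 kubernetes-client (the helper names here are made up for the example, not anything that exists in this PR):

```scala
import java.net.InetAddress

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object ClientCidrCheck {
  // Return true if the IPv4 address `ip` falls inside the CIDR block `cidr` (e.g. "10.244.1.0/24").
  def ipInPodCidr(ip: String, cidr: String): Boolean = {
    val Array(base, prefixLen) = cidr.split("/")
    def toLong(addr: String): Long =
      InetAddress.getByName(addr).getAddress.foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xff))
    val mask = -1L << (32 - prefixLen.toInt)
    (toLong(ip) & mask) == (toLong(base) & mask)
  }

  // Check whether the client IP lies in the pod CIDR of any node in the cluster.
  // This walks the full node list, which is the O(nodes) cost mentioned above.
  def clientIsOnClusterNode(client: DefaultKubernetesClient, clientIp: String): Boolean =
    client.nodes().list().getItems.asScala.exists { node =>
      Option(node.getSpec.getPodCIDR).exists(cidr => ipInPodCidr(clientIp, cidr))
    }
}
```

Caching the CIDRs per node (and refreshing occasionally) would amortize that cost on large clusters.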

BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)

BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@)
Contributor

?

Contributor

This should be reverted.

logInfo(s"Adding $delta new executors")
createExecutorPods(delta)
} else if (delta < 0) {
val d = -delta
Contributor

This shouldn't happen; assert instead.

Contributor Author

Delta (as currently computed) can become negative, whenever the requested total starts to decrease.

However, this logic warrants a total overhaul, since it was designed around the assumption that doRequestTotalExecutors is symmetric, in the sense of having to handle a decreasing total analogously to an increasing total. But responsibilities for executor shutdown vs spin-up aren't symmetric. I might file a small doc PR to clarify that point.
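
To make the asymmetry concrete, here is a rough sketch of the shape I have in mind; createExecutorPods is the helper already in this branch, while targetExecutors and the exact doRequestTotalExecutors signature are assumptions for illustration (the signature varies by Spark version):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical field on KubernetesClusterSchedulerBackend: the currently requested total.
private val targetExecutors = new AtomicInteger(conf.getInt("spark.executor.instances", 1))

// Sketch only: scale-up creates pods right away; scale-down just lowers the target and
// leaves the actual shutdown to doKillExecutors / the idle-executor path.
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
  val delta = requestedTotal - targetExecutors.getAndSet(requestedTotal)
  if (delta > 0) {
    logInfo(s"Adding $delta new executors")
    createExecutorPods(delta)
  } else if (delta < 0) {
    // Don't tear pods down here; idle executors get released gracefully elsewhere.
    logInfo(s"Lowering executor target by ${-delta}")
  }
  true
}
```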

Contributor

@foxish Nov 30, 2016

We also need to add a method to find the not-ready pods in the system from the earlier scaling event before we decide to create new ones. We had a discussion about this here. I think relying on people to set resource requests/limits or to write admission controllers correctly is one part of the solution, but we should also have an additional safeguard to ensure we're not flooding the system with pod creation requests.

Contributor Author

I had started in on keeping track of new pods until they stop reporting "Pending"; however, I think this may be a good use case for a pod Watcher.
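
Roughly, a Watcher along these lines (a sketch only; the bookkeeping and label selector are assumptions, not code in this branch):

```scala
import scala.collection.mutable

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}

class ExecutorPodWatcher extends Watcher[Pod] {
  // Pod names we have created but not yet seen in the "Running" phase.
  private val pending = mutable.Set.empty[String]

  def registerPending(podName: String): Unit = synchronized { pending += podName }
  def pendingCount: Int = synchronized { pending.size }

  override def eventReceived(action: Watcher.Action, pod: Pod): Unit = synchronized {
    val name = pod.getMetadata.getName
    val phase = Option(pod.getStatus).map(_.getPhase).getOrElse("")
    action match {
      case Watcher.Action.ADDED | Watcher.Action.MODIFIED if phase == "Running" =>
        pending -= name   // pod has left Pending
      case Watcher.Action.DELETED | Watcher.Action.ERROR =>
        pending -= name   // pod went away (or errored) before becoming ready
      case _ => ()        // ignore other transitions
    }
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // Watch closed (e.g. connection dropped); a real implementation would re-establish it.
  }
}
```

It would be registered with something like client.pods().inNamespace(nameSpace).withLabel(...).watch(watcher), and pendingCount consulted before creating additional pods.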

import scala.concurrent.Future

private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
Contributor

Fix the formatting to conform with Spark style

.withName(svcName)
.create(svc)

// try {
Contributor

Remove

@rxin
Contributor

rxin commented Nov 29, 2016

This is pretty cool.

@rxin
Contributor

rxin commented Nov 29, 2016

One thing - can we submit a separate PR to move all resource managers into

resource-managers/yarn
resource-managers/mesos

?

@tnachen
Contributor

tnachen commented Nov 29, 2016

@rxin Makes sense; @srowen also talked about starting a discussion on better support for external cluster managers.

@liancheng
Contributor

@erikerlandson For the RAT failure, you may either add the Apache license header to the newly added files or add them to dev/.rat-excludes.

@erikerlandson
Contributor Author

@rxin, when you say "move all resource managers" does that mean "move scheduler back-ends for mesos, yarn, etc, into some resource-managers sub-project" ?

@ash211
Contributor

ash211 commented Nov 30, 2016

Another external scheduler backend I'm aware of is Two Sigma's scheduler backend for the system they've created called Cook. See CoarseCookSchedulerBackend.scala

* ./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package
* Ensure that you are pointing to a k8s cluster (kubectl config current-context), which you want to use with spark.
* Launch a spark-submit job:
* `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`
Member

Do we need to prepare an official image for this?

Contributor Author

There will need to be some official Apache Spark repository for images, which I presume will be up to Apache Spark to create. The exact nature of the images to be produced is still being discussed.

I did create a "semi-official" org up on docker hub called "k8s4spark":
https://hub.docker.com/u/k8s4spark/dashboard/

I haven't actually pushed any images to it, but we could start using it as an interim repo if people think that would be useful.


# Steps to compile

* Clone the fork of spark: https://github.com/foxish/spark/ and switch to the k8s-support branch.
Member

I think this is not correct now?

@foxish
Contributor

foxish commented Nov 30, 2016

Cross-posting a link to the proposal here.


.addNewContainer().withName("spark-executor").withImage(sparkImage)
.withImagePullPolicy("IfNotPresent")
.withCommand("/opt/executor.sh")
Contributor

@mccheah Nov 30, 2016

Another approach that we could take is to put the command in the Dockerfile and supply the container with environment variables to configure the runtime behavior. A few benefits to this:

  • Transparency: Instead of needing to inspect the shim scripts to discover the specific execution behavior, the behavior is well-defined in the Dockerfile.
  • Immutability: The run command does not need to be assembled by the client code every time.
  • Decoupling configuration from logic: The Docker containers are only configured with properties, as opposed to needing to specify both logic (running the shim script) and configuration.

Contributor Author

I think if we can just execute the container and pass config via something along the lines of SPARK_MASTER_OPTS (is there a SPARK_DRIVER_OPTS?), that could also implicitly handle the case of running a custom user container. It would shift the documented conventions toward which env vars to expect, but if we can use standard Spark env vars, that would be idiomatic.

Contributor

That makes sense - essentially the Dockerfile has this line:

CMD exec ${JAVA_HOME}/bin/java -Xmx$SPARK_EXECUTOR_MEMORY org.apache.spark... --executor-id $SPARK_EXECUTOR_ID ... etc.

And then we set these environment variables on the container.
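
On the scheduler-backend side, that would look roughly like this with the fabric8 builder already used in this PR (the env var names beyond SPARK_EXECUTOR_MEMORY are illustrative, not established conventions):

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Sketch: configure the executor container purely through environment variables
// and let the image's own CMD assemble the actual java invocation.
def buildExecutorPod(podName: String, sparkImage: String,
                     driverUrl: String, executorId: String, memory: String): Pod =
  new PodBuilder()
    .withNewMetadata()
      .withName(podName)
    .endMetadata()
    .withNewSpec()
      .addNewContainer()
        .withName("spark-executor")
        .withImage(sparkImage)
        .withImagePullPolicy("IfNotPresent")
        // No command here: the Dockerfile's CMD reads these variables.
        .addNewEnv().withName("SPARK_EXECUTOR_MEMORY").withValue(memory).endEnv()
        .addNewEnv().withName("SPARK_EXECUTOR_ID").withValue(executorId).endEnv()
        .addNewEnv().withName("SPARK_DRIVER_URL").withValue(driverUrl).endEnv()
      .endContainer()
    .endSpec()
    .build()
```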

Contributor Author

IIUC, you don't even need to pass the -Xmx memory flags; the driver, executor, etc. are supposed to honor those (and the *_CORES settings), aren't they? We've been using spark-class to run the executor backends, and I think just spark-submit (client mode) for the driver, although it all eventually bakes down to a JVM call.

Contributor

spark-class wasn't built to handle starting executors, so it won't pick up memory flags for them. We have to execute the Java binary directly. I believe spark-class was built for running long-lived daemons like the history server and the standalone components.

As for the driver, we can run spark-submit for now, but when we move to supporting uploads of files from the submitter's local disk, we're going to need an in-between process in the pod that the local resources can be uploaded to.

Contributor

Actually, I think I was mistaken here - spark-class eventually calls into this class, which then forks the actual process running the passed-in class. We can probably then indeed take advantage of SPARK_JAVA_OPTS, SPARK_EXECUTOR_MEMORY, etc. Arguments like executor-id, which are passed as command-line arguments rather than JVM options, should probably still be set as environment variables on the container, with the Dockerfile formatting the command-line arguments passed to spark-class.

submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", s"$driverURL",
"--executor-id", s"$executorNum",
"--hostname", "localhost",
Contributor

Should we use the HOSTNAME environment variable here?

Contributor Author

I'm not sure how that plays out in a pod context. "localhost" has been working, but it's worth testing alternatives.

@@ -98,7 +98,7 @@ jersey-client-2.22.2.jar
jersey-common-2.22.2.jar
jersey-container-servlet-2.22.2.jar
jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-guava-2.22.2.jarshaded-proto
Contributor

Any particular reason why this changed? Would be curious to trace how this dependency changed.

var executorID = 0

val sparkImage = conf.get("spark.kubernetes.sparkImage")
val clientJarUri = conf.get("spark.executor.jar")
Contributor

Is this used anywhere in this class?

"--driver-url", s"$driverURL",
"--executor-id", s"$executorNum",
"--hostname", "localhost",
"--app-id", "1", // TODO: change app-id per application and pass from driver.
Contributor

We can fill this in with the applicationId() method defined in SchedulerBackend.
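
For example (a sketch of the substitution only; the surrounding submitArgs code is as in this diff):

```scala
submitArgs ++= Vector("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  "--driver-url", s"$driverURL",
  "--executor-id", s"$executorNum",
  "--hostname", "localhost",
  "--app-id", applicationId())  // applicationId() is inherited from SchedulerBackend
```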

}

def stop(): Unit = {
client.pods().inNamespace(nameSpace).withName(driverName).delete()
Contributor

I think if we do this here in cluster mode, we will shut ourselves down and risk not actually finishing the stop() method. It's also possible that the SparkContext object and its associated components, like the scheduler, are stopped while the JVM is still doing other things afterwards. Therefore I don't think we should delete the driver pod here.
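
A sketch of what I'd suggest instead (the "spark-app" label used to select this application's executor pods is an assumption for the example, not something the branch defines):

```scala
def stop(): Unit = {
  // Clean up only what the driver owns: executor pods (selected by an app label) and
  // the driver service. Deliberately leave the driver pod alone so the JVM can finish
  // any remaining work after the SparkContext shuts down.
  client.pods().inNamespace(nameSpace).withLabel("spark-app", applicationId()).delete()
  client.services().inNamespace(nameSpace).withName(svcName).delete()
}
```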

@erikerlandson
Contributor Author

Since we have at least four people with interest in working on the code, I have a workflow proposal, which is that we all submit candidate updates to #16061 in the form of PRs against erikerlandson:spark-18278-k8s-native (recursive PRs against a PR!)

I did this with Anirudh against his k8s-support branch, and it seemed to work fine. It should help keep asynchronous contributions sane and provide the usual durable public forum for discussing the updates as they evolve.

@erikerlandson
Contributor Author

At the risk of redundant announcements, I am going to present this spark-on-kube topic at the Kubernetes SIG-Apps next Monday (Dec 5). It will be similar to my earlier OpenShift briefing but the focus will be on kube specifically, and I'll update it to cover all the latest developments.

@iyanuobidele

@erikerlandson are there any new commits to this branch, or is it at the same level as @foxish's k8s-support? (In case I need to rebase....)

@foxish
Contributor

foxish commented Dec 2, 2016

I would think that it would be easier if we continue to keep PRs and issues in foxish/spark since we have the previous PRs and issues over there. It should be easy enough to pick those commits up and update this branch once we review and merge them. It doesn't matter where it lives really, except that it seems like most of it is already in one place.

@erikerlandson
Contributor Author

erikerlandson commented Dec 2, 2016

@iyanuobidele the current head of this branch should be equivalent, but it can't hurt to rebase just in case.

@foxish it occurred to me yesterday that it would've been sensible to set up a GH organization for this little consortium and run the PR from there. Presumably we still could. The drawback to making any change now is losing the dialog that's already happened here.

The problem I'm even more interested in solving is ensuring that everybody who contributes to this PR gets recorded somehow in the upstream commit history when it eventually gets merged. I don't think that would happen with a typical squash-merge, although I assume it would with a rebase merge. @rxin do you have any thoughts on that?

PS: I created a GH org https://github.com/apache-spark-on-k8s in the event that there is interest

@rxin
Contributor

rxin commented Dec 3, 2016

We should acknowledge the people who have contributed to this in the merged commit, but I don't think it'd make a lot of sense to do a merge rather than a squash. The initial Spark SQL commit was a single commit even though it was a much larger project (#146).

ghost pushed a commit to dbtsai/spark that referenced this pull request Dec 7, 2016
## What changes were proposed in this pull request?

* Moves yarn and mesos scheduler backends to resource-managers/ sub-directory (in preparation for https://issues.apache.org/jira/browse/SPARK-18278)
* Corresponding change in top-level pom.xml.

Ref: apache#16061 (comment)

## How was this patch tested?

* Manual tests

/cc rxin

Author: Anirudh <ramanathana@google.com>

Closes apache#16092 from foxish/fix-scheduler-structure-2.
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017

* Clone the fork of spark: https://github.com/foxish/spark/ and switch to the k8s-support branch.
* Build the project
* ./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package
Contributor

I think hadoop-2.4 is gone in master.

<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
Contributor

2.1.0-SNAPSHOT???

<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
Contributor

Use ${scala.binary.version} here instead of hardcoding 2.11.

@mccheah
Contributor

mccheah commented Mar 3, 2017

Work on this feature has moved to https://github.com/apache-spark-on-k8s/spark. The exact diff we are working with is this: apache-spark-on-k8s#1. Feel free to provide feedback on the fork's PR.

@erikerlandson can you close this PR?

@foxish
Contributor

foxish commented Mar 3, 2017

+1 I think we should close this PR to avoid confusion.

@srowen srowen mentioned this pull request Mar 22, 2017
@asfgit asfgit closed this in b70c03a Mar 23, 2017
@mccheah
Contributor

mccheah commented Mar 23, 2017

The diff showing our progress so far has moved to apache-spark-on-k8s#200 for anyone following along.
