[SPARK-18278] [Scheduler] Support native submission of spark jobs to a kubernetes cluster #16061
@@ -98,7 +98,7 @@ jersey-client-2.22.2.jar
 jersey-common-2.22.2.jar
 jersey-container-servlet-2.22.2.jar
 jersey-container-servlet-core-2.22.2.jar
-jersey-guava-2.22.2.jar
+jersey-guava-2.22.2.jarshaded-proto
 jersey-media-jaxb-2.22.2.jar
 jersey-server-2.22.2.jar
 jets3t-0.9.3.jar

Review comment: Any particular reason why this changed? Would be curious to trace how this dependency changed.
@@ -154,7 +154,9 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCac
 # Store the command as an array because $MVN variable might have spaces in it.
 # Normal quoting tricks don't work.
 # See: http://mywiki.wooledge.org/BashFAQ/050
-BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
+# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
+
+BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@)

Review comment: ?

Review comment: This should be reverted.

 # Actually build the jar
 echo -e "\nBuilding with..."
@@ -0,0 +1,21 @@

# Pre-requisites
* maven, JDK and all other pre-requisites for building Spark.

# Steps to compile
* Clone the fork of Spark (https://github.com/foxish/spark/) and switch to the k8s-support branch.

Review comment: I think this is not correct now?

* Build the project:
  * `./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package`
* Ensure that `kubectl config current-context` points to the k8s cluster you want to use with Spark.
* Launch a spark-submit job:
  * `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`

Review comment: Do we need to prepare an official image for this?

Reply: There will need to be some official Apache Spark repository for images, which I presume will be up to Apache Spark to create. The exact nature of the images to be produced is still being discussed. I did create a "semi-official" org on Docker Hub called "k8s4spark". I haven't actually pushed any images to it, but we could start using it as an interim repo if people think that is useful.

* Optionally, the following config parameters can be supplied to spark-submit as additional `--conf` arguments (or in a configuration file):
  * `spark.kubernetes.serviceAccountName` (defaults to "default")
  * `spark.kubernetes.namespace` (defaults to "default"). The namespace must exist before launching spark-submit.
* The image is built from https://github.com/erikerlandson/openshift-spark.
* `--master k8s://default` ensures that the client picks up the correct API server, i.e. the default from the current context.
* Check for pods being created. Watch the driver logs using `kubectl logs -f <driver-pod>`.
* If you are on a cloud/infrastructure provider that allows external load balancers to be provisioned, an external IP will be allocated to the service associated with the driver, and the Spark UI can be accessed from that IP address on port 4040.
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes</name>
  <properties>
    <sbt.project.name>kubernetes</sbt.project.name>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
      <version>1.4.8</version>
    </dependency>

    <!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>
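Note: building this module via `-Pkubernetes`, as the README above instructs, presumably relies on a matching `kubernetes` profile in the parent pom that adds the module to the reactor; that parent-pom change is not visible in this excerpt.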
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: org.apache.spark.deploy.kubernetes.Client
@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager
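This one-line services file registers the new cluster manager with Java's ServiceLoader, which is how Spark discovers ExternalClusterManager implementations at runtime and resolves the `k8s://` master prefix. A minimal sketch of the lookup, simplified from what SparkContext does internally (the helper name `findClusterManager` is ours, and ExternalClusterManager is private[spark], so this only compiles inside Spark itself):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.scheduler.ExternalClusterManager

// Load every registered ExternalClusterManager and keep the ones that
// claim the given master URL; KubernetesClusterManager matches "k8s" prefixes.
def findClusterManager(masterUrl: String): Option[ExternalClusterManager] = {
  val loader = Thread.currentThread.getContextClassLoader
  ServiceLoader.load(classOf[ExternalClusterManager], loader)
    .asScala
    .filter(_.canCreate(masterUrl))
    .headOption
}

// findClusterManager("k8s://default") would yield the KubernetesClusterManager.
```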
@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.kubernetes

import java.util.concurrent.CountDownLatch

import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterScheduler
import org.apache.spark.util.ShutdownHookManager

private[spark] class Client(val args: ClientArguments,
                            val hadoopConf: Configuration,
                            val sparkConf: SparkConf)
  extends Logging {
  private val scheduler = new KubernetesClusterScheduler(sparkConf)
  private val shutdownLatch = new CountDownLatch(1)

  def this(clientArgs: ClientArguments, spConf: SparkConf) =
    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

  def start(): Unit = {
    scheduler.start(args)
  }

  def stop(): Unit = {
    scheduler.stop()
    shutdownLatch.countDown()
    System.clearProperty("SPARK_KUBERNETES_MODE")
  }

  def awaitShutdown(): Unit = {
    shutdownLatch.await()
  }
}

private object Client extends Logging {
  def main(argStrings: Array[String]) {
    val sparkConf = new SparkConf
    System.setProperty("SPARK_KUBERNETES_MODE", "true")
    val args = new ClientArguments(argStrings)
    val client = new Client(args, sparkConf)
    client.start()

    logDebug("Adding shutdown hook")
    ShutdownHookManager.addShutdownHook { () =>
      logInfo("Shutdown hook is shutting down client")
      client.stop()
      client.awaitShutdown()
    }
    client.awaitShutdown()
  }
}
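The MANIFEST shown earlier names this Client as the jar's Main-Class, so in cluster mode the submission ultimately enters through Client.main: it flags SPARK_KUBERNETES_MODE, parses the flags into ClientArguments (next file), starts the Kubernetes scheduler, and then blocks on the CountDownLatch so the JVM stays alive until stop() releases it, either via the shutdown hook or an explicit stop.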
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.kubernetes

import scala.collection.mutable.ArrayBuffer

private[spark] class ClientArguments(args: Array[String]) {

  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    var args = inputArgs

    while (!args.isEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--primary-py-file") :: value :: tail =>
          primaryPyFile = value
          args = tail

        case ("--primary-r-file") :: value :: tail =>
          primaryRFile = value
          args = tail

        case ("--arg") :: value :: tail =>
          userArgs += value
          args = tail

        case Nil =>

        case _ =>
          throw new IllegalArgumentException(getUsageMessage(args))
      }
    }

    if (primaryPyFile != null && primaryRFile != null) {
      throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
        " at the same time")
    }
  }

  private def getUsageMessage(unknownParam: List[String] = null): String = {
    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
    message +
      s"""
      |Usage: org.apache.spark.deploy.kubernetes.Client [options]
      |Options:
      |  --jar JAR_PATH           Path to your application's JAR file (required in
      |                           kubernetes-cluster mode)
      |  --class CLASS_NAME       Name of your application's main class (required)
      |  --primary-py-file        A main Python file
      |  --primary-r-file         A main R file
      |  --arg ARG                Argument to be passed to your application's main class.
      |                           Multiple invocations are possible, each will be passed in order.
      """.stripMargin
  }
}
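A minimal usage sketch of the parser above. The jar, class, and argument values are invented for illustration, and since ClientArguments is private[spark], real callers live inside the org.apache.spark package:

```scala
// Illustrative values only; any unknown flag raises IllegalArgumentException
// carrying the usage message defined above.
val parsed = new ClientArguments(Array(
  "--jar", "http://example.com/spark-examples.jar",
  "--class", "org.apache.spark.examples.SparkPi",
  "--arg", "10000"))

assert(parsed.userClass == "org.apache.spark.examples.SparkPi")
assert(parsed.userArgs.toList == List("10000"))
```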
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Kubernetes scheduler and backend
 */
private[spark] class KubernetesClusterManager extends ExternalClusterManager {
  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("k8s")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
                                      masterURL: String,
                                      scheduler: TaskScheduler): SchedulerBackend = {
    new KubernetesClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
Review comment: What if in Kubernetes and client mode?

Reply: We should throw an error for now if in client mode with Kubernetes. There are still open questions about how we can manage the networking there, so we can revisit the support later.

Reply: Hey guys, really excited about this work, by the way. :) Just wondering if client mode from within a Kubernetes cluster will be supported? Not looking to add work, just curious. I'm wondering whether we could check if the client IP is within the Kubernetes container CIDR (this info should be available via the Kubernetes API), instead of just blocking client mode altogether. This would support Zeppelin/Jupyter instances running within kube connecting to Spark, which is a big use case for us. We could take the node list and check if the client IP is in any of the v1.NodeSpec.PodCIDR values of any node, although as k8s clusters get large (nodes > 1000), this may become a costly operation. This would allow client-mode support, provided the client is on one of the kube nodes.
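To make that proposal concrete, here is a minimal sketch of the pod-CIDR membership test, not part of this PR, using the fabric8 kubernetes-client the module already depends on; `SubnetUtils` is from Apache commons-net and would be an additional dependency, and the function name is ours:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.net.util.SubnetUtils

// Permit client mode only when the driver's IP falls inside some node's
// pod CIDR, i.e. the client is running on one of the cluster's nodes.
def clientIsOnClusterNode(clientIp: String): Boolean = {
  val k8s = new DefaultKubernetesClient() // uses the current kubeconfig context
  try {
    k8s.nodes().list().getItems.asScala.exists { node =>
      // v1.NodeSpec.PodCIDR may be unset on some providers, hence the Option.
      Option(node.getSpec.getPodCIDR).exists { cidr =>
        new SubnetUtils(cidr).getInfo.isInRange(clientIp)
      }
    }
  } finally {
    k8s.close()
  }
}
```

As the comment above notes, this scans every node, so on very large clusters the result would likely need to be cached rather than recomputed per submission.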