[SPARK-18278] [Scheduler] Support native submission of spark jobs to a kubernetes cluster #16061

Closed
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
27 changes: 25 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -69,7 +69,8 @@ object SparkSubmit extends CommandLineUtils {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | KUBERNETES | LOCAL

// Deploy modes
private val CLIENT = 1
@@ -230,8 +231,9 @@ object SparkSubmit extends CommandLineUtils {
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("local") => LOCAL
case m if m.startsWith("k8s") => KUBERNETES
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, local")
-1
}

@@ -274,6 +276,7 @@
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
// too for packages that include Python code
@@ -596,6 +599,26 @@
}
}

if (isKubernetesCluster) {
Contributor:
What if in Kubernetes and client mode?

Contributor:
We should throw an error for now if in client mode with Kubernetes. There's still open questions about how we can manage the networking there, so we can revisit the support later.
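A minimal sketch of that guard, not part of this diff — it assumes it would sit alongside the other master/deploy-mode checks in SparkSubmit, reusing the KUBERNETES and CLIENT constants introduced above:

```scala
// Hedged sketch (not in this PR): fail fast when Kubernetes is combined with client mode,
// mirroring how SparkSubmit rejects other unsupported master/deploy-mode combinations.
if (clusterManager == KUBERNETES && deployMode == CLIENT) {
  printErrorAndExit("Client deploy mode is currently not supported with Kubernetes.")
}
```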

Comment:

Hey Guys, really excited about this work by the way. :)

Just wondering if client mode from within a kubernetes cluster will be supported? Not looking to add work, just curious.

I'm wondering whether we could check if the client IP is within the kubernetes container CIDR (this info should be available via the kubernetes API), instead of just blocking client mode altogether. This would support Zeppelin/Jupyter instances running within kube connecting to spark. Which is a big usecase in our case.

We could take the nodelist and check if the client IP is in any of the v1.NodeSpec.PodCIDR of any node. Although as k8s clusters get large (nodes>1000), this may become a costly operation.

This would allow for client mode support, provided the client is on one of the kube nodes.
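A rough sketch of that idea, purely illustrative and not part of this diff — it assumes the fabric8 `kubernetes-client` declared in `kubernetes/pom.xml`, performs a naive IPv4-only CIDR match, and uses hypothetical names:

```scala
import java.net.InetAddress

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object ClientModeNetworkCheck {
  // Naive IPv4 containment test: is `ip` inside `cidr` (e.g. "10.244.1.0/24")?
  private def inCidr(ip: String, cidr: String): Boolean = {
    val Array(base, bits) = cidr.split("/")
    def toLong(addr: String): Long =
      InetAddress.getByName(addr).getAddress.foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xff))
    val mask = if (bits.toInt == 0) 0L else -1L << (32 - bits.toInt)
    (toLong(ip) & mask) == (toLong(base) & mask)
  }

  // True if the submitting client's IP lies in any node's pod CIDR, i.e. the client
  // is already running on the cluster's pod network (Zeppelin/Jupyter in a pod, etc.).
  def clientIsOnPodNetwork(clientIp: String): Boolean = {
    val client = new DefaultKubernetesClient()
    try {
      client.nodes().list().getItems.asScala
        .flatMap(node => Option(node.getSpec.getPodCIDR))
        .exists(cidr => inCidr(clientIp, cidr))
    } finally {
      client.close()
    }
  }
}
```

As noted, listing every node scales with cluster size, so a real implementation would likely cache the CIDRs or consult a cluster-level CIDR instead.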

childMainClass = "org.apache.spark.deploy.kubernetes.Client"
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sysProps.getOrElseUpdate(k, v)
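For reference, a hedged illustration of what the Kubernetes branch above would produce for the README's SparkPi example — argument values are taken from `kubernetes/README.md`, the wiring itself is only a sketch:

```scala
import scala.collection.mutable.ArrayBuffer

// Values come from the example spark-submit invocation in kubernetes/README.md.
val childArgs = ArrayBuffer[String]()
childArgs += ("--jar",
  "http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar")
childArgs += ("--class", "org.apache.spark.examples.SparkPi")
childArgs += ("--arg", "10000")
// childMainClass is org.apache.spark.deploy.kubernetes.Client, whose main method
// parses these flags through ClientArguments (added later in this PR).
```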
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -98,7 +98,7 @@ jersey-client-2.22.2.jar
jersey-common-2.22.2.jar
jersey-container-servlet-2.22.2.jar
jersey-container-servlet-core-2.22.2.jar
jersey-guava-2.22.2.jar
jersey-guava-2.22.2.jarshaded-proto
Contributor:
Any particular reason why this changed? Would be curious to trace how this dependency changed.

jersey-media-jaxb-2.22.2.jar
jersey-server-2.22.2.jar
jets3t-0.9.3.jar
4 changes: 3 additions & 1 deletion dev/make-distribution.sh
@@ -154,7 +154,9 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCac
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
# See: http://mywiki.wooledge.org/BashFAQ/050
BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)
# BUILD_COMMAND=("$MVN" -T 1C clean package -DskipTests $@)

BUILD_COMMAND=("$MVN" -T 2C package -DskipTests $@)
Contributor:
?

Contributor:
This should be reverted.


# Actually build the jar
echo -e "\nBuilding with..."
21 changes: 21 additions & 0 deletions kubernetes/README.md
@@ -0,0 +1,21 @@
# Pre-requisites
* Maven, a JDK, and all other pre-requisites for building Spark.

# Steps to compile

* Clone the fork of spark: https://github.com/foxish/spark/ and switch to the k8s-support branch.
Member:
I think this is not correct now?

* Build the project
* `./build/mvn -Pkubernetes -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests package`
Contributor:
I think hadoop-2.4 is gone in master.

* Ensure that you are pointing to the k8s cluster (`kubectl config current-context`) that you want to use with Spark.
* Launch a spark-submit job:
* `./bin/spark-submit --deploy-mode cluster --class org.apache.spark.examples.SparkPi --master k8s://default --conf spark.executor.instances=5 --conf spark.kubernetes.sparkImage=manyangled/kube-spark:dynamic http://storage.googleapis.com/foxish-spark-distro/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar 10000`
Member:
Do we need to prepare an official image for this?

Contributor Author:
There will need to be some official Apache Spark repository for images, which I presume will be up to Apache Spark to create. The exact nature of the images to be produced is still being discussed

I did create a "semi-official" org up on docker hub called "k8s4spark":
https://hub.docker.com/u/k8s4spark/dashboard/

I haven't actually pushed any images to it, but we could start using it as an interim repo if people think that is useful

* Optionally, the following config parameters can be supplied to spark-submit with additional `--conf` arguments (or a configuration file); a programmatic sketch follows this list.
* spark.kubernetes.serviceAccountName (defaults to "default")
* spark.kubernetes.namespace (defaults to "default"). The namespace must exist prior to launching spark-submit.
* The image is built from https://github.com/erikerlandson/openshift-spark.
* `--master k8s://default` ensures that it picks up the correct API server, i.e. the default one from the current context.
* Check for pods being created. Watch the driver logs using `kubectl logs -f <driver-pod>`.
* If on a cloud/infrastructure provider that allows external load balancers to be provisioned, an external IP will be allocated to the service associated with the driver. The Spark driver UI can be accessed from that IP address on port 4040.
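A minimal sketch of setting the same options programmatically — key names come from the list above, the values are only examples:

```scala
import org.apache.spark.SparkConf

// Keys documented above; values are illustrative.
val conf = new SparkConf()
  .setMaster("k8s://default")
  .set("spark.executor.instances", "5")
  .set("spark.kubernetes.namespace", "default") // must exist before spark-submit is run
  .set("spark.kubernetes.serviceAccountName", "default")
  .set("spark.kubernetes.sparkImage", "manyangled/kube-spark:dynamic")
```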


![spark-submit](spark-submit.png)
54 changes: 54 additions & 0 deletions kubernetes/pom.xml
@@ -0,0 +1,54 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
Contributor:
2.1.0-SNAPSHOT???

<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
Contributor:
${scala.binary.version}

<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>1.4.8</version>
</dependency>

<!-- Explicit listing of transitive deps that are shaded. Otherwise, odd compiler crashes. -->
</dependencies>

<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
Binary file added kubernetes/spark-submit.png
3 changes: 3 additions & 0 deletions kubernetes/src/main/resources/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
Manifest-Version: 1.0
Main-Class: org.apache.spark.deploy.kubernetes.Client

1 change: 1 addition & 0 deletions kubernetes/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterManager
70 changes: 70 additions & 0 deletions kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import java.util.concurrent.CountDownLatch

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.kubernetes.KubernetesClusterScheduler
import org.apache.spark.util.ShutdownHookManager

private[spark] class Client(val args: ClientArguments,
    val hadoopConf: Configuration,
    val sparkConf: SparkConf)
  extends Logging {
  private val scheduler = new KubernetesClusterScheduler(sparkConf)
  private val shutdownLatch = new CountDownLatch(1)

  def this(clientArgs: ClientArguments, spConf: SparkConf) =
    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

  def start(): Unit = {
    scheduler.start(args)
  }

  def stop(): Unit = {
    scheduler.stop()
    shutdownLatch.countDown()
    System.clearProperty("SPARK_KUBERNETES_MODE")
  }

  def awaitShutdown(): Unit = {
    shutdownLatch.await()
  }
}

private object Client extends Logging {
  def main(argStrings: Array[String]) {
    val sparkConf = new SparkConf
    System.setProperty("SPARK_KUBERNETES_MODE", "true")
    val args = new ClientArguments(argStrings)
    val client = new Client(args, sparkConf)
    client.start()

    logDebug("Adding shutdown hook")
    ShutdownHookManager.addShutdownHook { () =>
      logInfo("Shutdown hook is shutting down client")
      client.stop()
      client.awaitShutdown()
    }
    client.awaitShutdown()
  }
}
85 changes: 85 additions & 0 deletions kubernetes/src/main/scala/org/apache/spark/deploy/kubernetes/ClientArguments.scala
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes

import scala.collection.mutable.ArrayBuffer

private[spark] class ClientArguments(args: Array[String]) {

  var userJar: String = null
  var userClass: String = null
  var primaryPyFile: String = null
  var primaryRFile: String = null
  var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    var args = inputArgs

    while (!args.isEmpty) {
      args match {
        case ("--jar") :: value :: tail =>
          userJar = value
          args = tail

        case ("--class") :: value :: tail =>
          userClass = value
          args = tail

        case ("--primary-py-file") :: value :: tail =>
          primaryPyFile = value
          args = tail

        case ("--primary-r-file") :: value :: tail =>
          primaryRFile = value
          args = tail

        case ("--arg") :: value :: tail =>
          userArgs += value
          args = tail

        case Nil =>

        case _ =>
          throw new IllegalArgumentException(getUsageMessage(args))
      }
    }

    if (primaryPyFile != null && primaryRFile != null) {
      throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
        " at the same time")
    }
  }

  private def getUsageMessage(unknownParam: List[String] = null): String = {
    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else ""
    message +
      s"""
      |Usage: org.apache.spark.deploy.kubernetes.Client [options]
      |Options:
      |  --jar JAR_PATH         Path to your application's JAR file (required in kubernetes-cluster
      |                         mode)
      |  --class CLASS_NAME     Name of your application's main class (required)
      |  --primary-py-file      A main Python file
      |  --primary-r-file       A main R file
      |  --arg ARG              Argument to be passed to your application's main class.
      |                         Multiple invocations are possible, each will be passed in order.
      """.stripMargin
  }
}
45 changes: 45 additions & 0 deletions kubernetes/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterManager.scala
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.kubernetes

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

/**
 * Cluster Manager for creation of Kubernetes scheduler and backend
 */
private[spark] class KubernetesClusterManager extends ExternalClusterManager {
  override def canCreate(masterURL: String): Boolean = {
    masterURL.startsWith("k8s")
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    new TaskSchedulerImpl(sc)
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    new KubernetesClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
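To tie the pieces together, a hedged sketch (not part of the diff) of how this class is expected to be found at runtime: Spark core scans `ExternalClusterManager` implementations through `java.util.ServiceLoader` using the `META-INF/services` entry added earlier in this PR, and keeps the one whose `canCreate` accepts the master URL:

```scala
package org.apache.spark.scheduler.cluster.kubernetes

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.ExternalClusterManager

// Illustrative lookup only; SparkContext performs an equivalent scan internally.
object ClusterManagerLookup {
  def forMaster(masterURL: String): Option[ExternalClusterManager] = {
    ServiceLoader.load(classOf[ExternalClusterManager], getClass.getClassLoader)
      .asScala
      .find(_.canCreate(masterURL))
  }
}

// ClusterManagerLookup.forMaster("k8s://default") should yield the
// KubernetesClusterManager registered above.
```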
