This repository was archived by the owner on Jan 9, 2020. It is now read-only.

[SPARK-18278] Minimal support for submitting to Kubernetes. #1

Closed
wants to merge 80 commits

Conversation

@mccheah commented Dec 9, 2016

Phase 1 for Kubernetes support in Spark. Only supports static resource allocation in cluster mode.

Some changes were made to the original fundamental approach relative to foxish#7, in particular how the REST server is built. The REST server now uses the existing submission REST server infrastructure as its base instead of creating new Jetty-based code from scratch. Additionally, uploading local dependencies now uses a separate field, to distinguish jars that are local to the Docker image (specified via "spark.jars") from jars uploaded from the client machine. The appropriate APIs for exposing these configuration knobs are open to discussion, especially since the user's main resource is currently uploaded indiscriminately, whereas one could foresee the user wanting to specify their main resource as a file local to the Docker image's disk.

The client arguments have also changed to mostly use Spark properties. As with the YARN support, common configuration points should be settable via arguments to spark-submit and translated into properties in SparkConf.

@mccheah changed the title from "K8s support alternate incremental" to "[SPARK-18278] Minimal support for submitting to Kubernetes." on Dec 9, 2016
@ash211 commented Jan 7, 2017

@mccheah is this PR the one meant to be the phase one implementation (without the API layer)? If so, let's update the description to make clear what this is intended to become.

@ash211 mentioned this pull request Jan 7, 2017
- Don't hold the raw secret bytes
- Add CPU limits and requests
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryLimitQuantity)
.addToRequests("cpu", executorCpuQuantity)
.addToLimits("cpu", executorCpuQuantity)
Author

If the container goes over this CPU limit, is it forcefully killed? Or is the CPU limit merely what's provided by the hardware, making it impossible for the container to ever actually go over the limit?

Member

Details are in https://github.com/kubernetes/kubernetes/blob/master/docs/design/resource-qos.md
The container wouldn't get killed, as CPU is classified as a compressible resource. It would get throttled.

Member

If request == limit, we are making its QoS == guaranteed. It's the requested amount that is considered when making scheduling decisions. Having a higher limit would let us burst to use more CPU if it is available when overcommit is enabled in the cluster.


hmm so if I have 2 cores on a cluster and this Spark service needs 1 core minimum but would happily accept the second core if it was available, how should I set up cpu requests and limits? request=1 and limit=2 ?

Author

I'm not sure we're expecting CPU bursts here - executors use a fixed-size thread pool, sized by the provided core count, for running tasks. Setting limit == request seems sufficient, but I'm not sure whether this accounts for other miscellaneous threads running in the executor process.

Member

I wonder if we should make the QoS class of these executor pods configurable. On clusters that run serving workloads alongside Spark, the batch jobs would be lower priority, and it would make sense to make the executors "best-effort" (no limit/request). They would then get killed before the serving jobs if CPU/memory were lacking.

Author

I think the semantics we have here are closer to what is expected by Spark users migrating from YARN and Mesos.

Member

Setting a slightly higher limit than request for the CPU (10-25% more) may be a good thing, but if, as Matt said, we are sure we don't expect CPU bursts, it is fine as-is for the MVP. In the future, in order to improve utilization, I think we will want to allow pods to have different QoS classes. We probably want something similar to YARN's queueing mechanism, which we could implicitly translate into a QoS class. We can discuss this further after phase 1 is complete, however.
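
To make the resource-requirements snippet under discussion concrete, here is a minimal, self-contained sketch of the fabric8 builder calls with the QoS trade-off noted in comments. The method name, quantity values, and memory-overhead handling are illustrative assumptions, not the PR's actual code.

```scala
import io.fabric8.kubernetes.api.model.{Quantity, ResourceRequirements, ResourceRequirementsBuilder}

object ExecutorResourcesSketch {
  def executorResources(memoryMb: Int, memoryWithOverheadMb: Int, cores: Int): ResourceRequirements = {
    val executorMemoryQuantity = new Quantity(s"${memoryMb}Mi")
    val executorMemoryLimitQuantity = new Quantity(s"${memoryWithOverheadMb}Mi")
    val executorCpuQuantity = new Quantity(cores.toString)
    new ResourceRequirementsBuilder()
      .addToRequests("memory", executorMemoryQuantity)
      .addToLimits("memory", executorMemoryLimitQuantity)
      .addToRequests("cpu", executorCpuQuantity)
      // cpu limit == request means no CPU bursting; a higher limit would allow bursting
      // when the cluster has spare CPU (see the QoS discussion above).
      .addToLimits("cpu", executorCpuQuantity)
      .build()
  }
}
```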

ash211 previously requested changes Jan 7, 2017
@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("kubernetes") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")

this error message needs updating
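
For illustration, a hedged, self-contained sketch of what the updated dispatch and message could look like; the constant values and wording are assumptions, not the PR's final text.

```scala
object ClusterManagerSketch {
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16

  def resolve(master: String): Int = master match {
    case m if m.startsWith("yarn")       => YARN
    case m if m.startsWith("spark")      => STANDALONE
    case m if m.startsWith("mesos")      => MESOS
    case m if m.startsWith("kubernetes") => KUBERNETES
    case m if m.startsWith("local")      => LOCAL
    case _ =>
      // The message now mentions the new scheme, addressing the review comment above.
      sys.error("Master must either be yarn or start with spark, mesos, kubernetes, local")
  }
}
```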

@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("kubernetes") => KUBERNETES

I think we agreed we wanted kubernetes URLs to be of the form: k8s://http://hostname:port and k8s://https://hostname:port
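
A hedged sketch of how master URLs of that form might be parsed into an API server address; the function name and the https default are illustrative assumptions, not the PR's actual code.

```scala
object KubernetesMasterUrlSketch {
  /** Accepts "k8s://https://hostname:port" or "k8s://http://hostname:port"; defaults to https. */
  def resolveApiServer(master: String): String = {
    require(master.startsWith("k8s://"), s"Kubernetes master URL must start with k8s://, got: $master")
    val address = master.stripPrefix("k8s://")
    if (address.startsWith("http://") || address.startsWith("https://")) address
    else s"https://$address"
  }
}

// Example: resolveApiServer("k8s://https://hostname:6443") yields "https://hostname:6443".
```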

@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>

all the kubernetes files need to be under /resource-managers/kubernetes now that upstream has put yarn and mesos into the resource-managers folder

.endContainer()
.endSpec()
.done()
submitCompletedFuture.get(30, TimeUnit.SECONDS)

I got this exception when trying to run a job and the image failed to pull:

Exception in thread "OkHttp https://10.0.23.119:6443/api/v1/namespaces/spark-test1/pods?labelSelector=driver-launcher-selector%3Ddriver-launcher-1483785139234&resourceVersion=10329&watch=true WebSocket" java.lang.NullPointerException
	at org.spark_project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
	at org.spark_project.guava.util.concurrent.AbstractFuture.setException(AbstractFuture.java:201)
	at org.spark_project.guava.util.concurrent.SettableFuture.setException(SettableFuture.java:68)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anon$1.onClose(Client.scala:177)
	at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onClose(WatchConnectionManager.java:255)
	at okhttp3.internal.ws.RealWebSocket.peerClose(RealWebSocket.java:197)
	at okhttp3.internal.ws.RealWebSocket.access$200(RealWebSocket.java:38)
	at okhttp3.internal.ws.RealWebSocket$1$2.execute(RealWebSocket.java:84)
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.util.concurrent.TimeoutException: Timeout waiting for task.
	at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
	at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.org$apache$spark$deploy$kubernetes$Client$$anonfun$$createDriverPod$1(Client.scala:218)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anonfun$apply$2.apply(Client.scala:224)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anonfun$apply$2.apply(Client.scala:224)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2508)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.apply(Client.scala:224)
	at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.apply(Client.scala:102)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2508)
	at org.apache.spark.deploy.kubernetes.Client.run(Client.scala:102)
	at org.apache.spark.deploy.kubernetes.Client$.main(Client.scala:361)
	at org.apache.spark.deploy.kubernetes.Client.main(Client.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:753)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
aash@aash01-mac ~$ kubectl get pods -n spark-test1
NAME       READY     STATUS         RESTARTS   AGE
spark-pi   0/1       ErrImagePull   0          1m
aash@aash01-mac ~$

We should provide a better message instead of timing out here, something like "Pod spark-pi in namespace spark-test1 never reached Running status. Last observed status: ErrImagePull".
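
A hedged sketch of the suggested behaviour: on timeout, look up the driver pod's last observed status and fail with that context rather than a bare TimeoutException. The fabric8 calls used here exist in the client library; the surrounding names and message wording are assumptions, not the PR's actual code.

```scala
import java.util.concurrent.{Future, TimeUnit, TimeoutException}
import io.fabric8.kubernetes.client.KubernetesClient

object DriverPodWaitSketch {
  def awaitDriverPod(client: KubernetesClient,
                     namespace: String,
                     podName: String,
                     submitCompletedFuture: Future[_]): Unit = {
    try {
      submitCompletedFuture.get(30, TimeUnit.SECONDS)
    } catch {
      case _: TimeoutException =>
        // Report the last observed pod status (e.g. ErrImagePull) instead of a bare timeout.
        val lastStatus = Option(client.pods().inNamespace(namespace).withName(podName).get())
          .flatMap(pod => Option(pod.getStatus))
          .map(_.toString)
          .getOrElse("<unknown>")
        throw new RuntimeException(
          s"Pod $podName in namespace $namespace never reached Running status. " +
            s"Last observed status: $lastStatus")
    }
  }
}
```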

FROM ubuntu:trusty

# Upgrade package index
# install a few other useful packages plus Open Jdk 7

prefer Java 8 over Java 7 in these templates

<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>

../../pom.xml

@foxish (Member) commented Jan 9, 2017

@mccheah I'm trying to build this and get compiler errors

[INFO] Spark Project Shuffle Streaming Service ............ FAILURE [  0.874 s] 

My build command is

./build/mvn -U -T 4 -Pkubernetes -Dhadoop.version=2.4.0 -Dmaven.test.skip=true -DskipTests package

Do you use a different one?

@mccheah (Author) commented Jan 9, 2017

@foxish this works for me: build/mvn compile package -DskipTests -Pkubernetes -Phadoop2.4

@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")

please update this error message to have "k8s" in the list

@ash211 commented Jan 10, 2017

I think two of my earlier comments are still outstanding:

  • use java 8 instead of java 7 in the docker images
  • Good error message for ErrImagePull error state

Can you please send those as separate PRs into this PR's branch? They will be easier to review separately that way than as part of the full PR.

mccheah and others added 4 commits January 11, 2017 14:36
* Use tar and gzip to archive shipped jars.

* Address comments

* Move files to resolve merge
* Use alpine and java 8 for docker images.

* Remove installation of vim and redundant comment
@ssuchter (Member) commented:

Hi folks -

In case anyone hasn't seen how to do this before, here's how you debug the failing Travis CI build. Click on one of the "Details" links - here's the one for the pull request:

https://travis-ci.org/apache-spark-on-k8s/spark/builds/191462919

Then within that build, Travis runs two separate CI jobs, one for Java 7 and one for Java 8. Both failed, so I just clicked on the first one, #2.1. That gets me here:

https://travis-ci.org/apache-spark-on-k8s/spark/jobs/191462920

In the "Job log" on that page you can see the failure, it's in the Checkstyle linter:

[ERROR] src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java:[83] (sizes) LineLength: Line is longer than 100 characters (found 102).

mccheah and others added 3 commits January 12, 2017 17:59
* Error messages when the driver container fails to start.

* Fix messages a bit

* Use timeout constant

* Delete the pod if it fails for any reason (not just timeout)

* Actually set submit succeeded

* Fix typo
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime
mccheah and others added 11 commits February 23, 2017 14:44
…122)

* Refactor the cleaning up of Kubernetes components.

Create a KubernetesComponentsCleaner which can register arbitrary pods,
services, secrets, and ingresses. When an exception is thrown or the JVM
shuts down, the cleaner automatically purges any of its registered
components from Kubernetes. The components can be unregistered when the
driver successfully begins running, so that the application persists
beyond the lifetime of the spark-submit process.

* Fix spacing

* Address comments

* Fix compiler error

* Pull KubernetesComponentCleaner into instance variable

* Remove a parameter

* Remove redundant registerOrUpdateSecret for SSL

* Remove Ingresses from component cleaner

* Clear resources generically as opposed to specifying each type

* Remove incorrect test assertion

* Rename variable
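
A minimal, hedged sketch of a component cleaner like the one described in the refactoring commit message above; class and method names are illustrative assumptions, not the actual implementation.

```scala
import scala.collection.mutable
import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.client.KubernetesClient

class ComponentsCleanerSketch(client: KubernetesClient) {
  private val registered = mutable.Buffer.empty[HasMetadata]

  // Register any pod, service, or secret created during submission.
  def register(resource: HasMetadata): Unit = synchronized { registered += resource }

  // Once the driver is confirmed running, keep its resources past spark-submit's lifetime.
  def unregisterAll(): Unit = synchronized { registered.clear() }

  // Invoked on failure or from a JVM shutdown hook to purge anything still registered.
  def deleteAllRegistered(): Unit = synchronized {
    registered.foreach(resource => client.resource(resource).delete())
    registered.clear()
  }
}

// Usage sketch: sys.addShutdownHook(cleaner.deleteAllRegistered())
```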
* Configure unit test build while banning flaky tests

* Clean up comment
* Extract SSL configuration to a separate class

* KubernetesSsl -> Ssl, container -> local
* Exclude SparkSubmitSuite from Travis unit test build

* Remove SortShuffleSuite

* Exclude Java tests
* pod-watch progress around watch events

* Simplify return

* comments
* Document blocking submit calls

#53 added these config but didn't document them

* Update running-on-kubernetes.md
* Update client version

* Upgrade minikube

* Update pom.xml
…a annotations (#147)

* Listen for annotations that provide external URIs.

* FIx scalstyle

* Address comments

* Fix doc style

* Docs updates

* Clearly explain path rewrites
mccheah and others added 9 commits March 3, 2017 13:45
* Allow setting memory on the driver submission server.

* Address comments

* Address comments
* Adding prerequisites

* address comments
* Logging for resource deletion

Remove dangling colon and replace with an ellipses and a second log statement

* Update KubernetesResourceCleaner.scala
* Adding official alpha docker image to docs

* Reorder sections and create a specific one for "advanced"

* Provide limitations and instructions about running on GKE

* Fix title of advanced section: submission

* Improved section on running in the cloud

* Update versioning

* Address comments

* Address comments
* Add Apache license to a few files

* Ignore license check on META-INF service
* Allow providing an OAuth token for authenticating against k8s

* Organize imports

* Fix style

* Remove extra newline

* Use OAuth token data instead of a file.
@mccheah (Author) commented Mar 22, 2017

Closing this in favor of diffing from branch-2.1-kubernetes. Probably will need to set the base branch to 2.1 also.

@mccheah mccheah closed this Mar 22, 2017
@mccheah (Author) commented Mar 22, 2017

See #200

echarles pushed a commit to datalayer-externals/spark that referenced this pull request Apr 23, 2019
…te temporary path in local staging directory

## What changes were proposed in this pull request?
The environment of my cluster is as follows:
```
OS:Linux version 2.6.32-220.7.1.el6.x86_64 (mockbuildc6b18n3.bsys.dev.centos.org) (gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC) ) apache-spark-on-k8s#1 SMP Wed Mar 7 00:52:02 GMT 2012
Hadoop: 2.7.2
Spark: 2.3.0 or 3.0.0(master branch)
Hive: 1.2.1
```

My Spark runs in yarn-client deploy mode.

If I execute the SQL `insert overwrite local directory '/home/test/call_center/' select * from call_center`, a HiveException will appear as follows:
`Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)`
Currently Spark SQL generates a local temporary path in the local staging directory. The scheme of the local temporary path starts with `file`, so the HiveException appears.
This PR changes the local temporary path to an HDFS temporary path, and uses a DistributedFileSystem instance to copy the data from the HDFS temporary path to the local directory.
If Spark runs in local deploy mode, 'insert overwrite local directory' works fine.
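
A hedged sketch of the approach described above, staging output on HDFS and then copying it down to the local target directory; the paths and method names here are illustrative assumptions, not the patch's actual code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object LocalDirectoryCopySketch {
  def copyStagingToLocal(hdfsStagingDir: String, localTargetDir: String): Unit = {
    val conf = new Configuration()
    // For an hdfs:// URI this resolves to a DistributedFileSystem instance.
    val fs = FileSystem.get(new Path(hdfsStagingDir).toUri, conf)
    // Download the staged query output onto the local filesystem of the driver machine.
    fs.copyToLocalFile(false, new Path(hdfsStagingDir), new Path(localTargetDir), true)
  }
}
```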
## How was this patch tested?

Unit tests cannot cover yarn-client mode. The change was tested in my production environment.

Closes apache#23841 from beliefer/fix-bug-of-insert-overwrite-local-dir.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
echarles pushed a commit to datalayer-externals/spark that referenced this pull request Apr 23, 2019
## What changes were proposed in this pull request?

This PR supports `OpenJ9` in addition to `IBM JDK` and `OpenJDK` in Spark by handling `System.getProperty("java.vendor") = "Eclipse OpenJ9"`.

In `inferDefaultMemory()` and `getKrb5LoginModuleName()`, this PR uses the non-`IBM` code path.

```
$ ~/jdk-11.0.2+9_openj9-0.12.1/bin/jshell
|  Welcome to JShell -- Version 11.0.2
|  For an introduction type: /help intro

jshell> System.out.println(System.getProperty("java.vendor"))
Eclipse OpenJ9

jshell> System.out.println(System.getProperty("java.vm.info"))
JRE 11 Linux amd64-64-Bit Compressed References 20190204_127 (JIT enabled, AOT enabled)
OpenJ9   - 90dd8cb40
OMR      - d2f4534b
JCL      - 289c70b6844 based on jdk-11.0.2+9

jshell> System.out.println(Class.forName("com.ibm.lang.management.OperatingSystemMXBean").getDeclaredMethod("getTotalPhysicalMemory"))
public abstract long com.ibm.lang.management.OperatingSystemMXBean.getTotalPhysicalMemory()

jshell> System.out.println(Class.forName("com.sun.management.OperatingSystemMXBean").getDeclaredMethod("getTotalPhysicalMemorySize"))
public abstract long com.sun.management.OperatingSystemMXBean.getTotalPhysicalMemorySize()

jshell> System.out.println(Class.forName("com.ibm.security.auth.module.Krb5LoginModule"))
|  Exception java.lang.ClassNotFoundException: com.ibm.security.auth.module.Krb5LoginModule
|        at Class.forNameImpl (Native Method)
|        at Class.forName (Class.java:339)
|        at (apache-spark-on-k8s#1:1)

jshell> System.out.println(Class.forName("com.sun.security.auth.module.Krb5LoginModule"))
class com.sun.security.auth.module.Krb5LoginModule
```
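
A hedged sketch of the vendor check described above, routing Eclipse OpenJ9 down the non-IBM path for both the memory MXBean and the Kerberos login module; the object and method names are illustrative assumptions, not the patch's actual code.

```scala
object JvmVendorSketch {
  private val vendor = System.getProperty("java.vendor", "")
  // Eclipse OpenJ9 reports a vendor of "Eclipse OpenJ9" and should use the OpenJDK path.
  val isOpenJ9: Boolean = vendor.contains("OpenJ9")
  val isIbmJdk: Boolean = vendor.contains("IBM") && !isOpenJ9

  def operatingSystemMXBeanClassName: String =
    if (isIbmJdk) "com.ibm.lang.management.OperatingSystemMXBean"
    else "com.sun.management.OperatingSystemMXBean"

  def krb5LoginModuleName: String =
    if (isIbmJdk) "com.ibm.security.auth.module.Krb5LoginModule"
    else "com.sun.security.auth.module.Krb5LoginModule"
}
```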

## How was this patch tested?

Existing test suites
Manual testing with OpenJ9.

Closes apache#24308 from kiszk/SPARK-27397.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>