[SPARK-18278] Minimal support for submitting to Kubernetes. #1
Conversation
@mccheah is this PR the one meant to be the phase one implementation (without the API layer)? If so, let's update the description to make clear what this is intended to become.
- Don't hold the raw secret bytes - Add CPU limits and requests
.addToRequests("memory", executorMemoryQuantity) | ||
.addToLimits("memory", executorMemoryLimitQuantity) | ||
.addToRequests("cpu", executorCpuQuantity) | ||
.addToLimits("cpu", executorCpuQuantity) |
If the container goes over this CPU limit is it forcefully killed? Or is the CPU limit merely what's provided on the hardware, and then it's impossible for the container to actually ever go over the limit?
Details are in https://github.com/kubernetes/kubernetes/blob/master/docs/design/resource-qos.md
The container wouldn't get killed, as CPU is classified as a compressible resource. It would get throttled.
If request == limit, we are making its QoS == guaranteed. It's the requested amount that is considered when making scheduling decisions. Having a higher limit would let us burst to use more CPU if it is available when overcommit is enabled in the cluster.
Hmm, so if I have 2 cores on a cluster and this Spark service needs 1 core minimum but would happily accept the second core if it were available, how should I set up CPU requests and limits? request=1 and limit=2?
I'm not sure if we're expecting CPU bursts here - executors use a fixed-size thread pool based on the provided core count for running tasks. Setting limit == requested seems sufficient, but I'm not sure if this accounts for other random threads that are in the executor process.
I wonder if we should make the QoS class of these executor pods configurable. When we think of clusters running serving workloads alongside Spark, the batch jobs would be lower priority, and it would make sense to make the executors "best-effort" (no limit/request). They would get killed before the serving jobs if CPU/memory were lacking.
I think the semantics we have here are closer to what is expected by Spark users migrating from YARN and Mesos.
Setting a slightly higher limit than request for CPU (10-25% more) may be a good thing, but if, as Matt said, we are sure that we don't expect CPU bursts, it is fine as it is for the MVP. I think in the future, in order to improve utilization, we will want to allow pods to have different QoS classes. We probably want something similar to YARN's queueing mechanism, which we could implicitly translate into a QoS class. We can discuss this further after phase 1 is complete, however.
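For illustration, a minimal sketch of the request-versus-limit distinction discussed above, using the fabric8 model classes this PR already builds on; the quantities are hypothetical (request=1, limit=2 would make the pod Burstable, while request == limit gives Guaranteed):

```scala
import io.fabric8.kubernetes.api.model.{Quantity, ResourceRequirementsBuilder}

// Hypothetical quantities: the executor is guaranteed 1 CPU for scheduling
// purposes but may burst up to 2 CPUs when the node has spare cycles.
val executorCpuRequest = new Quantity("1")
val executorCpuLimit = new Quantity("2")

val executorResources = new ResourceRequirementsBuilder()
  .addToRequests("cpu", executorCpuRequest) // considered by the scheduler
  .addToLimits("cpu", executorCpuLimit)     // throttling ceiling; CPU is compressible, so no kill
  .build()
```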
@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("kubernetes") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
this error message needs updating
@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("kubernetes") => KUBERNETES
I think we agreed we wanted kubernetes URLs to be of the form: k8s://http://hostname:port
and k8s://https://hostname:port
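A rough sketch of how such a prefix might be resolved to an API server URL (a hypothetical helper, not necessarily how this PR parses the master URL; it assumes https when no inner scheme is given):

```scala
// Hypothetical parser for master URLs of the form k8s://https://hostname:port.
def resolveApiServerUrl(master: String): String = {
  require(master.startsWith("k8s://"), s"Invalid Kubernetes master URL: $master")
  val withoutPrefix = master.stripPrefix("k8s://")
  if (withoutPrefix.startsWith("http://") || withoutPrefix.startsWith("https://")) {
    withoutPrefix
  } else {
    s"https://$withoutPrefix" // assumption: default to TLS when no scheme is given
  }
}
```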
kubernetes/core/pom.xml
Outdated
@@ -0,0 +1,101 @@
<?xml version="1.0" encoding="UTF-8"?>
all the kubernetes files need to be under /resource-managers/kubernetes
now that upstream has put yarn and mesos into the resource-managers folder
.endContainer()
.endSpec()
.done()
submitCompletedFuture.get(30, TimeUnit.SECONDS)
I got this exception when trying to run a job and the image failed to pull:
Exception in thread "OkHttp https://10.0.23.119:6443/api/v1/namespaces/spark-test1/pods?labelSelector=driver-launcher-selector%3Ddriver-launcher-1483785139234&resourceVersion=10329&watch=true WebSocket" java.lang.NullPointerException
at org.spark_project.guava.base.Preconditions.checkNotNull(Preconditions.java:191)
at org.spark_project.guava.util.concurrent.AbstractFuture.setException(AbstractFuture.java:201)
at org.spark_project.guava.util.concurrent.SettableFuture.setException(SettableFuture.java:68)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anon$1.onClose(Client.scala:177)
at io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onClose(WatchConnectionManager.java:255)
at okhttp3.internal.ws.RealWebSocket.peerClose(RealWebSocket.java:197)
at okhttp3.internal.ws.RealWebSocket.access$200(RealWebSocket.java:38)
at okhttp3.internal.ws.RealWebSocket$1$2.execute(RealWebSocket.java:84)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" java.util.concurrent.TimeoutException: Timeout waiting for task.
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:276)
at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:96)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.org$apache$spark$deploy$kubernetes$Client$$anonfun$$createDriverPod$1(Client.scala:218)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anonfun$apply$2.apply(Client.scala:224)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5$$anonfun$apply$2.apply(Client.scala:224)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2508)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.apply(Client.scala:224)
at org.apache.spark.deploy.kubernetes.Client$$anonfun$run$5.apply(Client.scala:102)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2508)
at org.apache.spark.deploy.kubernetes.Client.run(Client.scala:102)
at org.apache.spark.deploy.kubernetes.Client$.main(Client.scala:361)
at org.apache.spark.deploy.kubernetes.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:753)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:178)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
aash@aash01-mac ~$ kubectl get pods -n spark-test1
NAME READY STATUS RESTARTS AGE
spark-pi 0/1 ErrImagePull 0 1m
aash@aash01-mac ~$
We should provide a better message instead of timing out here, something like: "Pod spark-pi in namespace spark-test1 never reached Running status. Last observed status: ErrImagePull"
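One possible shape for that improvement, sketched with hypothetical variable names (kubernetesClient, namespace, driverPodName) against the fabric8 client the PR already uses:

```scala
import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.collection.JavaConverters._

import org.apache.spark.SparkException

try {
  submitCompletedFuture.get(30, TimeUnit.SECONDS)
} catch {
  case _: TimeoutException =>
    // Re-read the driver pod and surface why it never reached Running (e.g. ErrImagePull).
    val pod = kubernetesClient.pods().inNamespace(namespace).withName(driverPodName).get()
    val waitingReasons = pod.getStatus.getContainerStatuses.asScala
      .flatMap(s => Option(s.getState.getWaiting).map(_.getReason))
    throw new SparkException(s"Pod $driverPodName in namespace $namespace never reached " +
      s"Running status. Last observed status: ${waitingReasons.mkString(", ")}")
}
```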
FROM ubuntu:trusty

# Upgrade package index
# install a few other useful packages plus Open Jdk 7
prefer Java 8 over Java 7 in these templates
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
../../pom.xml
@mccheah I'm trying to build this and get compiler errors
My build command is
Do you use a different one?
@foxish this works for me:
The build process fails ScalaStyle checks otherwise.
@@ -229,6 +230,7 @@ object SparkSubmit extends CommandLineUtils {
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
please update this error message to have "k8s" in the list
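Something along these lines would cover it (a sketch, not necessarily the exact wording the PR settles on):

```scala
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
  printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, local")
```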
…te-incremental' into k8s-support-alternate-incremental
I think two of my earlier comments are still remaining:
Can you please send those in as separate PRs into this one's branch? It will be easier to review them separately from the full PR that way.
* Use tar and gzip to archive shipped jars. * Address comments * Move files to resolve merge
* Use alpine and java 8 for docker images. * Remove installation of vim and redundant comment
Hi folks - In case anyone hasn't seen how to do this before, here's how you debug the failing Travis CI build. Click on one of the "Details" links - here's the one for the pull request: https://travis-ci.org/apache-spark-on-k8s/spark/builds/191462919 then in that one, Travis has two separate CI builds, one for Java 7 and one for Java 8. Both failed, so I just clicked on the first one, #2.1. That gets me here: https://travis-ci.org/apache-spark-on-k8s/spark/jobs/191462920 In the "Job log" on that page you can see the failure, it's in the Checkstyle linter: [ERROR] src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java:[83] (sizes) LineLength: Line is longer than 100 characters (found 102). |
* Error messages when the driver container fails to start. * Fix messages a bit * Use timeout constant * Delete the pod if it fails for any reason (not just timeout) * Actually set submit succeeded * Fix typo
* Documentation for the current state of the world. * Adding navigation links from other pages * Address comments, add TODO for things that should be fixed * Address comments, mostly making images section clearer * Virtual runtime -> container runtime
…122) * Refactor the cleaning up of Kubernetes components. Create a KubernetesComponentsCleaner which can register arbitrary pods, services, secrets, and ingresses. When an exception is thrown or the JVM shuts down, the cleaner automatically purges any of its registered components from Kubernetes. The components can be unregistered when the driver successfully begins running, so that the application persists beyond the lifetime of the spark-submit process. * Fix spacing * Address comments * Fix compiler error * Pull KubernetesComponentCleaner into instance variable * Remove a parameter * Remove redundant registerOrUpdateSecret for SSL * Remove Ingresses from component cleaner * Clear resources generically as opposed to specifying each type * Remove incorrect test assertion * Rename variable
* Configure unit test build while banning flaky tests * Clean up comment
* Extract SSL configuration to a separate class * KubernetesSsl -> Ssl, container -> local
* Exclude SparkSubmitSuite from Travis unit test build * Remove SortShuffleSuite * Exclude Java tests
* pod-watch progress around watch events * Simplify return * comments
* Document blocking submit calls #53 added these config but didn't document them * Update running-on-kubernetes.md
* Update client version * Upgrade minikube * Update pom.xml
…a annotations (#147) * Listen for annotations that provide external URIs. * FIx scalstyle * Address comments * Fix doc style * Docs updates * Clearly explain path rewrites
* Allow setting memory on the driver submission server. * Address comments * Address comments
* Adding prerequisites * address comments
* Logging for resource deletion Remove dangling colon and replace with an ellipses and a second log statement * Update KubernetesResourceCleaner.scala
* Adding official alpha docker image to docs * Reorder sections and create a specific one for "advanced" * Provide limitations and instructions about running on GKE * Fix title of advanced section: submission * Improved section on running in the cloud * Update versioning * Address comments * Address comments
* Add Apache license to a few files * Ignore license check on META-INF service
* Allow providing an OAuth token for authenticating against k8s * Organize imports * Fix style * Remove extra newline * Use OAuth token data instead of a file.
Closing this in favor of diffing from branch-2.1-kubernetes. Probably will need to set the base branch to 2.1 also.
See #200
…te temporary path in local staging directory

## What changes were proposed in this pull request?

The environment of my cluster is as follows:

```
OS: Linux version 2.6.32-220.7.1.el6.x86_64 (mockbuildc6b18n3.bsys.dev.centos.org) (gcc version 4.4.6 20110731 (Red Hat 4.4.6-3) (GCC) ) apache-spark-on-k8s#1 SMP Wed Mar 7 00:52:02 GMT 2012
Hadoop: 2.7.2
Spark: 2.3.0 or 3.0.0 (master branch)
Hive: 1.2.1
```

My Spark runs in yarn-client deploy mode. If I execute the SQL `insert overwrite local directory '/home/test/call_center/' select * from call_center`, a HiveException appears:

`Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/xitong/hive/stagingdir_hive_2019-02-19_17-31-00_678_1816816774691551856-1/-ext-10000/_temporary/0/_temporary/attempt_20190219173233_0002_m_000000_3 (exists=false, cwd=file:/data10/yarn/nm-local-dir/usercache/xitong/appcache/application_1543893582405_6126857/container_e124_1543893582405_6126857_01_000011) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)`

Currently Spark SQL generates a local temporary path in the local staging directory. The scheme of the local temporary path starts with `file`, so the HiveException appears. This PR changes the local temporary path to an HDFS temporary path, and uses a DistributedFileSystem instance to copy the data from the HDFS temporary path to the local directory. If Spark runs in local deploy mode, 'insert overwrite local directory' works fine.

## How was this patch tested?

UT cannot support yarn-client mode. The test is in my product environment.

Closes apache#23841 from beliefer/fix-bug-of-insert-overwrite-local-dir.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?

This PR supports `OpenJ9` in addition to `IBM JDK` and `OpenJDK` in Spark by handling `System.getProperty("java.vendor") = "Eclipse OpenJ9"`. In `inferDefaultMemory()` and `getKrb5LoginModuleName()`, this PR uses the non-`IBM` way.

```
$ ~/jdk-11.0.2+9_openj9-0.12.1/bin/jshell
| Welcome to JShell -- Version 11.0.2
| For an introduction type: /help intro

jshell> System.out.println(System.getProperty("java.vendor"))
Eclipse OpenJ9

jshell> System.out.println(System.getProperty("java.vm.info"))
JRE 11 Linux amd64-64-Bit Compressed References 20190204_127 (JIT enabled, AOT enabled) OpenJ9 - 90dd8cb40 OMR - d2f4534b JCL - 289c70b6844 based on jdk-11.0.2+9

jshell> System.out.println(Class.forName("com.ibm.lang.management.OperatingSystemMXBean").getDeclaredMethod("getTotalPhysicalMemory"))
public abstract long com.ibm.lang.management.OperatingSystemMXBean.getTotalPhysicalMemory()

jshell> System.out.println(Class.forName("com.sun.management.OperatingSystemMXBean").getDeclaredMethod("getTotalPhysicalMemorySize"))
public abstract long com.sun.management.OperatingSystemMXBean.getTotalPhysicalMemorySize()

jshell> System.out.println(Class.forName("com.ibm.security.auth.module.Krb5LoginModule"))
| Exception java.lang.ClassNotFoundException: com.ibm.security.auth.module.Krb5LoginModule
|   at Class.forNameImpl (Native Method)
|   at Class.forName (Class.java:339)
|   at (apache-spark-on-k8s#1:1)

jshell> System.out.println(Class.forName("com.sun.security.auth.module.Krb5LoginModule"))
class com.sun.security.auth.module.Krb5LoginModule
```

## How was this patch tested?

Existing test suites. Manual testing with OpenJ9.

Closes apache#24308 from kiszk/SPARK-27397.

Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Phase 1 for Kubernetes support in Spark. Only supports static resource allocation in cluster mode.
Some changes were made to the original fundamental approach of foxish#7, in particular how the REST server is built. Now, the REST server uses the existing submission REST server infrastructure as its base instead of creating new Jetty-based code from scratch. Additionally, uploading local dependencies now uses a separate field, in order to distinguish jars local to the docker image (specified via "spark.jars") from jars uploaded from the client machine. The appropriate APIs to expose these configuration knobs are open to discussion, especially since the user's main resource is currently uploaded indiscriminately, but one could foresee the user wanting to specify their main resource as a file local to the Docker image's disk.
Also, the client arguments have changed to mostly use Spark properties. As with the YARN support, common configuration points should be settable via arguments to spark-submit but translated into properties in SparkConf.
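For example, a client-side option might be translated into SparkConf properties before the Kubernetes client code reads them; the spark.kubernetes.namespace key below is illustrative only, not necessarily a property this PR defines:

```scala
import org.apache.spark.SparkConf

// Illustrative only: the kubernetes-specific property name is an assumption,
// mirroring how YARN-specific submit arguments become spark.yarn.* properties.
val conf = new SparkConf()
  .setMaster("k8s://https://apiserver.example.com:6443")
  .set("spark.kubernetes.namespace", "spark-test1") // hypothetical property name
  .set("spark.executor.instances", "2")
```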