
Commit eabe4b9

Merge pull request #1 from ifilonenko/py-spark
Initial architecture for PySpark w/o dependency management

2 parents: 73f2853 + dc670dc

25 files changed: +526 -79 lines

bin/docker-image-tool.sh

Lines changed: 17 additions & 6 deletions

@@ -63,12 +63,20 @@ function build {
   if [ ! -d "$IMG_PATH" ]; then
     error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
   fi
-
-  local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local BINDING_BUILD_ARGS=(
+    --build-arg
+    base_img=$(image_ref spark)
+  )
+  local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
+  local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}
 
   docker build "${BUILD_ARGS[@]}" \
     -t $(image_ref spark) \
-    -f "$DOCKERFILE" .
+    -f "$BASEDOCKERFILE" .
+
+  docker build "${BINDING_BUILD_ARGS[@]}" \
+    -t $(image_ref spark-py) \
+    -f "$PYDOCKERFILE" .
 }
 
 function push {
@@ -86,7 +94,8 @@ Commands:
   push        Push a pre-built image to a registry. Requires a repository address to be provided.
 
 Options:
-  -f file     Dockerfile to build. By default builds the Dockerfile shipped with Spark.
+  -f file     Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark.
+  -p file     Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark.
   -r repo     Repository address.
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi
 
 REPO=
 TAG=
-DOCKERFILE=
+BASEDOCKERFILE=
+PYDOCKERFILE=
 while getopts f:mr:t: option
 do
  case "${option}"
  in
- f) DOCKERFILE=${OPTARG};;
+ f) BASEDOCKERFILE=${OPTARG};;
+ p) PYDOCKERFILE=${OPTARG};;
  r) REPO=${OPTARG};;
  t) TAG=${OPTARG};;
  m)
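
For reference, an invocation under the new flags might look like the following (repository name and tag are placeholders; note also that the getopts string above is still f:mr:t:, so the new -p option appears to require the option string to be extended before it is actually parsed):

  # Illustrative only: build the JVM base image plus the Python-bound spark-py image
  # from a Spark distribution, then push both to the given repository.
  ./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.4.0 build
  ./bin/docker-image-tool.sh -r docker.io/myrepo -t v2.4.0 push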

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 10 additions & 4 deletions

@@ -284,8 +284,6 @@ private[spark] class SparkSubmit extends Logging {
       case (STANDALONE, CLUSTER) if args.isR =>
         error("Cluster deploy mode is currently not supported for R " +
           "applications on standalone clusters.")
-      case (KUBERNETES, _) if args.isPython =>
-        error("Python applications are currently not supported for Kubernetes.")
       case (KUBERNETES, _) if args.isR =>
         error("R applications are currently not supported for Kubernetes.")
       case (LOCAL, CLUSTER) =>
@@ -695,9 +693,17 @@ private[spark] class SparkSubmit extends Logging {
     if (isKubernetesCluster) {
      childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
-        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+        if (args.isPython) {
+          childArgs ++= Array("--primary-py-file", args.primaryResource)
+          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+          if (args.pyFiles != null) {
+            childArgs ++= Array("--other-py-files", args.pyFiles)
+          }
+        } else {
+          childArgs ++= Array("--primary-java-resource", args.primaryResource)
+          childArgs ++= Array("--main-class", args.mainClass)
+        }
      }
-      childArgs ++= Array("--main-class", args.mainClass)
      if (args.childArgs != null) {
        args.childArgs.foreach { arg =>
          childArgs += ("--arg", arg)
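
With the error case removed and the child arguments split by resource type, a cluster-mode PySpark submission becomes possible. A sketch of such a submission follows (master URL, image name, and the --py-files entry are placeholders, not files shipped with Spark):

  # Illustrative only: run a PySpark application on Kubernetes in cluster mode.
  bin/spark-submit \
    --master k8s://https://<api-server-host>:6443 \
    --deploy-mode cluster \
    --name pyspark-pi \
    --conf spark.kubernetes.container.image=docker.io/myrepo/spark-py:v2.4.0 \
    --py-files local:///opt/spark/examples/src/main/python/helper.py \
    local:///opt/spark/examples/src/main/python/pi.py 100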

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 29 additions & 0 deletions

@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("spark")
 
+  val KUBERNETES_PYSPARK_PY_FILES =
+    ConfigBuilder("spark.kubernetes.python.pyFiles")
+      .doc("The PyFiles that are distributed via client arguments")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
+    ConfigBuilder("spark.kubernetes.python.mainAppResource")
+      .doc("The main app resource for pyspark jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_PYSPARK_APP_ARGS =
+    ConfigBuilder("spark.kubernetes.python.appArgs")
+      .doc("The app arguments for PySpark Jobs")
+      .internal()
+      .stringConf
+      .createOptional
+
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
@@ -154,6 +176,13 @@ private[spark] object Config extends Logging {
       .checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
       .createWithDefaultString("1s")
 
+  val MEMORY_OVERHEAD_FACTOR =
+    ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
+      .doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
+        "which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
+      .doubleConf
+      .createWithDefault(0.10)
+
   val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
     "spark.kubernetes.authenticate.submission"
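
Of these, only spark.kubernetes.memoryOverheadFactor is meant to be set by users; the three spark.kubernetes.python.* entries are marked internal() and are populated by the submission client from the primary resource, --py-files, and the application arguments. Overriding the factor from the command line would look roughly like this (all values are placeholders; note that for Python resources the KubernetesConf change further down sets the factor to 0.4 unconditionally, so as of this commit the override is only effective for JVM jobs):

  # Illustrative only: raise the overhead factor for a JVM job with heavy off-heap usage.
  bin/spark-submit \
    --master k8s://https://<api-server-host>:6443 \
    --deploy-mode cluster \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.kubernetes.memoryOverheadFactor=0.25 \
    --conf spark.kubernetes.container.image=docker.io/myrepo/spark:v2.4.0 \
    local:///opt/spark/examples/jars/spark-examples.jar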

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 5 additions & 1 deletion

@@ -71,9 +71,13 @@ private[spark] object Constants {
   val SPARK_CONF_FILE_NAME = "spark.properties"
   val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
 
+  // BINDINGS
+  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
+  val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
+
   // Miscellaneous
   val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
   val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
-  val MEMORY_OVERHEAD_FACTOR = 0.10
   val MEMORY_OVERHEAD_MIN_MIB = 384L
 }
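
The Python Dockerfile and the container entrypoint that consume these environment variables are among the 25 changed files but are not shown in this excerpt. Purely as a hypothetical sketch of the intended contract (not the entrypoint.sh shipped in this commit), a driver-py branch of an entrypoint could look like:

  # Hypothetical sketch only, assuming PYSPARK_PRIMARY, PYSPARK_FILES and PYSPARK_APP_ARGS
  # are exported into the driver container by a bindings step elsewhere in this commit.
  case "$1" in
    driver-py)
      shift
      # Make the distributed .py files importable, then run the primary resource
      # through spark-submit in client mode inside the driver pod.
      if [ -n "$PYSPARK_FILES" ]; then
        export PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
      fi
      exec "$SPARK_HOME/bin/spark-submit" --deploy-mode client "$@" \
        "$PYSPARK_PRIMARY" $PYSPARK_APP_ARGS
      ;;
  esac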

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 44 additions & 17 deletions

@@ -16,14 +16,17 @@
  */
 package org.apache.spark.deploy.k8s
 
+import scala.collection.mutable
+
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config.ConfigEntry
 
+
 private[spark] sealed trait KubernetesRoleSpecificConf
 
 /*
@@ -54,7 +57,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     roleLabels: Map[String, String],
     roleAnnotations: Map[String, String],
     roleSecretNamesToMountPaths: Map[String, String],
-    roleEnvs: Map[String, String]) {
+    roleEnvs: Map[String, String],
+    sparkFiles: Seq[String]) {
 
   def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
 
@@ -63,10 +67,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
     .map(str => str.split(",").toSeq)
     .getOrElse(Seq.empty[String])
 
-  def sparkFiles(): Seq[String] = sparkConf
-    .getOption("spark.files")
-    .map(str => str.split(",").toSeq)
-    .getOrElse(Seq.empty[String])
+  def pyFiles(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_PY_FILES)
+
+  def pySparkMainResource(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)
+
+  def pySparkAppArgs(): Option[String] = sparkConf
+    .get(KUBERNETES_PYSPARK_APP_ARGS)
 
   def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
 
@@ -101,17 +109,29 @@ private[spark] object KubernetesConf {
       appId: String,
       mainAppResource: Option[MainAppResource],
       mainClass: String,
-      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
+      appArgs: Array[String],
+      maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
     val sparkConfWithMainAppJar = sparkConf.clone()
+    val additionalFiles = mutable.ArrayBuffer.empty[String]
     mainAppResource.foreach {
-      case JavaMainAppResource(res) =>
-        val previousJars = sparkConf
-          .getOption("spark.jars")
-          .map(_.split(","))
-          .getOrElse(Array.empty)
-        if (!previousJars.contains(res)) {
-          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
-        }
+        case JavaMainAppResource(res) =>
+          val previousJars = sparkConf
+            .getOption("spark.jars")
+            .map(_.split(","))
+            .getOrElse(Array.empty)
+          if (!previousJars.contains(res)) {
+            sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
+          }
+        case nonJVM: NonJVMResource =>
+          nonJVM match {
+            case PythonMainAppResource(res) =>
+              additionalFiles += res
+              maybePyFiles.foreach{maybePyFiles =>
+                additionalFiles.appendAll(maybePyFiles.split(","))}
+              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
+              sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
+          }
+          sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
     }
 
     val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -132,6 +152,11 @@ private[spark] object KubernetesConf {
     val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
       sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
 
+    val sparkFiles = sparkConf
+      .getOption("spark.files")
+      .map(str => str.split(",").toSeq)
+      .getOrElse(Seq.empty[String]) ++ additionalFiles
+
     KubernetesConf(
       sparkConfWithMainAppJar,
       KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -140,7 +165,8 @@ private[spark] object KubernetesConf {
       driverLabels,
       driverAnnotations,
       driverSecretNamesToMountPaths,
-      driverEnvs)
+      driverEnvs,
+      sparkFiles)
   }
 
   def createExecutorConf(
@@ -179,6 +205,7 @@ private[spark] object KubernetesConf {
       executorLabels,
       executorAnnotations,
       executorSecrets,
-      executorEnv)
+      executorEnv,
+      Seq.empty[String])
   }
 }
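
To make the new data flow concrete, here is roughly what the driver-side KubernetesConf ends up carrying for a hypothetical PySpark submission, assuming the submission client forwards --other-py-files into maybePyFiles (all names and values are illustrative):

  # Hypothetical submission:
  #   bin/spark-submit ... --py-files local:///opt/app/deps.py local:///opt/app/main.py 100
  # Per the NonJVMResource branch above, the cloned SparkConf then holds:
  #   spark.kubernetes.python.mainAppResource = local:///opt/app/main.py
  #   spark.kubernetes.python.appArgs         = 100
  #   spark.kubernetes.memoryOverheadFactor   = 0.4  (set unconditionally, replacing any user value)
  # and sparkFiles = spark.files ++ Seq(local:///opt/app/main.py, local:///opt/app/deps.py)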

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
     }
   }
 
-  private def resolveFileUri(uri: String): String = {
+  def resolveFileUri(uri: String): String = {
     val fileUri = Utils.resolveURI(uri)
     val fileScheme = Option(fileUri.getScheme).getOrElse("file")
     fileScheme match {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 22 additions & 9 deletions

@@ -25,6 +25,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 
@@ -44,11 +45,16 @@ private[spark] class BasicDriverFeatureStep(
   private val driverCpuCores = conf.get("spark.driver.cores", "1")
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
 
+  private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map {
+    case JavaMainAppResource(_) => "driver"
+    case PythonMainAppResource(_) => "driver-py"
+  }.getOrElse(throw new SparkException("Must specify a JVM or Python Resource"))
   // Memory settings
   private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
@@ -71,7 +77,7 @@ private[spark] class BasicDriverFeatureStep(
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
-    val driverContainer = new ContainerBuilder(pod.container)
+    val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container)
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverContainerImage)
       .withImagePullPolicy(conf.imagePullPolicy())
@@ -88,15 +94,22 @@ private[spark] class BasicDriverFeatureStep(
         .addToRequests("memory", driverMemoryQuantity)
         .addToLimits("memory", driverMemoryQuantity)
         .endResources()
-      .addToArgs("driver")
+      .addToArgs(driverDockerContainer)
       .addToArgs("--properties-file", SPARK_CONF_PATH)
       .addToArgs("--class", conf.roleSpecificConf.mainClass)
-      // The user application jar is merged into the spark.jars list and managed through that
-      // property, so there is no need to reference it explicitly here.
-      .addToArgs(SparkLauncher.NO_RESOURCE)
-      .addToArgs(conf.roleSpecificConf.appArgs: _*)
-      .build()
 
+    val driverContainer =
+      if (driverDockerContainer == "driver-py") {
+        withoutArgsDriverContainer
+          .build()
+      } else {
+        // The user application jar is merged into the spark.jars list and managed through that
+        // property, so there is no need to reference it explicitly here.
+        withoutArgsDriverContainer
+          .addToArgs(SparkLauncher.NO_RESOURCE)
+          .addToArgs(conf.roleSpecificConf.appArgs: _*)
+          .build()
+      }
     val driverPod = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(driverPodName)
@@ -122,7 +135,7 @@ private[spark] class BasicDriverFeatureStep(
     val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
      conf.sparkJars())
     val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkFiles())
+      conf.sparkFiles)
     if (resolvedSparkJars.nonEmpty) {
      additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
     }
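
In terms of the container command line this produces, a JVM application still receives the no-resource placeholder and its application arguments as container args, while a Python application only gets the class and properties file; the primary .py file and its arguments travel through the spark.kubernetes.python.* properties instead. Roughly (assuming SparkLauncher.NO_RESOURCE is the spark-internal placeholder and SPARK_CONF_PATH resolves to /opt/spark/conf/spark.properties):

  # Illustrative container args only:
  #   JVM app:     driver --properties-file /opt/spark/conf/spark.properties \
  #                  --class <mainClass> spark-internal <appArgs...>
  #   PySpark app: driver-py --properties-file /opt/spark/conf/spark.properties \
  #                  --class org.apache.spark.deploy.PythonRunner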

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 1 addition & 1 deletion

@@ -54,7 +54,7 @@ private[spark] class BasicExecutorFeatureStep(
 
   private val memoryOverheadMiB = kubernetesConf
     .get(EXECUTOR_MEMORY_OVERHEAD)
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+    .getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
      MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
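
As a worked example of the now-configurable factor: with spark.executor.memory=2g (2048 MiB) and the 0.4 factor the submission client applies to Python jobs, the overhead is max((0.4 * 2048).toInt, 384) = 819 MiB, so the executor pod requests about 2867 MiB of memory; with the JVM default of 0.1 the overhead falls back to the 384 MiB floor, for roughly 2432 MiB.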
