Initial architecture for PySpark w/o dependency management #1

Merged: 4 commits, Apr 18, 2018

23 changes: 17 additions & 6 deletions bin/docker-image-tool.sh
@@ -63,12 +63,20 @@ function build {
if [ ! -d "$IMG_PATH" ]; then
error "Cannot find docker image. This script must be run from a runnable distribution of Apache Spark."
fi

local DOCKERFILE=${DOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local BINDING_BUILD_ARGS=(
--build-arg
base_img=$(image_ref spark)
)
local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"}
local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"}

docker build "${BUILD_ARGS[@]}" \
-t $(image_ref spark) \
-f "$DOCKERFILE" .
-f "$BASEDOCKERFILE" .

docker build "${BINDING_BUILD_ARGS[@]}" \
-t $(image_ref spark-py) \
-f "$PYDOCKERFILE" .
}

function push {
@@ -86,7 +94,8 @@ Commands:
push Push a pre-built image to a registry. Requires a repository address to be provided.

Options:
-f file Dockerfile to build. By default builds the Dockerfile shipped with Spark.
-f file Dockerfile to build for JVM-based jobs. By default builds the Dockerfile shipped with Spark.
-p file Dockerfile to build for PySpark jobs, with Python baked in. By default builds the Python Dockerfile shipped with Spark.
-r repo Repository address.
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
@@ -116,12 +125,14 @@ fi

REPO=
TAG=
DOCKERFILE=
BASEDOCKERFILE=
PYDOCKERFILE=
while getopts f:p:mr:t: option
do
case "${option}"
in
f) DOCKERFILE=${OPTARG};;
f) BASEDOCKERFILE=${OPTARG};;
p) PYDOCKERFILE=${OPTARG};;
r) REPO=${OPTARG};;
t) TAG=${OPTARG};;
m)
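
For orientation, a minimal usage sketch of the updated script follows. The repository address, tag, and custom Dockerfile path are placeholders rather than values taken from this PR; with no -f/-p overrides the script builds the Dockerfiles shipped with the Spark distribution, tagging the JVM image as spark and the Python-enabled image as spark-py.

# Hypothetical invocation; repository address and tag are placeholders.
./bin/docker-image-tool.sh -r docker.example.com/spark -t v2.4.0 build
# A custom Python Dockerfile can be supplied through the new -p flag.
./bin/docker-image-tool.sh -r docker.example.com/spark -t v2.4.0 \
  -p /path/to/custom/python/Dockerfile build
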
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -284,8 +284,6 @@ private[spark] class SparkSubmit extends Logging {
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
error("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
error("R applications are currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
@@ -695,9 +693,17 @@ private[spark] class SparkSubmit extends Logging {
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
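
With the Kubernetes restriction on Python applications lifted above, a PySpark job can now be submitted in cluster mode. The sketch below is illustrative only: the master URL, image name, and file paths are placeholders. Per this change in SparkSubmit, the primary .py resource is forwarded to the driver as --primary-py-file, --py-files entries travel as --other-py-files, and the driver main class becomes org.apache.spark.deploy.PythonRunner.

# Hypothetical PySpark submission against Kubernetes; all values are placeholders.
bin/spark-submit \
  --master k8s://https://kubernetes.example.com:6443 \
  --deploy-mode cluster \
  --name pyspark-pi \
  --conf spark.kubernetes.container.image=docker.example.com/spark/spark-py:v2.4.0 \
  --py-files local:///opt/spark/python-deps/helpers.py \
  local:///opt/spark/examples/src/main/python/pi.py 100
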
@@ -117,6 +117,28 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")

val KUBERNETES_PYSPARK_PY_FILES =
ConfigBuilder("spark.kubernetes.python.pyFiles")
.doc("The PyFiles that are distributed via client arguments")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
ConfigBuilder("spark.kubernetes.python.mainAppResource")
.doc("The main app resource for pyspark jobs")
.internal()
.stringConf
.createOptional

val KUBERNETES_PYSPARK_APP_ARGS =
ConfigBuilder("spark.kubernetes.python.appArgs")
.doc("The app arguments for PySpark Jobs")
.internal()
.stringConf
.createOptional


val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down Expand Up @@ -154,6 +176,13 @@ private[spark] object Config extends Logging {
.checkValue(interval => interval > 0, s"Logging interval must be a positive time value.")
.createWithDefaultString("1s")

val MEMORY_OVERHEAD_FACTOR =
ConfigBuilder("spark.kubernetes.memoryOverheadFactor")
.doc("This sets the Memory Overhead Factor that will allocate memory to non-JVM jobs " +
"which in the case of JVM tasks will default to 0.10 and 0.40 for non-JVM jobs")
.doubleConf
.createWithDefault(0.10)

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

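
To make the new factor concrete, here is a worked sketch of how pod memory is sized. The 4096 MiB driver memory is an illustrative value, and the awk arithmetic simply mirrors the max(factor * memory, 384 MiB minimum) computation in the feature steps later in this diff.

# Illustrative only: mirrors max(overheadFactor * memoryMiB, 384) from the feature steps.
DRIVER_MEMORY_MIB=4096
OVERHEAD_FACTOR=0.40   # JVM jobs default to 0.10; non-JVM (PySpark) jobs are set to 0.40
MIN_OVERHEAD_MIB=384
OVERHEAD_MIB=$(awk -v f="$OVERHEAD_FACTOR" -v m="$DRIVER_MEMORY_MIB" -v min="$MIN_OVERHEAD_MIB" \
  'BEGIN { o = int(f * m); if (o < min) o = min; print o }')
echo "pod memory request: $((DRIVER_MEMORY_MIB + OVERHEAD_MIB)) MiB"   # 4096 + 1638 = 5734 MiB
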
@@ -71,9 +71,13 @@ private[spark] object Constants {
val SPARK_CONF_FILE_NAME = "spark.properties"
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"

// BINDINGS
val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
@@ -16,14 +16,17 @@
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config.ConfigEntry


private[spark] sealed trait KubernetesRoleSpecificConf

/*
@@ -54,7 +57,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleEnvs: Map[String, String]) {
roleEnvs: Map[String, String],
sparkFiles: Seq[String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -63,10 +67,14 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])

def sparkFiles(): Seq[String] = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String])
def pyFiles(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_PY_FILES)

def pySparkMainResource(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE)

def pySparkAppArgs(): Option[String] = sparkConf
.get(KUBERNETES_PYSPARK_APP_ARGS)

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

@@ -101,17 +109,29 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
case JavaMainAppResource(res) =>
val previousJars = sparkConf
.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty)
if (!previousJars.contains(res)) {
sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
}
case nonJVM: NonJVMResource =>
nonJVM match {
case PythonMainAppResource(res) =>
additionalFiles += res
maybePyFiles.foreach { pyFileList =>
additionalFiles.appendAll(pyFileList.split(","))
}
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_APP_ARGS, appArgs.mkString(" "))
}
sparkConfWithMainAppJar.set(MEMORY_OVERHEAD_FACTOR, 0.4)
}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
@@ -132,6 +152,11 @@ private[spark] object KubernetesConf {
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

val sparkFiles = sparkConf
.getOption("spark.files")
.map(str => str.split(",").toSeq)
.getOrElse(Seq.empty[String]) ++ additionalFiles

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
@@ -140,7 +165,8 @@ private[spark] object KubernetesConf {
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverEnvs)
driverEnvs,
sparkFiles)
}

def createExecutorConf(
@@ -179,6 +205,7 @@ private[spark] object KubernetesConf {
executorLabels,
executorAnnotations,
executorSecrets,
executorEnv)
executorEnv,
Seq.empty[String])
}
}
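
To summarize the effect of createDriverConf for a Python primary resource: the primary .py file and any --py-files are appended to spark.files, and the internal properties added to Config above carry the resource, arguments, and raised overhead factor down to the driver. The property values below are placeholders, not output captured from this PR.

# Illustrative driver-side properties for a PySpark submission (placeholder values):
#   spark.kubernetes.python.mainAppResource=local:///opt/spark/examples/src/main/python/pi.py
#   spark.kubernetes.python.appArgs=100
#   spark.kubernetes.python.pyFiles=local:///opt/spark/python-deps/helpers.py
#   spark.kubernetes.memoryOverheadFactor=0.4
#   spark.files=<existing spark.files>,<primary .py file>,<--py-files entries>
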
@@ -52,7 +52,7 @@ private[spark] object KubernetesUtils {
}
}

private def resolveFileUri(uri: String): String = {
def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
@@ -25,6 +25,7 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher

@@ -44,11 +45,16 @@ private[spark] class BasicDriverFeatureStep(
private val driverCpuCores = conf.get("spark.driver.cores", "1")
private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)

private val driverDockerContainer = conf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) => "driver"
case PythonMainAppResource(_) => "driver-py"
}.getOrElse(throw new SparkException("Must specify a JVM or Python Resource"))
// Memory settings
private val driverMemoryMiB = conf.get(DRIVER_MEMORY)
private val memoryOverheadMiB = conf
.get(DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
.getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB

override def configurePod(pod: SparkPod): SparkPod = {
@@ -71,7 +77,7 @@
("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
}

val driverContainer = new ContainerBuilder(pod.container)
val withoutArgsDriverContainer: ContainerBuilder = new ContainerBuilder(pod.container)
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
@@ -88,15 +94,22 @@
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryQuantity)
.endResources()
.addToArgs("driver")
.addToArgs(driverDockerContainer)
.addToArgs("--properties-file", SPARK_CONF_PATH)
.addToArgs("--class", conf.roleSpecificConf.mainClass)
// The user application jar is merged into the spark.jars list and managed through that
// property, so there is no need to reference it explicitly here.
.addToArgs(SparkLauncher.NO_RESOURCE)
.addToArgs(conf.roleSpecificConf.appArgs: _*)
.build()

val driverContainer =
if (driverDockerContainer == "driver-py") {
withoutArgsDriverContainer
.build()
} else {
// The user application jar is merged into the spark.jars list and managed through that
// property, so there is no need to reference it explicitly here.
withoutArgsDriverContainer
.addToArgs(SparkLauncher.NO_RESOURCE)
.addToArgs(conf.roleSpecificConf.appArgs: _*)
.build()
}
val driverPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(driverPodName)
@@ -122,7 +135,7 @@ private[spark] class BasicDriverFeatureStep(
val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkJars())
val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
conf.sparkFiles())
conf.sparkFiles)
if (resolvedSparkJars.nonEmpty) {
additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
}
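
The net effect on the driver container is sketched below. The properties-file path is a placeholder; "driver"/"driver-py" and the PythonRunner main class come from this diff, and spark-internal is the value of SparkLauncher.NO_RESOURCE. For the Python binding, the primary resource, extra .py files, and app arguments are presumably delivered through the PYSPARK_* environment variables added in Constants rather than as container args, which is why the driver-py branch stops after --class.

# Illustrative container args assembled by BasicDriverFeatureStep (placeholders in <...>):
#   JVM job:     driver --properties-file <spark.properties path> --class <mainClass> spark-internal <app args...>
#   PySpark job: driver-py --properties-file <spark.properties path> --class org.apache.spark.deploy.PythonRunner
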
@@ -54,7 +54,7 @@ private[spark] class BasicExecutorFeatureStep(

private val memoryOverheadMiB = kubernetesConf
.get(EXECUTOR_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
.getOrElse(math.max((kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
