[SPARK-22839] [K8s] Refactor to unify driver and executor pod builder APIs #20910


Closed · wants to merge 31 commits
Changes from all commits · 31 commits
f1b8c08
Fundamental building blocks for the new pod construction architecture.
mccheah Mar 26, 2018
80e1562
Intermediate commit to move file.
mccheah Mar 26, 2018
c3460ae
Move basic driver configuration to new architecture.
mccheah Mar 26, 2018
6d1711b
Intermediate commit to move file
mccheah Mar 26, 2018
4036d72
Migrate mounting K8s credentials to the new architecture.
mccheah Mar 26, 2018
d46d671
Intermediate commit to move file.
mccheah Mar 26, 2018
2936aa5
Migrate creating the driver service to the new architecture.
mccheah Mar 26, 2018
d2751b6
Remove dependency resolution step entirely.
mccheah Mar 26, 2018
430fbb2
Move mounting driver secrets to new architecture.
mccheah Mar 26, 2018
fd3e8e6
Complete driver migration to new pod construction architecture.
mccheah Mar 26, 2018
f0ea6d9
Intermediate commit to move file
mccheah Mar 26, 2018
67e9ca1
Migrate executor pod construction to use the new architecture.
mccheah Mar 26, 2018
4c944c4
Manage args differently.
mccheah Mar 27, 2018
27b8634
Revert "Manage args differently."
mccheah Mar 27, 2018
9c67016
Make envs role-specific
mccheah Mar 27, 2018
f3540f8
Address comments.
mccheah Mar 27, 2018
33f9d56
Move executor env to KubernetesExecutorSpecificConf
mccheah Mar 27, 2018
9b6cc05
Fix import
mccheah Mar 27, 2018
a5f08bb
Merge remote-tracking branch 'apache/master' into spark-22839-increme…
mccheah Apr 3, 2018
fbde25d
Merge remote-tracking branch 'master' into spark-22839-incremental
mccheah Apr 3, 2018
dff0089
Fix indentation
mccheah Apr 3, 2018
02bbcbc
Fix closures
mccheah Apr 3, 2018
7d65875
Fix compilation
mccheah Apr 3, 2018
041a240
Address comments.
mccheah Apr 4, 2018
f446868
Merge remote-tracking branch 'apache/master' into spark-22839-increme…
mccheah Apr 4, 2018
df75a9c
Fix merge conflicts.
mccheah Apr 4, 2018
518fb2a
Move around some code.
mccheah Apr 4, 2018
7b339c3
Fix line breaks
mccheah Apr 4, 2018
dbe35fa
Simplify a line
mccheah Apr 4, 2018
4b92989
Rename KubernetesSpec -> KubernetesDriverSpec
mccheah Apr 11, 2018
7807c9c
Fix scalastyle
mccheah Apr 12, 2018
@@ -167,5 +167,5 @@ private[spark] object Config extends Logging {
   val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
   val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
 
-  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+  val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
 }
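
The rename from KUBERNETES_DRIVER_ENV_KEY to KUBERNETES_DRIVER_ENV_PREFIX reflects how the constant is actually used: it is a key prefix that gets stripped off to produce environment-variable names. A minimal sketch of that lookup using SparkConf.getAllWithPrefix (the same call that parsePrefixedKeyValuePairs wraps later in this diff); the conf values here are hypothetical:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.driverEnv.JAVA_OPTS", "-Xmx2g") // hypothetical values
  .set("spark.kubernetes.driverEnv.LOG_LEVEL", "INFO")

// Strip the prefix to recover the environment-variable names:
val driverEnvs: Map[String, String] =
  conf.getAllWithPrefix("spark.kubernetes.driverEnv.").toMap
// driverEnvs == Map("JAVA_OPTS" -> "-Xmx2g", "LOG_LEVEL" -> "INFO")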
@@ -0,0 +1,184 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.internal.config.ConfigEntry

private[spark] sealed trait KubernetesRoleSpecificConf

/*
 * Structure containing metadata for Kubernetes logic that builds a Spark driver.
 */
private[spark] case class KubernetesDriverSpecificConf(
    mainAppResource: Option[MainAppResource],
    mainClass: String,
    appName: String,
    appArgs: Seq[String]) extends KubernetesRoleSpecificConf

/*
 * Structure containing metadata for Kubernetes logic that builds a Spark executor.
 */
private[spark] case class KubernetesExecutorSpecificConf(
    executorId: String,
    driverPod: Pod)
  extends KubernetesRoleSpecificConf

/**
 * Structure containing metadata for Kubernetes logic to build Spark pods.
 */
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
    sparkConf: SparkConf,
    roleSpecificConf: T,
    appResourceNamePrefix: String,
    appId: String,
    roleLabels: Map[String, String],
    roleAnnotations: Map[String, String],
    roleSecretNamesToMountPaths: Map[String, String],
    roleEnvs: Map[String, String]) {

  def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

  def sparkJars(): Seq[String] = sparkConf
    .getOption("spark.jars")
    .map(str => str.split(",").toSeq)
    .getOrElse(Seq.empty[String])

  def sparkFiles(): Seq[String] = sparkConf
    .getOption("spark.files")
    .map(str => str.split(",").toSeq)
    .getOrElse(Seq.empty[String])

  def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)

  def imagePullSecrets(): Seq[LocalObjectReference] = {
    sparkConf
      .get(IMAGE_PULL_SECRETS)
      .map(_.split(","))
      .getOrElse(Array.empty[String])
      .map(_.trim)
      .map { secret =>
        new LocalObjectReferenceBuilder().withName(secret).build()
      }
  }

  def nodeSelector(): Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

  def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

  def get(conf: String): String = sparkConf.get(conf)

  def get(conf: String, defaultValue: String): String = sparkConf.get(conf, defaultValue)

  def getOption(key: String): Option[String] = sparkConf.getOption(key)
}

private[spark] object KubernetesConf {
  def createDriverConf(
      sparkConf: SparkConf,
      appName: String,
      appResourceNamePrefix: String,
      appId: String,
      mainAppResource: Option[MainAppResource],
      mainClass: String,
      appArgs: Array[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
    val sparkConfWithMainAppJar = sparkConf.clone()
    mainAppResource.foreach {
      case JavaMainAppResource(res) =>
        val previousJars = sparkConf
          .getOption("spark.jars")
          .map(_.split(","))
          .getOrElse(Array.empty)
        if (!previousJars.contains(res)) {
          sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res))
        }
    }

    val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
      "operations.")
    val driverLabels = driverCustomLabels ++ Map(
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
    val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
    val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
    val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

    KubernetesConf(
      sparkConfWithMainAppJar,
      KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
      appResourceNamePrefix,
      appId,
      driverLabels,
      driverAnnotations,
      driverSecretNamesToMountPaths,
      driverEnvs)
  }

  def createExecutorConf(
      sparkConf: SparkConf,
      executorId: String,
      appId: String,
      driverPod: Pod): KubernetesConf[KubernetesExecutorSpecificConf] = {
    val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
    require(
      !executorCustomLabels.contains(SPARK_APP_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
    require(
      !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
      s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
        " Spark.")
    require(
      !executorCustomLabels.contains(SPARK_ROLE_LABEL),
      s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
    val executorLabels = Map(
      SPARK_EXECUTOR_ID_LABEL -> executorId,
      SPARK_APP_ID_LABEL -> appId,
      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
      executorCustomLabels
    val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
    val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
      sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
    val executorEnv = sparkConf.getExecutorEnv.toMap

    KubernetesConf(
      sparkConf.clone(),
      KubernetesExecutorSpecificConf(executorId, driverPod),
      sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX),
      appId,
      executorLabels,
      executorAnnotations,
      executorSecrets,
      executorEnv)
  }
}
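
For orientation, a sketch of how the factory method above might be invoked; the argument values are hypothetical, and the point is that driver and executor both end up with the same KubernetesConf[T] shape, differing only in the role-specific payload:

val driverConf: KubernetesConf[KubernetesDriverSpecificConf] =
  KubernetesConf.createDriverConf(
    new SparkConf(),
    appName = "example-app",                 // hypothetical
    appResourceNamePrefix = "example-app-prefix",
    appId = "spark-app-0001",
    mainAppResource = Some(JavaMainAppResource("local:///opt/spark/examples.jar")),
    mainClass = "org.example.Main",
    appArgs = Array("--input", "/data"))

// The type parameter makes role-specific fields statically accessible:
val mainClass: String = driverConf.roleSpecificConf.mainClass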
@@ -16,21 +16,16 @@
  */
 package org.apache.spark.deploy.k8s
 
-import io.fabric8.kubernetes.api.model.LocalObjectReference
+import io.fabric8.kubernetes.api.model.HasMetadata
 
-import org.apache.spark.SparkFunSuite
-
-class KubernetesUtilsTest extends SparkFunSuite {
-
-  test("testParseImagePullSecrets") {
-    val noSecrets = KubernetesUtils.parseImagePullSecrets(None)
-    assert(noSecrets === Nil)
-
-    val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret"))
-    assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil)
-
-    val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2 , s3,s4"))
-    assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil)
-  }
-}
+private[spark] case class KubernetesDriverSpec(
+    pod: SparkPod,
+    driverKubernetesResources: Seq[HasMetadata],
+    systemProperties: Map[String, String])
+
+private[spark] object KubernetesDriverSpec {
+  def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
+    SparkPod.initialPod(),
+    Seq.empty,
+    initialProps)
+}
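
A sketch of the flow this enables (the step sequence is hypothetical): construction starts from initialSpec, and each builder step returns an updated copy, accumulating extra Kubernetes resources and system properties alongside the pod:

// `steps` is a hypothetical sequence of KubernetesDriverSpec => KubernetesDriverSpec functions.
def buildDriverSpec(
    steps: Seq[KubernetesDriverSpec => KubernetesDriverSpec],
    initialProps: Map[String, String]): KubernetesDriverSpec = {
  steps.foldLeft(KubernetesDriverSpec.initialSpec(initialProps)) {
    (spec, step) => step(spec)
  }
}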
@@ -37,17 +37,6 @@ private[spark] object KubernetesUtils {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
-  /**
-   * Parses comma-separated list of imagePullSecrets into K8s-understandable format
-   */
-  def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
-    imagePullSecrets match {
-      case Some(secretsCommaSeparated) =>
-        secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
-      case None => Nil
-    }
-  }
-
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
   }
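
With the helper removed, the same comma-separated parsing now lives on KubernetesConf.imagePullSecrets() (added earlier in this diff). A before/after sketch, where kubernetesConf is a hypothetical KubernetesConf instance:

// Before (removed above):
//   val secrets = KubernetesUtils.parseImagePullSecrets(sparkConf.get(IMAGE_PULL_SECRETS))
// After: callers read the same comma-separated list through the unified conf:
val secrets: Seq[LocalObjectReference] = kubernetesConf.imagePullSecrets()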

This file was deleted.

@@ -14,17 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.k8s.submit.steps
+package org.apache.spark.deploy.k8s
 
-import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}
 
-/**
- * Represents a step in configuring the Spark driver pod.
- */
-private[spark] trait DriverConfigurationStep {
+private[spark] case class SparkPod(pod: Pod, container: Container)
 
-  /**
-   * Apply some transformation to the previous state of the driver to add a new feature to it.
-   */
-  def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
+private[spark] object SparkPod {
+  def initialPod(): SparkPod = {
+    SparkPod(
+      new PodBuilder()
+        .withNewMetadata()
+        .endMetadata()
+        .withNewSpec()
+        .endSpec()
+        .build(),
+      new ContainerBuilder().build())
+  }
 }
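
SparkPod is the unit both driver and executor builders now transform. A sketch of the style it enables (the helper name and label key are hypothetical): each feature takes the pod built so far and returns an augmented copy, so the two roles can share building blocks:

import io.fabric8.kubernetes.api.model.PodBuilder

// Hypothetical feature: attach an app-id label to whatever pod was built so far.
def withAppIdLabel(sparkPod: SparkPod, appId: String): SparkPod = {
  val labeledPod = new PodBuilder(sparkPod.pod)
    .editOrNewMetadata()
      .addToLabels("spark-app-selector", appId)
      .endMetadata()
    .build()
  SparkPod(labeledPod, sparkPod.container)
}

// Usage: thread the initial empty pod through one or more such transformations.
val configured = withAppIdLabel(SparkPod.initialPod(), "spark-application-0001")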