[SPARK-49938][K8S] Prefer to use Hadoop configMap when specified #48427

Open · wants to merge 1 commit into master
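This patch drops the `KubernetesUtils.requireNandDefined` guard that made `HADOOP_CONF_DIR` and `spark.kubernetes.hadoop.configMapName` (`KUBERNETES_HADOOP_CONF_CONFIG_MAP`) mutually exclusive. When both are supplied, `configurePod` now mounts the user-specified ConfigMap, and `getAdditionalKubernetesResources` no longer generates a fresh ConfigMap from the local conf directory. In effect the feature step resolves its Hadoop configuration source as in the following condensed sketch, which uses the names from the patch but is not the literal class body (the example ConfigMap name is made up):

// Sketch: how the step now chooses its Hadoop configuration source.
// In the real class these come from the pod-build context:
//   confDir         = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
//   existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
val confDir: Option[String] = sys.env.get("HADOOP_CONF_DIR")
val existingConfMap: Option[String] = Some("hadoop-conf") // assumed example name

existingConfMap match {
  case Some(name) =>
    println(s"mount pre-created ConfigMap '$name'; generate no extra resources")
  case None if confDir.isDefined =>
    println(s"build a new ConfigMap from the files under ${confDir.get} and mount it")
  case None =>
    println("no Hadoop configuration: leave the pod unchanged")
}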
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStep.scala

@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
 import com.google.common.io.Files
 import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.util.ArrayImplicits._
@@ -39,12 +39,6 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
   private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
   private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
 
-  KubernetesUtils.requireNandDefined(
-    confDir,
-    existingConfMap,
-    "Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
-    "as the creation of an additional ConfigMap, when one is already specified is extraneous")
-
   private lazy val confFiles: Seq[File] = {
     val dir = new File(confDir.get)
     if (dir.isDirectory) {
@@ -60,7 +54,14 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
 
   override def configurePod(original: SparkPod): SparkPod = {
     original.transform { case pod if hasHadoopConf =>
-      val confVolume = if (confDir.isDefined) {
+      val confVolume = if (existingConfMap.isDefined) {
+        new VolumeBuilder()
+          .withName(HADOOP_CONF_VOLUME)
+          .withNewConfigMap()
+            .withName(existingConfMap.get)
+            .endConfigMap()
+          .build()
+      } else {
         val keyPaths = confFiles.map { file =>
           new KeyToPathBuilder()
             .withKey(file.getName())
@@ -70,16 +71,9 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
         new VolumeBuilder()
           .withName(HADOOP_CONF_VOLUME)
           .withNewConfigMap()
-            .withName(newConfigMapName)
-            .withItems(keyPaths.asJava)
-            .endConfigMap()
-          .build()
-      } else {
-        new VolumeBuilder()
-          .withName(HADOOP_CONF_VOLUME)
-          .withNewConfigMap()
-            .withName(existingConfMap.get)
-            .endConfigMap()
+            .withName(newConfigMapName)
+            .withItems(keyPaths.asJava)
+            .endConfigMap()
           .build()
       }
 
@@ -114,7 +108,7 @@ private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    if (confDir.isDefined) {
+    if (confDir.isDefined && existingConfMap.isEmpty) {
      val fileMap = confFiles.map { file =>
        (file.getName(), Files.asCharSource(file, StandardCharsets.UTF_8).read())
      }.toMap.asJava
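From the user's side, the previously rejected combination is now accepted and the pre-created ConfigMap simply wins. A minimal sketch of a driver configuration that exercises the new path follows; the master URL and ConfigMap name are made-up examples:

import org.apache.spark.SparkConf

// Assumed example values; the ConfigMap "hadoop-conf" must already exist
// in the target namespace, since Spark mounts it as-is.
val sparkConf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443")
  .set("spark.kubernetes.hadoop.configMapName", "hadoop-conf")

// Even if HADOOP_CONF_DIR is also exported in the submission environment,
// the driver pod now mounts "hadoop-conf" instead of failing fast in
// requireNandDefined, and no extra ConfigMap is created from local files.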
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/HadoopConfDriverFeatureStepSuite.scala

@@ -61,6 +61,22 @@ class HadoopConfDriverFeatureStepSuite extends SparkFunSuite {
     assert(hadoopConfMap.getData().keySet().asScala === confFiles)
   }
 
+  test("SPARK-49938: use hadoop config map first when config map and dir are both defined") {
+    val confDir = Utils.createTempDir()
+    val confFiles = Set("core-site.xml", "hdfs-site.xml")
+
+    confFiles.foreach { f =>
+      Files.write("some data", new File(confDir, f), UTF_8)
+    }
+
+    val sparkConf = new SparkConfWithEnv(Map(ENV_HADOOP_CONF_DIR -> confDir.getAbsolutePath()))
+      .set(Config.KUBERNETES_HADOOP_CONF_CONFIG_MAP, "testConfigMap")
+    val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
+    val step = new HadoopConfDriverFeatureStep(conf)
+    checkPod(step.configurePod(SparkPod.initialPod()))
+    assert(step.getAdditionalKubernetesResources().isEmpty)
+  }
+
   private def checkPod(pod: SparkPod): Unit = {
     assert(podHasVolume(pod.pod, HADOOP_CONF_VOLUME))
     assert(containerHasVolume(pod.container, HADOOP_CONF_VOLUME, HADOOP_CONF_DIR_PATH))
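For reference, this is the shape of the volume the new test expects on the driver pod, written out with the fabric8 builder used in the patch; the literal value of `HADOOP_CONF_VOLUME` is quoted from memory of Spark's `Constants` and should be treated as an assumption:

import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder}

// What configurePod should now produce when the ConfigMap is specified:
// a volume referencing "testConfigMap" directly, with no items list
// (the whole ConfigMap is mounted), matching the checkPod assertions.
val expected: Volume = new VolumeBuilder()
  .withName("hadoop-properties") // assumed value of HADOOP_CONF_VOLUME
  .withNewConfigMap()
    .withName("testConfigMap")   // the pre-created ConfigMap from the test
    .endConfigMap()
  .build()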