Skip to content

Commit 114d0de

Browse files
suxingfatexinglwang
authored and committed
[SPARK-25922][K8] Spark Driver/Executor "spark-app-selector" label mismatch
## What changes were proposed in this pull request?
In K8s cluster mode, the algorithm used to generate spark-app-selector/spark.app.id for the Spark driver is different from the one used for the Spark executors. This patch makes sure the Spark driver and executors use the same spark-app-selector/spark.app.id if spark.app.id is set; otherwise the superclass applicationId is used. In K8s client mode, spark-app-selector/spark.app.id for executors will use the superclass applicationId.
## How was this patch tested?
Manually run.
Closes #23322 from suxingfate/SPARK-25922.
Lead-authored-by: suxingfate <suxingfate@163.com>
Co-authored-by: xinglwang <xinglwang@ebay.com>
Signed-off-by: Yinan Li <ynli@google.com>
1 parent 81d377d commit 114d0de

File tree

2 files changed

+28
-14
lines changed

2 files changed

+28
-14
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ package org.apache.spark.scheduler.cluster.k8s
1818

1919
import java.util.concurrent.ExecutorService
2020

21-
import io.fabric8.kubernetes.client.KubernetesClient
2221
import scala.concurrent.{ExecutionContext, Future}
2322

23+
import io.fabric8.kubernetes.client.KubernetesClient
24+
2425
import org.apache.spark.SparkContext
2526
import org.apache.spark.deploy.k8s.Config._
2627
import org.apache.spark.deploy.k8s.Constants._
@@ -39,10 +40,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
3940
lifecycleEventHandler: ExecutorPodsLifecycleManager,
4041
watchEvents: ExecutorPodsWatchSnapshotSource,
4142
pollEvents: ExecutorPodsPollingSnapshotSource)
42-
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
43+
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
4344

44-
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
45-
requestExecutorsService)
45+
private implicit val requestExecutorContext =
46+
ExecutionContext.fromExecutorService(requestExecutorsService)
4647

4748
protected override val minRegisteredRatio =
4849
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
@@ -60,6 +61,17 @@ private[spark] class KubernetesClusterSchedulerBackend(
6061
removeExecutor(executorId, reason)
6162
}
6263

64+
/**
65+
* Get an application ID associated with the job.
66+
* This returns the string value of spark.app.id if set, otherwise
67+
* the locally-generated ID from the superclass.
68+
*
69+
* @return The application ID
70+
*/
71+
override def applicationId(): String = {
72+
conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
73+
}
74+
6375
override def start(): Unit = {
6476
super.start()
6577
if (!Utils.isDynamicAllocationEnabled(conf)) {
@@ -88,7 +100,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
88100

89101
if (shouldDeleteExecutors) {
90102
Utils.tryLogNonFatalError {
91-
kubernetesClient.pods()
103+
kubernetesClient
104+
.pods()
92105
.withLabel(SPARK_APP_ID_LABEL, applicationId())
93106
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
94107
.delete()
@@ -120,7 +133,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
120133
}
121134

122135
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
123-
kubernetesClient.pods()
136+
kubernetesClient
137+
.pods()
124138
.withLabel(SPARK_APP_ID_LABEL, applicationId())
125139
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
126140
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
@@ -133,7 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
133147
}
134148

135149
private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
136-
extends DriverEndpoint(rpcEnv, sparkProperties) {
150+
extends DriverEndpoint(rpcEnv, sparkProperties) {
137151

138152
override def onDisconnected(rpcAddress: RpcAddress): Unit = {
139153
// Don't do anything besides disabling the executor - allow the Kubernetes API events to

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
3737
private val requestExecutorsService = new DeterministicScheduler()
3838
private val sparkConf = new SparkConf(false)
3939
.set("spark.executor.instances", "3")
40+
.set("spark.app.id", TEST_SPARK_APP_ID)
4041

4142
@Mock
4243
private var sc: SparkContext = _
@@ -87,8 +88,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
8788
when(sc.env).thenReturn(env)
8889
when(env.rpcEnv).thenReturn(rpcEnv)
8990
driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
90-
when(rpcEnv.setupEndpoint(
91-
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
91+
when(
92+
rpcEnv.setupEndpoint(
93+
mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME),
94+
driverEndpoint.capture()))
9295
.thenReturn(driverEndpointRef)
9396
when(kubernetesClient.pods()).thenReturn(podOperations)
9497
schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
@@ -100,9 +103,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
100103
podAllocator,
101104
lifecycleEventHandler,
102105
watchEvents,
103-
pollEvents) {
104-
override def applicationId(): String = TEST_SPARK_APP_ID
105-
}
106+
pollEvents)
106107
}
107108

108109
test("Start all components") {
@@ -127,8 +128,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
127128

128129
test("Remove executor") {
129130
schedulerBackendUnderTest.start()
130-
schedulerBackendUnderTest.doRemoveExecutor(
131-
"1", ExecutorKilled)
131+
schedulerBackendUnderTest.doRemoveExecutor("1", ExecutorKilled)
132132
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
133133
}
134134

0 commit comments

Comments
 (0)