Skip to content

Commit

Permalink
[SPARK-49838] Add spark-version label to Spark Cluster resources
Browse files Browse the repository at this point in the history
  • Loading branch information
dongjoon-hyun committed Sep 30, 2024
1 parent 6c6a260 commit 9474930
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class Constants {
public static final String LABEL_SPARK_ROLE_CLUSTER_VALUE = "cluster";
public static final String LABEL_SPARK_ROLE_MASTER_VALUE = "master";
public static final String LABEL_SPARK_ROLE_WORKER_VALUE = "worker";
public static final String LABEL_SPARK_VERSION_NAME = "spark-version";
public static final String SENTINEL_RESOURCE_DUMMY_FIELD = "sentinel.dummy.number";

public static final String DRIVER_SPARK_CONTAINER_PROP_KEY =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_CLUSTER_VALUE;
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_DRIVER_VALUE;
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_ROLE_EXECUTOR_VALUE;
import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_APP_NAME;
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.OPERATOR_WATCHED_NAMESPACES;
import static org.apache.spark.k8s.operator.config.SparkOperatorConf.SPARK_APP_STATUS_LISTENER_CLASS_NAMES;
Expand Down Expand Up @@ -111,6 +112,7 @@ public static Map<String, String> executorLabels(final SparkApplication app) {
public static Map<String, String> sparkClusterResourceLabels(final SparkCluster cluster) {
Map<String, String> labels = commonManagedResourceLabels();
labels.put(Constants.LABEL_SPARK_CLUSTER_NAME, cluster.getMetadata().getName());
labels.put(LABEL_SPARK_VERSION_NAME, cluster.getSpec().getRuntimeVersions().getSparkVersion());
return labels;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
String namespace = conf.get(Config.KUBERNETES_NAMESPACE().key(), clusterNamespace);
String image = conf.get(Config.CONTAINER_IMAGE().key(), "apache/spark:4.0.0-preview2");
ClusterSpec spec = cluster.getSpec();
String version = spec.getRuntimeVersions().getSparkVersion();
StringBuilder options = new StringBuilder();
for (Tuple2<String, String> t : conf.getAll()) {
options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
Expand All @@ -69,15 +70,24 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
WorkerSpec workerSpec = spec.getWorkerSpec();
masterService =
buildMasterService(
clusterName, namespace, masterSpec.getServiceMetadata(), masterSpec.getServiceSpec());
clusterName,
namespace,
version,
masterSpec.getServiceMetadata(),
masterSpec.getServiceSpec());
workerService =
buildWorkerService(
clusterName, namespace, workerSpec.getServiceMetadata(), workerSpec.getServiceSpec());
clusterName,
namespace,
version,
workerSpec.getServiceMetadata(),
workerSpec.getServiceSpec());
masterStatefulSet =
buildMasterStatefulSet(
scheduler,
clusterName,
namespace,
version,
image,
options.toString(),
masterSpec.getStatefulSetMetadata(),
Expand All @@ -87,6 +97,7 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
scheduler,
clusterName,
namespace,
version,
image,
spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
options.toString(),
Expand All @@ -96,11 +107,12 @@ public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
}

private static Service buildMasterService(
String name, String namespace, ObjectMeta metadata, ServiceSpec serviceSpec) {
String name, String namespace, String version, ObjectMeta metadata, ServiceSpec serviceSpec) {
return new ServiceBuilder()
.withNewMetadataLike(metadata)
.withName(name + "-master-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(serviceSpec)
Expand All @@ -127,11 +139,12 @@ private static Service buildMasterService(
}

private static Service buildWorkerService(
String name, String namespace, ObjectMeta metadata, ServiceSpec serviceSpec) {
String name, String namespace, String version, ObjectMeta metadata, ServiceSpec serviceSpec) {
return new ServiceBuilder()
.withNewMetadataLike(metadata)
.withName(name + "-worker-svc")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(serviceSpec)
Expand All @@ -151,6 +164,7 @@ private static StatefulSet buildMasterStatefulSet(
String scheduler,
String name,
String namespace,
String version,
String image,
String options,
ObjectMeta objectMeta,
Expand All @@ -160,6 +174,7 @@ private static StatefulSet buildMasterStatefulSet(
.withNewMetadataLike(objectMeta)
.withName(name + "-master")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(statefulSetSpec)
Expand All @@ -171,6 +186,7 @@ private static StatefulSet buildMasterStatefulSet(
.editOrNewTemplate()
.editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_MASTER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.endMetadata()
.editOrNewSpec()
.withSchedulerName(scheduler)
Expand Down Expand Up @@ -213,6 +229,7 @@ private static StatefulSet buildWorkerStatefulSet(
String scheduler,
String name,
String namespace,
String version,
String image,
int initWorkers,
String options,
Expand All @@ -223,6 +240,7 @@ private static StatefulSet buildWorkerStatefulSet(
.withNewMetadataLike(metadata)
.withName(name + "-worker")
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.withNamespace(namespace)
.endMetadata()
.withNewSpecLike(statefulSetSpec)
Expand All @@ -235,6 +253,7 @@ private static StatefulSet buildWorkerStatefulSet(
.editOrNewTemplate()
.editOrNewMetadata()
.addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
.addToLabels(LABEL_SPARK_VERSION_NAME, version)
.endMetadata()
.editOrNewSpec()
.withSchedulerName(scheduler)
Expand Down Expand Up @@ -320,6 +339,7 @@ private static Optional<HorizontalPodAutoscaler> buildHorizontalPodAutoscaler(
.withNewMetadata()
.withNamespace(namespace)
.withName(clusterName + "-worker-hpa")
.addToLabels(LABEL_SPARK_VERSION_NAME, spec.getRuntimeVersions().getSparkVersion())
.endMetadata()
.withNewSpecLike(horizontalPodAutoscalerSpec)
.withNewScaleTargetRef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.spark.k8s.operator;

import static org.apache.spark.k8s.operator.Constants.LABEL_SPARK_VERSION_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -41,6 +42,7 @@
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
import org.apache.spark.k8s.operator.spec.MasterSpec;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.spec.WorkerInstanceConfig;
import org.apache.spark.k8s.operator.spec.WorkerSpec;

Expand All @@ -52,6 +54,7 @@ class SparkClusterResourceSpecTest {
ServiceSpec serviceSpec;
MasterSpec masterSpec;
WorkerSpec workerSpec;
RuntimeVersions runtimeVersions = new RuntimeVersions();
SparkConf sparkConf = new SparkConf().set("spark.kubernetes.namespace", "other-namespace");
ClusterTolerations clusterTolerations = new ClusterTolerations();

Expand All @@ -71,6 +74,8 @@ void setUp() {
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
runtimeVersions.setSparkVersion("4.0.0");
when(masterSpec.getStatefulSetSpec()).thenReturn(statefulSetSpec);
when(masterSpec.getStatefulSetMetadata()).thenReturn(objectMeta);
when(masterSpec.getServiceSpec()).thenReturn(serviceSpec);
Expand All @@ -86,16 +91,19 @@ void testMasterService() {
Service service1 = new SparkClusterResourceSpec(cluster, new SparkConf()).getMasterService();
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));

Service service2 = new SparkClusterResourceSpec(cluster, sparkConf).getMasterService();
assertEquals("other-namespace", service2.getMetadata().getNamespace());
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
}

@Test
void testWorkerService() {
Service service1 = new SparkClusterResourceSpec(cluster, new SparkConf()).getWorkerService();
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));

Service service2 = new SparkClusterResourceSpec(cluster, sparkConf).getMasterService();
assertEquals("other-namespace", service2.getMetadata().getNamespace());
Expand All @@ -119,6 +127,7 @@ void testWorkerServiceWithTemplate() {
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-worker-svc", service1.getMetadata().getName());
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("foo", service1.getSpec().getExternalName());
}

Expand All @@ -140,6 +149,7 @@ void testMasterServiceWithTemplate() {
assertEquals("my-namespace", service1.getMetadata().getNamespace());
assertEquals("cluster-name-master-svc", service1.getMetadata().getName());
assertEquals("bar", service1.getMetadata().getLabels().get("foo"));
assertEquals("4.0.0", service1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("foo", service1.getSpec().getExternalName());
}

Expand All @@ -149,6 +159,9 @@ void testMasterStatefulSet() {
StatefulSet statefulSet1 = spec1.getMasterStatefulSet();
assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
assertEquals("4.0.0", statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("4.0.0",
statefulSet1.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));

SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
StatefulSet statefulSet2 = spec2.getMasterStatefulSet();
Expand Down Expand Up @@ -185,8 +198,11 @@ void testMasterStatefulSetWithTemplate() {
assertEquals("my-namespace", statefulSet1.getMetadata().getNamespace());
assertEquals("cluster-name-master", statefulSet1.getMetadata().getName());
assertEquals("bar", statefulSet1.getMetadata().getLabels().get("foo"));
assertEquals("4.0.0", statefulSet1.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals(1, statefulSet1.getSpec().getTemplate().getSpec().getInitContainers().size());
assertEquals(2, statefulSet1.getSpec().getTemplate().getSpec().getContainers().size());
assertEquals("4.0.0",
statefulSet1.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
}

@Test
Expand All @@ -195,6 +211,9 @@ void testWorkerStatefulSet() {
StatefulSet statefulSet = spec.getWorkerStatefulSet();
assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
assertEquals("4.0.0", statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("4.0.0",
statefulSet.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));

SparkClusterResourceSpec spec2 = new SparkClusterResourceSpec(cluster, sparkConf);
StatefulSet statefulSet2 = spec2.getWorkerStatefulSet();
Expand Down Expand Up @@ -230,6 +249,9 @@ void testWorkerStatefulSetWithTemplate() {
StatefulSet statefulSet = spec.getWorkerStatefulSet();
assertEquals("my-namespace", statefulSet.getMetadata().getNamespace());
assertEquals("cluster-name-worker", statefulSet.getMetadata().getName());
assertEquals("4.0.0", statefulSet.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals("4.0.0",
statefulSet.getSpec().getTemplate().getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
}

@Test
Expand All @@ -255,6 +277,7 @@ void testHorizontalPodAutoscaler() {
assertEquals("HorizontalPodAutoscaler", hpa.getKind());
assertEquals("my-namespace", hpa.getMetadata().getNamespace());
assertEquals("cluster-name-worker-hpa", hpa.getMetadata().getName());
assertEquals("4.0.0", hpa.getMetadata().getLabels().get(LABEL_SPARK_VERSION_NAME));
assertEquals(1, hpa.getSpec().getMinReplicas());
assertEquals(3, hpa.getSpec().getMaxReplicas());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.spark.k8s.operator.spec.ClusterSpec;
import org.apache.spark.k8s.operator.spec.ClusterTolerations;
import org.apache.spark.k8s.operator.spec.MasterSpec;
import org.apache.spark.k8s.operator.spec.RuntimeVersions;
import org.apache.spark.k8s.operator.spec.WorkerSpec;

class SparkClusterSubmissionWorkerTest {
Expand All @@ -40,6 +41,7 @@ class SparkClusterSubmissionWorkerTest {
ClusterTolerations clusterTolerations = new ClusterTolerations();
MasterSpec masterSpec;
WorkerSpec workerSpec;
RuntimeVersions runtimeVersions = new RuntimeVersions();

@BeforeEach
void setUp() {
Expand All @@ -55,6 +57,7 @@ void setUp() {
when(clusterSpec.getClusterTolerations()).thenReturn(clusterTolerations);
when(clusterSpec.getMasterSpec()).thenReturn(masterSpec);
when(clusterSpec.getWorkerSpec()).thenReturn(workerSpec);
when(clusterSpec.getRuntimeVersions()).thenReturn(runtimeVersions);
}

@Test
Expand Down

0 comments on commit 9474930

Please sign in to comment.