diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index 0b544e50fdb0dd..9a9fa4064d0cdc 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -152,6 +152,12 @@ Integer Specify how many JobManager pods will be started simultaneously. Configure the value to greater than 1 to start standby JobManagers. It will help to achieve faster recovery. Notice that high availability should be enabled when starting standby JobManagers. + +
kubernetes.jobmanager.scheduler-name
+ "default-scheduler" + String + Specify the kubernetes pod scheduler for jobmanager pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. If not explicitly configured, config option 'kubernetes.scheduler-name' will be used. +
kubernetes.jobmanager.service-account
"default" @@ -206,6 +212,12 @@

Enum

The exposed type of the rest service. The exposed rest service could be used to access the Flinkā€™s Web UI and REST endpoint.

Possible values: + +
kubernetes.scheduler-name
+ "default-scheduler" + String + Specify the kubernetes pod scheduler for Flink pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. Notice that this can be overwritten by config options 'kubernetes.jobmanager.scheduler-name' and 'kubernetes.taskmanager.scheduler-name' for jobmanager and taskmanager respectively. +
kubernetes.secrets
(none) @@ -260,6 +272,12 @@ Map The node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd. + +
kubernetes.taskmanager.scheduler-name
+ "default-scheduler" + String + Specify the kubernetes pod scheduler for taskmanager pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. If not explicitly configured, config option 'kubernetes.scheduler-name' will be used. +
kubernetes.taskmanager.service-account
"default" diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 15da84f71d37ff..8c72d1c64fe3be 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -49,6 +49,7 @@ public class KubernetesConfigOptions { private static final String KUBERNETES_SERVICE_ACCOUNT_KEY = "kubernetes.service-account"; private static final String KUBERNETES_POD_TEMPLATE_FILE_KEY = "kubernetes.pod-template-file"; + private static final String KUBERNETES_POD_SCHEDULER_NAME_KEY = "kubernetes.scheduler-name"; public static final ConfigOption CONTEXT = key("kubernetes.context") @@ -444,6 +445,46 @@ public class KubernetesConfigOptions { public static final ConfigOption TASK_MANAGER_POD_TEMPLATE; + public static final ConfigOption JOB_MANAGER_POD_SCHEDULER_NAME = + key("kubernetes.jobmanager.scheduler-name") + .stringType() + .defaultValue("default-scheduler") + .withFallbackKeys(KUBERNETES_POD_SCHEDULER_NAME_KEY) + .withDescription( + "Specify the kubernetes pod scheduler for jobmanager pods of deployment. " + + "The default value is using the kubernetes default pod scheduler. " + + "For customerized kubernetes pod scheduler, allow to set pod scheduler " + + "for customerized pod scheduling. If not explicitly configured, config option '" + + KUBERNETES_POD_SCHEDULER_NAME_KEY + + "' will be used."); + + public static final ConfigOption TASK_MANAGER_POD_SCHEDULER_NAME = + key("kubernetes.taskmanager.scheduler-name") + .stringType() + .defaultValue("default-scheduler") + .withFallbackKeys(KUBERNETES_POD_SCHEDULER_NAME_KEY) + .withDescription( + "Specify the kubernetes pod scheduler for taskmanager pods of deployment. " + + "The default value is using the kubernetes default pod scheduler. " + + "For customerized kubernetes pod scheduler, allow to set pod scheduler " + + "for customerized pod scheduling. If not explicitly configured, config option '" + + KUBERNETES_POD_SCHEDULER_NAME_KEY + + "' will be used."); + + public static final ConfigOption POD_SCHEDULER_NAME = + key(KUBERNETES_POD_SCHEDULER_NAME_KEY) + .stringType() + .defaultValue("default-scheduler") + .withDescription( + "Specify the kubernetes pod scheduler for Flink pods of deployment. " + + "The default value is using the kubernetes default pod scheduler. " + + "For customerized kubernetes pod scheduler, allow to set pod scheduler " + + "for customerized pod scheduling. Notice that this can be overwritten by config options '" + + JOB_MANAGER_POD_SCHEDULER_NAME.key() + + "' and '" + + TASK_MANAGER_POD_SCHEDULER_NAME.key() + + "' for jobmanager and taskmanager respectively."); + /** * This option is here only for documentation generation, it is the fallback key of * JOB_MANAGER_POD_TEMPLATE and TASK_MANAGER_POD_TEMPLATE. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java index 90318554f98071..0c85c2636a3058 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java @@ -102,6 +102,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { kubernetesJobManagerParameters.getTolerations().stream() .map(e -> KubernetesToleration.fromMap(e).getInternalResource()) .collect(Collectors.toList())) + .withSchedulerName(kubernetesJobManagerParameters.getPodSchedulerName()) .endSpec(); final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer()); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java index 2218505445aad5..b40d2e47ef4de1 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecorator.java @@ -95,6 +95,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { kubernetesTaskManagerParameters.isHostNetworkEnabled() ? DNS_PLOICY_HOSTNETWORK : DNS_PLOICY_DEFAULT) + .withSchedulerName(kubernetesTaskManagerParameters.getPodSchedulerName()) .endSpec(); // Merge fields diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java index c9bd7ef0397d19..43b8e087a2f162 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java @@ -206,4 +206,9 @@ public List> getEnvironmentsFromSecrets() { public boolean isHostNetworkEnabled() { return flinkConfig.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED); } + + @Override + public String getPodSchedulerName() { + return flinkConfig.get(KubernetesConfigOptions.POD_SCHEDULER_NAME); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 7bb8926d22a3b3..112fc8f4bae164 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -204,4 +204,9 @@ public int getReplicas() { public String getEntrypointArgs() { return flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS); } + + @Override + public String getPodSchedulerName() { + return flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java index c6b335449a8850..a0fe1bce49c7a5 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesParameters.java @@ -110,4 +110,7 @@ public interface KubernetesParameters { * container(s). */ List> getEnvironmentsFromSecrets(); + + /** The custom kubernetes pod scheduler name. */ + String getPodSchedulerName(); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java index 402ea3e4fed2d9..4061485372d1af 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParameters.java @@ -196,4 +196,9 @@ public String getEntrypointArgs() { return flinkConfig.getString( KubernetesConfigOptions.KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS); } + + @Override + public String getPodSchedulerName() { + return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java index 104e15de01119d..30178024b488d3 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecoratorTest.java @@ -63,6 +63,7 @@ class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase { new Toleration("NoExecute", "key2", "Exists", 6000L, null)); private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log"; + private static final String TEST_K8S_POD_SCHEDULER = "test-k8s-pod-scheduler"; private Pod resultPod; private Container resultMainContainer; @@ -79,6 +80,8 @@ protected void setupFlinkConfig() { this.flinkConfig.setString( KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING); this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR); + this.flinkConfig.set( + KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME, TEST_K8S_POD_SCHEDULER); } @Override @@ -222,4 +225,9 @@ void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() { envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR) && envVar.getValue().equals(USER_DEFINED_FLINK_LOG_DIR)); } + + @Test + void testPodSchedulerName() { + assertThat(this.resultPod.getSpec().getSchedulerName()).isEqualTo(TEST_K8S_POD_SCHEDULER); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java index 9ac7b7c239966b..8518082b2fa56b 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java @@ -72,6 +72,7 @@ class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase { private static final String RESOURCE_CONFIG_KEY = "test.com/test"; private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log"; + private static final String TEST_K8S_POD_SCHEDULER = "test-k8s-pod-scheduler"; private Pod resultPod; private Container resultMainContainer; @@ -100,6 +101,8 @@ protected void setupFlinkConfig() { KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX), RESOURCE_CONFIG_KEY); this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR); + this.flinkConfig.set( + KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME, TEST_K8S_POD_SCHEDULER); } @Override @@ -290,4 +293,9 @@ void testNodeAffinity() { "NotIn", new ArrayList<>(BLOCKED_NODES))); } + + @Test + void testPodSchedulerName() { + assertThat(this.resultPod.getSpec().getSchedulerName()).isEqualTo(TEST_K8S_POD_SCHEDULER); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 067d8cd3b198d8..c40bde742f4660 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -254,4 +254,27 @@ void testGetReplicas() { flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2); assertThat(kubernetesJobManagerParameters.getReplicas()).isEqualTo(2); } + + @Test + void testGetPodSchedulerName() { + final String testJobManagerSchedulerName = "test-k8s-job-manager-pod-scheduler"; + flinkConfig.set( + KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME, + testJobManagerSchedulerName); + assertThat(kubernetesJobManagerParameters.getPodSchedulerName()) + .isEqualTo(testJobManagerSchedulerName); + } + + @Test + void testGetPodSchedulerNameWithDefaultValue() { + assertThat(kubernetesJobManagerParameters.getPodSchedulerName()) + .isEqualTo("default-scheduler"); + } + + @Test + void testGetPodSchedulerNameFallback() { + final String testScheduler = "test-k8s-pod-scheduler"; + flinkConfig.set(KubernetesConfigOptions.POD_SCHEDULER_NAME, testScheduler); + assertThat(kubernetesJobManagerParameters.getPodSchedulerName()).isEqualTo(testScheduler); + } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java index 41d8e5d7dd71b0..0e7d35d459c1ca 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesTaskManagerParametersTest.java @@ -202,4 +202,27 @@ void testGetServiceAccountFallback() { void testGetServiceAccountShouldReturnDefaultIfNotExplicitlySet() { assertThat(kubernetesTaskManagerParameters.getServiceAccount()).isEqualTo("default"); } + + @Test + void testGetPodSchedulerName() { + final String testJobManagerSchedulerName = "test-k8s-job-manager-pod-scheduler"; + flinkConfig.set( + KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME, + testJobManagerSchedulerName); + assertThat(kubernetesTaskManagerParameters.getPodSchedulerName()) + .isEqualTo(testJobManagerSchedulerName); + } + + @Test + void testGetPodSchedulerNameWithDefaultValue() { + assertThat(kubernetesTaskManagerParameters.getPodSchedulerName()) + .isEqualTo("default-scheduler"); + } + + @Test + void testGetPodSchedulerNameFallback() { + final String testScheduler = "test-k8s-pod-scheduler"; + flinkConfig.set(KubernetesConfigOptions.POD_SCHEDULER_NAME, testScheduler); + assertThat(kubernetesTaskManagerParameters.getPodSchedulerName()).isEqualTo(testScheduler); + } }