Skip to content

Commit

Permalink
[FLINK-28825] Add K8S pod scheduler into Kubernetes options
Browse files Browse the repository at this point in the history
We introduce 3 options for supporting specify the K8S pod scheduler
name.
They are:
kubernetes.jobmanager.scheduler-name
kubernetes.taskmanager.scheduler-name
kubernetes.scheduler-name

The first two of them can be specified by users to control the k8s pod
scheduler they used for jobmanager and taskmanager. The last one is the
fallback one, the first two options will overwrite it.
  • Loading branch information
bzhaoopenstack committed Sep 29, 2022
1 parent e1e9ee8 commit f12aefd
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,12 @@
<td>Integer</td>
<td>Specify how many JobManager pods will be started simultaneously. Configure the value to greater than 1 to start standby JobManagers. It will help to achieve faster recovery. Notice that high availability should be enabled when starting standby JobManagers.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.scheduler-name</h5></td>
<td style="word-wrap: break-word;">"default-scheduler"</td>
<td>String</td>
<td>Specify the kubernetes pod scheduler for jobmanager pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. If not explicitly configured, config option 'kubernetes.scheduler-name' will be used.</td>
</tr>
<tr>
<td><h5>kubernetes.jobmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
Expand Down Expand Up @@ -206,6 +212,12 @@
<td><p>Enum</p></td>
<td>The exposed type of the rest service. The exposed rest service could be used to access the Flink’s Web UI and REST endpoint.<br /><br />Possible values:<ul><li>"ClusterIP"</li><li>"NodePort"</li><li>"LoadBalancer"</li><li>"Headless_ClusterIP"</li></ul></td>
</tr>
<tr>
<td><h5>kubernetes.scheduler-name</h5></td>
<td style="word-wrap: break-word;">"default-scheduler"</td>
<td>String</td>
<td>Specify the kubernetes pod scheduler for Flink pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. Notice that this can be overwritten by config options 'kubernetes.jobmanager.scheduler-name' and 'kubernetes.taskmanager.scheduler-name' for jobmanager and taskmanager respectively.</td>
</tr>
<tr>
<td><h5>kubernetes.secrets</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down Expand Up @@ -260,6 +272,12 @@
<td>Map</td>
<td>The node selector to be set for TaskManager pods. Specified as key:value pairs separated by commas. For example, environment:production,disk:ssd.</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.scheduler-name</h5></td>
<td style="word-wrap: break-word;">"default-scheduler"</td>
<td>String</td>
<td>Specify the kubernetes pod scheduler for taskmanager pods of deployment. The default value is using the kubernetes default pod scheduler. For customerized kubernetes pod scheduler, allow to set pod scheduler for customerized pod scheduling. If not explicitly configured, config option 'kubernetes.scheduler-name' will be used.</td>
</tr>
<tr>
<td><h5>kubernetes.taskmanager.service-account</h5></td>
<td style="word-wrap: break-word;">"default"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class KubernetesConfigOptions {

private static final String KUBERNETES_SERVICE_ACCOUNT_KEY = "kubernetes.service-account";
private static final String KUBERNETES_POD_TEMPLATE_FILE_KEY = "kubernetes.pod-template-file";
private static final String KUBERNETES_POD_SCHEDULER_NAME_KEY = "kubernetes.scheduler-name";

public static final ConfigOption<String> CONTEXT =
key("kubernetes.context")
Expand Down Expand Up @@ -444,6 +445,46 @@ public class KubernetesConfigOptions {

public static final ConfigOption<String> TASK_MANAGER_POD_TEMPLATE;

public static final ConfigOption<String> JOB_MANAGER_POD_SCHEDULER_NAME =
key("kubernetes.jobmanager.scheduler-name")
.stringType()
.defaultValue("default-scheduler")
.withFallbackKeys(KUBERNETES_POD_SCHEDULER_NAME_KEY)
.withDescription(
"Specify the kubernetes pod scheduler for jobmanager pods of deployment. "
+ "The default value is using the kubernetes default pod scheduler. "
+ "For customerized kubernetes pod scheduler, allow to set pod scheduler "
+ "for customerized pod scheduling. If not explicitly configured, config option '"
+ KUBERNETES_POD_SCHEDULER_NAME_KEY
+ "' will be used.");

public static final ConfigOption<String> TASK_MANAGER_POD_SCHEDULER_NAME =
key("kubernetes.taskmanager.scheduler-name")
.stringType()
.defaultValue("default-scheduler")
.withFallbackKeys(KUBERNETES_POD_SCHEDULER_NAME_KEY)
.withDescription(
"Specify the kubernetes pod scheduler for taskmanager pods of deployment. "
+ "The default value is using the kubernetes default pod scheduler. "
+ "For customerized kubernetes pod scheduler, allow to set pod scheduler "
+ "for customerized pod scheduling. If not explicitly configured, config option '"
+ KUBERNETES_POD_SCHEDULER_NAME_KEY
+ "' will be used.");

public static final ConfigOption<String> POD_SCHEDULER_NAME =
key(KUBERNETES_POD_SCHEDULER_NAME_KEY)
.stringType()
.defaultValue("default-scheduler")
.withDescription(
"Specify the kubernetes pod scheduler for Flink pods of deployment. "
+ "The default value is using the kubernetes default pod scheduler. "
+ "For customerized kubernetes pod scheduler, allow to set pod scheduler "
+ "for customerized pod scheduling. Notice that this can be overwritten by config options '"
+ JOB_MANAGER_POD_SCHEDULER_NAME.key()
+ "' and '"
+ TASK_MANAGER_POD_SCHEDULER_NAME.key()
+ "' for jobmanager and taskmanager respectively.");

/**
* This option is here only for documentation generation, it is the fallback key of
* JOB_MANAGER_POD_TEMPLATE and TASK_MANAGER_POD_TEMPLATE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
kubernetesJobManagerParameters.getTolerations().stream()
.map(e -> KubernetesToleration.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.withSchedulerName(kubernetesJobManagerParameters.getPodSchedulerName())
.endSpec();

final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
kubernetesTaskManagerParameters.isHostNetworkEnabled()
? DNS_PLOICY_HOSTNETWORK
: DNS_PLOICY_DEFAULT)
.withSchedulerName(kubernetesTaskManagerParameters.getPodSchedulerName())
.endSpec();

// Merge fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,9 @@ public List<Map<String, String>> getEnvironmentsFromSecrets() {
public boolean isHostNetworkEnabled() {
return flinkConfig.getBoolean(KubernetesConfigOptions.KUBERNETES_HOSTNETWORK_ENABLED);
}

@Override
public String getPodSchedulerName() {
return flinkConfig.get(KubernetesConfigOptions.POD_SCHEDULER_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,9 @@ public int getReplicas() {
public String getEntrypointArgs() {
return flinkConfig.getString(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_ENTRYPOINT_ARGS);
}

@Override
public String getPodSchedulerName() {
return flinkConfig.get(KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,7 @@ public interface KubernetesParameters {
* container(s).
*/
List<Map<String, String>> getEnvironmentsFromSecrets();

/** The custom kubernetes pod scheduler name. */
String getPodSchedulerName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,9 @@ public String getEntrypointArgs() {
return flinkConfig.getString(
KubernetesConfigOptions.KUBERNETES_TASKMANAGER_ENTRYPOINT_ARGS);
}

@Override
public String getPodSchedulerName() {
return flinkConfig.get(KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class InitJobManagerDecoratorTest extends KubernetesJobManagerTestBase {
new Toleration("NoExecute", "key2", "Exists", 6000L, null));

private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
private static final String TEST_K8S_POD_SCHEDULER = "test-k8s-pod-scheduler";

private Pod resultPod;
private Container resultMainContainer;
Expand All @@ -79,6 +80,8 @@ protected void setupFlinkConfig() {
this.flinkConfig.setString(
KubernetesConfigOptions.JOB_MANAGER_TOLERATIONS.key(), TOLERATION_STRING);
this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
this.flinkConfig.set(
KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME, TEST_K8S_POD_SCHEDULER);
}

@Override
Expand Down Expand Up @@ -222,4 +225,9 @@ void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
envVar.getName().equals(Constants.ENV_FLINK_LOG_DIR)
&& envVar.getValue().equals(USER_DEFINED_FLINK_LOG_DIR));
}

@Test
void testPodSchedulerName() {
assertThat(this.resultPod.getSpec().getSchedulerName()).isEqualTo(TEST_K8S_POD_SCHEDULER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class InitTaskManagerDecoratorTest extends KubernetesTaskManagerTestBase {
private static final String RESOURCE_CONFIG_KEY = "test.com/test";

private static final String USER_DEFINED_FLINK_LOG_DIR = "/path/of/flink-log";
private static final String TEST_K8S_POD_SCHEDULER = "test-k8s-pod-scheduler";

private Pod resultPod;
private Container resultMainContainer;
Expand Down Expand Up @@ -100,6 +101,8 @@ protected void setupFlinkConfig() {
KubernetesConfigOptions.EXTERNAL_RESOURCE_KUBERNETES_CONFIG_KEY_SUFFIX),
RESOURCE_CONFIG_KEY);
this.flinkConfig.set(KubernetesConfigOptions.FLINK_LOG_DIR, USER_DEFINED_FLINK_LOG_DIR);
this.flinkConfig.set(
KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME, TEST_K8S_POD_SCHEDULER);
}

@Override
Expand Down Expand Up @@ -290,4 +293,9 @@ void testNodeAffinity() {
"NotIn",
new ArrayList<>(BLOCKED_NODES)));
}

@Test
void testPodSchedulerName() {
assertThat(this.resultPod.getSpec().getSchedulerName()).isEqualTo(TEST_K8S_POD_SCHEDULER);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,27 @@ void testGetReplicas() {
flinkConfig.set(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS, 2);
assertThat(kubernetesJobManagerParameters.getReplicas()).isEqualTo(2);
}

@Test
void testGetPodSchedulerName() {
final String testJobManagerSchedulerName = "test-k8s-job-manager-pod-scheduler";
flinkConfig.set(
KubernetesConfigOptions.JOB_MANAGER_POD_SCHEDULER_NAME,
testJobManagerSchedulerName);
assertThat(kubernetesJobManagerParameters.getPodSchedulerName())
.isEqualTo(testJobManagerSchedulerName);
}

@Test
void testGetPodSchedulerNameWithDefaultValue() {
assertThat(kubernetesJobManagerParameters.getPodSchedulerName())
.isEqualTo("default-scheduler");
}

@Test
void testGetPodSchedulerNameFallback() {
final String testScheduler = "test-k8s-pod-scheduler";
flinkConfig.set(KubernetesConfigOptions.POD_SCHEDULER_NAME, testScheduler);
assertThat(kubernetesJobManagerParameters.getPodSchedulerName()).isEqualTo(testScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,27 @@ void testGetServiceAccountFallback() {
void testGetServiceAccountShouldReturnDefaultIfNotExplicitlySet() {
assertThat(kubernetesTaskManagerParameters.getServiceAccount()).isEqualTo("default");
}

@Test
void testGetPodSchedulerName() {
final String testJobManagerSchedulerName = "test-k8s-job-manager-pod-scheduler";
flinkConfig.set(
KubernetesConfigOptions.TASK_MANAGER_POD_SCHEDULER_NAME,
testJobManagerSchedulerName);
assertThat(kubernetesTaskManagerParameters.getPodSchedulerName())
.isEqualTo(testJobManagerSchedulerName);
}

@Test
void testGetPodSchedulerNameWithDefaultValue() {
assertThat(kubernetesTaskManagerParameters.getPodSchedulerName())
.isEqualTo("default-scheduler");
}

@Test
void testGetPodSchedulerNameFallback() {
final String testScheduler = "test-k8s-pod-scheduler";
flinkConfig.set(KubernetesConfigOptions.POD_SCHEDULER_NAME, testScheduler);
assertThat(kubernetesTaskManagerParameters.getPodSchedulerName()).isEqualTo(testScheduler);
}
}

0 comments on commit f12aefd

Please sign in to comment.