Skip to content

Commit

Permalink
[FLINK-15816][k8s] Prevent labels using kubernetes.cluster-id to exce…
Browse files Browse the repository at this point in the history
…ed the limit of 63 characters
  • Loading branch information
alpreu committed Jan 7, 2022
1 parent 6d52d10 commit 2a61cd8
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.util.AbstractID;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -75,7 +75,6 @@ public Optional<String> getApplicationTargetName() {

private String generateClusterId() {
final String randomID = new AbstractID().toString();
return (CLUSTER_ID_PREFIX + randomID)
.substring(0, Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID);
return (CLUSTER_ID_PREFIX + randomID).substring(0, KubernetesLabel.getClusterIdMaxLength());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.configuration.ExternalResourceOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.runtime.util.EnvironmentInformation;

import java.util.List;
Expand Down Expand Up @@ -220,10 +221,14 @@ public class KubernetesConfigOptions {
.withDescription(
Description.builder()
.text(
"The cluster-id, which should be no more than 45 characters, is used for identifying a unique Flink cluster. "
"The cluster-id, which should be no more than %s characters, is used for identifying a unique Flink cluster. "
+ "The id must only contain lowercase alphanumeric characters and \"-\". "
+ "The required format is %s. "
+ "If not set, the client will automatically generate it with a random ID.",
text(
String.valueOf(
KubernetesLabel
.getClusterIdMaxLength())),
code("[a-z]([-a-z0-9]*[a-z0-9])"))
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Service;
Expand Down Expand Up @@ -69,7 +70,7 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti

/** Generate name of the external Service. */
public static String getExternalServiceName(String clusterId) {
return clusterId + Constants.FLINK_REST_SERVICE_SUFFIX;
return KubernetesLabel.FLINK_REST_SERVICE.generateWith(clusterId);
}

/** Generate namespaced name of the external Service. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;

import org.apache.flink.shaded.guava30.com.google.common.io.Files;

Expand Down Expand Up @@ -55,7 +56,6 @@
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -192,6 +192,6 @@ private List<File> getLocalLogConfFiles() {

@VisibleForTesting
public static String getFlinkConfConfigMapName(String clusterId) {
return CONFIG_MAP_PREFIX + clusterId;
return KubernetesLabel.CONFIG_MAP.generateWith(clusterId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.util.FileUtils;

import io.fabric8.kubernetes.api.model.ConfigMap;
Expand Down Expand Up @@ -202,6 +203,6 @@ private List<File> getHadoopConfigurationFileItems(String localHadoopConfigurati
}

public static String getHadoopConfConfigMapName(String clusterId) {
return Constants.HADOOP_CONF_CONFIG_MAP_PREFIX + clusterId;
return KubernetesLabel.HADOOP_CONF_CONFIG_MAP.generateWith(clusterId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.util.StringUtils;

Expand Down Expand Up @@ -198,10 +199,10 @@ public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOExcepti
}

public static String getKerberosKeytabSecretName(String clusterId) {
return Constants.KERBEROS_KEYTAB_SECRET_PREFIX + clusterId;
return KubernetesLabel.KERBEROS_KEYTAB_SECRET.generateWith(clusterId);
}

public static String getKerberosKrb5confConfigMapName(String clusterID) {
return Constants.KERBEROS_KRB5CONF_CONFIG_MAP_PREFIX + clusterID;
public static String getKerberosKrb5confConfigMapName(String clusterId) {
return KubernetesLabel.KERBEROS_KRB5CONF_CONFIG_MAP.generateWith(clusterId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.FunctionUtils;

Expand Down Expand Up @@ -63,7 +64,8 @@ public class PodTemplateMountDecorator extends AbstractKubernetesStepDecorator {
public PodTemplateMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
this.podTemplateConfigMapName =
Constants.POD_TEMPLATE_CONFIG_MAP_PREFIX + kubernetesComponentConf.getClusterId();
KubernetesLabel.POD_TEMPLATE_CONFIG_MAP.generateWith(
kubernetesComponentConf.getClusterId());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.kubernetes.utils.KubernetesUtils;

import io.fabric8.kubernetes.api.model.LocalObjectReference;
Expand All @@ -38,6 +39,7 @@
import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** Abstract class for the {@link KubernetesParameters}. */
public abstract class AbstractKubernetesParameters implements KubernetesParameters {
Expand Down Expand Up @@ -67,16 +69,16 @@ public String getConfigDirectory() {
public String getClusterId() {
final String clusterId = flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID);

if (StringUtils.isBlank(clusterId)) {
throw new IllegalArgumentException(
KubernetesConfigOptions.CLUSTER_ID.key() + " must not be blank.");
} else if (clusterId.length() > Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID) {
throw new IllegalArgumentException(
KubernetesConfigOptions.CLUSTER_ID.key()
+ " must be no more than "
+ Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
+ " characters.");
}
checkArgument(
!isNullOrWhitespaceOnly(clusterId),
"%s must not be blank.",
KubernetesConfigOptions.CLUSTER_ID.key());
checkArgument(
clusterId.length() <= KubernetesLabel.getClusterIdMaxLength(),
"%s must be no more than %s characters. Please change %s.",
clusterId,
KubernetesLabel.getClusterIdMaxLength(),
KubernetesConfigOptions.CLUSTER_ID.key());

return clusterId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,18 @@ public class Constants {
public static final String MAIN_CONTAINER_NAME = "flink-main-container";

public static final String FLINK_CONF_VOLUME = "flink-config-volume";
public static final String CONFIG_MAP_PREFIX = "flink-config-";

public static final String HADOOP_CONF_VOLUME = "hadoop-config-volume";
public static final String HADOOP_CONF_CONFIG_MAP_PREFIX = "hadoop-config-";
public static final String HADOOP_CONF_DIR_IN_POD = "/opt/hadoop/conf";
public static final String ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
public static final String ENV_HADOOP_HOME = "HADOOP_HOME";

public static final String KERBEROS_KEYTAB_VOLUME = "kerberos-keytab-volume";
public static final String KERBEROS_KEYTAB_SECRET_PREFIX = "kerberos-keytab-";
public static final String KERBEROS_KEYTAB_MOUNT_POINT = "/opt/kerberos/kerberos-keytab";
public static final String KERBEROS_KRB5CONF_VOLUME = "kerberos-krb5conf-volume";
public static final String KERBEROS_KRB5CONF_CONFIG_MAP_PREFIX = "kerberos-krb5conf-";
public static final String KERBEROS_KRB5CONF_MOUNT_DIR = "/etc";
public static final String KERBEROS_KRB5CONF_FILE = "krb5.conf";

public static final String FLINK_REST_SERVICE_SUFFIX = "-rest";

public static final String NAME_SEPARATOR = "-";

// Constants for label builder
Expand Down Expand Up @@ -83,8 +77,6 @@ public class Constants {

public static final String HEADLESS_SERVICE_CLUSTER_IP = "None";

public static final int MAXIMUM_CHARACTERS_OF_CLUSTER_ID = 45;

public static final String RESTART_POLICY_OF_NEVER = "Never";

// Constants for Kubernetes high availability
Expand All @@ -102,7 +94,6 @@ public class Constants {
public static final String TASK_MANAGER_POD_TEMPLATE_FILE_NAME =
"taskmanager-pod-template.yaml";
public static final String POD_TEMPLATE_DIR_IN_POD = "/opt/flink/pod-template";
public static final String POD_TEMPLATE_CONFIG_MAP_PREFIX = "pod-template-";
public static final String POD_TEMPLATE_VOLUME = "pod-template-volume";

// Kubernetes start scripts
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.kubernetes.utils;

import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;

import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

/** Collection of kubernetes labels and helper methods. */
public enum KubernetesLabel {
CONFIG_MAP("flink-config-", ""),
HADOOP_CONF_CONFIG_MAP("hadoop-config-", ""),
KERBEROS_KEYTAB_SECRET("kerberos-keytab-", ""),
KERBEROS_KRB5CONF_CONFIG_MAP("kerberos-krb5conf-", ""),
POD_TEMPLATE_CONFIG_MAP("pod-template-", ""),
FLINK_REST_SERVICE("", "-rest");

private static final int KUBERNETES_LABEL_MAX_LENGTH = 63;

private final String prefix;
private final String suffix;

KubernetesLabel(String prefix, String suffix) {
this.prefix = prefix;
this.suffix = suffix;
}

private int getTotalLength() {
return prefix.length() + suffix.length();
}

public static int getClusterIdMaxLength() {
int longestAffixLength =
Arrays.stream(KubernetesLabel.values())
.map(KubernetesLabel::getTotalLength)
.max(Integer::compareTo)
.orElseThrow(() -> new IllegalStateException("No enum value is present."));

return KUBERNETES_LABEL_MAX_LENGTH - longestAffixLength;
}

/**
* Generates the kubernetes label containing the clusterId.
*
* @param clusterId The clusterId
* @return a String containing the kubernetes label with the clusterId
*/
public String generateWith(String clusterId) {
checkArgument(
!isNullOrWhitespaceOnly(clusterId),
"%s must not be blank.",
KubernetesConfigOptions.CLUSTER_ID.key());

String labelWithClusterId = prefix + clusterId + suffix;

checkArgument(
labelWithClusterId.length() <= KUBERNETES_LABEL_MAX_LENGTH,
"%s must be no more than %s characters. Please change %s.",
labelWithClusterId,
KUBERNETES_LABEL_MAX_LENGTH,
KubernetesConfigOptions.CLUSTER_ID.key());

return labelWithClusterId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.decorators.InternalServiceDecorator;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;

import io.fabric8.kubernetes.api.model.Container;
Expand Down Expand Up @@ -207,7 +208,8 @@ public void testDeployApplicationClusterWithClusterIP() throws Exception {
.deployApplicationCluster(clusterSpecification, appConfig)
.getClusterClient();

final String address = CLUSTER_ID + Constants.FLINK_REST_SERVICE_SUFFIX + "." + NAMESPACE;
final String address =
KubernetesLabel.FLINK_REST_SERVICE.generateWith(CLUSTER_ID) + "." + NAMESPACE;
final int port = flinkConfig.get(RestOptions.PORT);
assertThat(
clusterClient.getWebInterfaceURL(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.kubernetes.kubeclient.FlinkPod;
import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerTestBase;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
Expand Down Expand Up @@ -136,7 +137,7 @@ private List<Volume> getExpectedVolumes(List<KeyToPath> keyToPaths) {
new VolumeBuilder()
.withName(Constants.POD_TEMPLATE_VOLUME)
.withNewConfigMap()
.withName(Constants.POD_TEMPLATE_CONFIG_MAP_PREFIX + CLUSTER_ID)
.withName(KubernetesLabel.POD_TEMPLATE_CONFIG_MAP.generateWith(CLUSTER_ID))
.withItems(keyToPaths)
.endConfigMap()
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesLabel;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
Expand Down Expand Up @@ -64,12 +65,10 @@ public void testClusterIdMustNotBeBlank() {
public void testClusterIdLengthLimitation() {
final String stringWithIllegalLength =
StringUtils.generateRandomAlphanumericString(
new Random(), Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID + 1);
new Random(), KubernetesLabel.getClusterIdMaxLength() + 1);
flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, stringWithIllegalLength);
assertThrows(
"must be no more than "
+ Constants.MAXIMUM_CHARACTERS_OF_CLUSTER_ID
+ " characters",
"must be no more than " + KubernetesLabel.getClusterIdMaxLength() + " characters",
IllegalArgumentException.class,
testingKubernetesParameters::getClusterId);
}
Expand Down

0 comments on commit 2a61cd8

Please sign in to comment.