Skip to content

Commit

Permalink
FLINK-28171 enable add appProtocol via the configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
Sigalit Aliazov committed Jul 3, 2023
1 parent 3f70900 commit 8e5b519
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
Expand Down Expand Up @@ -152,7 +154,7 @@ private Container decorateMainContainer(Container container) {

// Merge fields
mainContainerBuilder
.addAllToPorts(getContainerPorts())
.addAllToPorts(getContainerPorts(container))
.addAllToEnv(getCustomizedEnvs())
.addNewEnv()
.withName(ENV_FLINK_POD_IP_ADDRESS)
Expand All @@ -165,23 +167,32 @@ private Container decorateMainContainer(Container container) {
return mainContainerBuilder.build();
}

private List<ContainerPort> getContainerPorts() {
private List<ContainerPort> getContainerPorts(Container container) {
if (kubernetesJobManagerParameters.isHostNetworkEnabled()) {
return Collections.emptyList();
}
return Arrays.asList(
new ContainerPortBuilder()
.withName(Constants.REST_PORT_NAME)
.withContainerPort(kubernetesJobManagerParameters.getRestPort())
.build(),
new ContainerPortBuilder()
.withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
.withContainerPort(kubernetesJobManagerParameters.getRPCPort())
.build(),
new ContainerPortBuilder()
.withName(Constants.BLOB_SERVER_PORT_NAME)
.withContainerPort(kubernetesJobManagerParameters.getBlobServerPort())
.build());
List<ContainerPort> defaultContainerPorts =
Arrays.asList(
new ContainerPortBuilder()
.withName(Constants.REST_PORT_NAME)
.withContainerPort(kubernetesJobManagerParameters.getRestPort())
.build(),
new ContainerPortBuilder()
.withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
.withContainerPort(kubernetesJobManagerParameters.getRPCPort())
.build(),
new ContainerPortBuilder()
.withName(Constants.BLOB_SERVER_PORT_NAME)
.withContainerPort(
kubernetesJobManagerParameters.getBlobServerPort())
.build());

Map<String, ContainerPort> containerPortMap =
container.getPorts().stream()
.collect(Collectors.toMap(ContainerPort::getName, Function.identity()));
return defaultContainerPorts.stream()
.filter(x -> !containerPortMap.containsKey(x.getName()))
.collect(Collectors.toList());
}

private List<EnvVar> getCustomizedEnvs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.kubernetes.utils.Constants.API_VERSION;
Expand Down Expand Up @@ -189,7 +191,7 @@ private Container decorateMainContainer(Container container) {

// Merge fields
mainContainerBuilder
.addAllToPorts(getContainerPorts())
.addAllToPorts(getContainerPorts(container))
.addAllToEnv(getCustomizedEnvs())
.addNewEnv()
.withName(ENV_FLINK_POD_NODE_ID)
Expand All @@ -203,15 +205,24 @@ private Container decorateMainContainer(Container container) {
return mainContainerBuilder.build();
}

private List<ContainerPort> getContainerPorts() {
private List<ContainerPort> getContainerPorts(Container container) {
if (kubernetesTaskManagerParameters.isHostNetworkEnabled()) {
return Collections.emptyList();
}
return Collections.singletonList(
new ContainerPortBuilder()
.withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
.withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
.build());
List<ContainerPort> defaultContainerPorts =
Collections.singletonList(
new ContainerPortBuilder()
.withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
.withContainerPort(kubernetesTaskManagerParameters.getRPCPort())
.build());

Map<String, ContainerPort> containerPortMap =
container.getPorts().stream()
.collect(Collectors.toMap(ContainerPort::getName, Function.identity()));

return defaultContainerPorts.stream()
.filter(x -> !containerPortMap.containsKey(x.getName()))
.collect(Collectors.toList());
}

private List<EnvVar> getCustomizedEnvs() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.kubernetes.utils.Constants;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
Expand Down Expand Up @@ -227,4 +228,36 @@ void testFlinkLogDirEnvShouldBeSetIfConfiguredViaOptions() {
void testDNSPolicyDefaultValue() {
assertThat(this.resultPod.getSpec().getDnsPolicy()).isEqualTo(Constants.DNS_POLICY_DEFAULT);
}

@Test
void testMainContainerPortsAsInput() {
Container podMainContainer = new ContainerBuilder().build();
ContainerPort rpcConatinerPort =
new ContainerPortBuilder()
.withName(Constants.JOB_MANAGER_RPC_PORT_NAME)
.withContainerPort(RPC_PORT)
.build();
rpcConatinerPort.setAdditionalProperty("appProtocol", "tcp");
podMainContainer.setPorts(Arrays.asList(rpcConatinerPort));
final FlinkPod baseFlinkPodWithPorts =
new FlinkPod.Builder().withMainContainer(podMainContainer).build();

final InitJobManagerDecorator initJobManagerDecorator =
new InitJobManagerDecorator(this.kubernetesJobManagerParameters);
final FlinkPod resultFlinkPodWithPort =
initJobManagerDecorator.decorateFlinkPod(baseFlinkPodWithPorts);
final List<ContainerPort> expectedContainerPorts =
Arrays.asList(
rpcConatinerPort,
new ContainerPortBuilder()
.withName(Constants.REST_PORT_NAME)
.withContainerPort(REST_PORT)
.build(),
new ContainerPortBuilder()
.withName(Constants.BLOB_SERVER_PORT_NAME)
.withContainerPort(BLOB_SERVER_PORT)
.build());
assertThat(resultFlinkPodWithPort.getMainContainer().getPorts())
.isEqualTo(expectedContainerPorts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.kubernetes.utils.Constants;

import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
Expand Down Expand Up @@ -290,4 +291,27 @@ void testNodeAffinity() {
"NotIn",
new ArrayList<>(BLOCKED_NODES)));
}

@Test
void testMainContainerPortsAsInput() {
Container podMainContainer = new ContainerBuilder().build();
ContainerPort rpcConatinerPort =
new ContainerPortBuilder()
.withName(Constants.TASK_MANAGER_RPC_PORT_NAME)
.withContainerPort(RPC_PORT)
.build();
rpcConatinerPort.setAdditionalProperty("appProtocol", "tcp");
podMainContainer.setPorts(Arrays.asList(rpcConatinerPort));
final FlinkPod baseFlinkPodWithPorts =
new FlinkPod.Builder().withMainContainer(podMainContainer).build();

final InitTaskManagerDecorator initTaskManagerDecorator =
new InitTaskManagerDecorator(kubernetesTaskManagerParameters);

final FlinkPod resultFlinkPodWithPort =
initTaskManagerDecorator.decorateFlinkPod(baseFlinkPodWithPorts);
final List<ContainerPort> expectedContainerPorts = Arrays.asList(rpcConatinerPort);
assertThat(resultFlinkPodWithPort.getMainContainer().getPorts())
.isEqualTo(expectedContainerPorts);
}
}

0 comments on commit 8e5b519

Please sign in to comment.