Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-28829][k8s] Support prepreparing K8S resources before JM creation #20498

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public Fabric8FlinkKubeClient(
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List<HasMetadata> accompanyingResources = kubernetesJMSpec.getAccompanyingResources();
final List<HasMetadata> prePreparedResources = kubernetesJMSpec.getPrePreparedResources();

// before create Deployment
this.internalClient.resourceList(prePreparedResources).createOrReplace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might have resources leak here if the flink client crashed exactly after the pre-prepared resources created.


// create Deployment
LOG.debug(
Expand All @@ -121,6 +125,8 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
final Deployment createdDeployment =
this.internalClient.apps().deployments().create(deployment);

// Add all prepared AccompanyingResources to refresh owner reference
accompanyingResources.addAll(prePreparedResources);
// Note that we should use the uid of the created Deployment for the OwnerReference.
setOwnerReference(createdDeployment, accompanyingResources);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ public class KubernetesJobManagerSpecification {

private List<HasMetadata> accompanyingResources;

private List<HasMetadata> prePreparedResources;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private List<HasMetadata> prePreparedResources;
private final List<HasMetadata> prePreparedResources;

Copy link
Contributor Author

@bzhaoopenstack bzhaoopenstack Oct 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, how about just follow the style of accompanyingResources? If we change this, we should also keep the same with accompanyingResources. And that was not related with this PR.


public KubernetesJobManagerSpecification(
Deployment deployment, List<HasMetadata> accompanyingResources) {
Deployment deployment,
List<HasMetadata> accompanyingResources,
List<HasMetadata> prePreparedResources) {
this.deployment = deployment;
this.accompanyingResources = accompanyingResources;
this.prePreparedResources = prePreparedResources;
}

public Deployment getDeployment() {
Expand All @@ -43,4 +48,8 @@ public Deployment getDeployment() {
public List<HasMetadata> getAccompanyingResources() {
return accompanyingResources;
}

public List<HasMetadata> getPrePreparedResources() {
return prePreparedResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
return Collections.emptyList();
}

@Override
public List<HasMetadata> buildPrePreparedResources() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ public interface KubernetesStepDecorator {
* feature. This could only be applicable on the client-side submission process.
*/
List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException;

/** Build the Kubernetes resources before Flink Job Manager deployment creation. */
List<HasMetadata> buildPrePreparedResources();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi
throws IOException {
FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
List<HasMetadata> accompanyingResources = new ArrayList<>();
List<HasMetadata> prePreparedResources = new ArrayList<>();

final KubernetesStepDecorator[] stepDecorators =
new KubernetesStepDecorator[] {
Expand All @@ -76,14 +77,16 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi
};

for (KubernetesStepDecorator stepDecorator : stepDecorators) {
prePreparedResources.addAll(stepDecorator.buildPrePreparedResources());
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
}

final Deployment deployment =
createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);

return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
return new KubernetesJobManagerSpecification(
deployment, accompanyingResources, prePreparedResources);
}

private static Deployment createJobManagerDeployment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,4 +630,37 @@ private KubernetesConfigMap buildTestingConfigMap() {
.withData(data)
.build());
}

@Test
void testMockPrePreparedResources() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The throws Exception is useless.

// regenerate a test JM spec
Deployment testDeployment = kubernetesJobManagerSpecification.getDeployment();
List<HasMetadata> testAccompanyingResources =
kubernetesJobManagerSpecification.getAccompanyingResources();
List<HasMetadata> mockPrePreparedResources = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Pod mockPod =
new PodBuilder()
.editOrNewMetadata()
.withName("mock-pod-preprepared-resource-" + i)
.endMetadata()
.editOrNewSpec()
.endSpec()
.build();
mockPrePreparedResources.add(mockPod);
}
KubernetesJobManagerSpecification testKubernetesJobManagerSpecification =
new KubernetesJobManagerSpecification(
testDeployment, testAccompanyingResources, mockPrePreparedResources);
flinkKubeClient.createJobManagerComponent(testKubernetesJobManagerSpecification);

// check the preprepared resources had been created.
final List<Pod> resultPods = kubeClient.pods().inNamespace(NAMESPACE).list().getItems();
assertThat(resultPods).hasSize(3);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be at least 4 pods ?
1 pod for the JM itself and 3 mock-pod-preprepared-resource-**.

Copy link
Contributor Author

@bzhaoopenstack bzhaoopenstack Oct 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is UT, it won't create any real Pods, just a k8s models objects. Actually, JM will be created by K8S deployment, In the real env, that's true it should be 4 pods.


final List<Deployment> resultedDeployments =
kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
// check resource owner reference had been refreshed.
testOwnerReferenceSetting(resultedDeployments.get(0), resultPods);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,14 @@ void testSetJobManagerDeploymentReplicas() throws Exception {
assertThat(kubernetesJobManagerSpecification.getDeployment().getSpec().getReplicas())
.isEqualTo(JOBMANAGER_REPLICAS);
}

@Test
void testPrePreparedResourcesWontAffectOriginalProcess() throws IOException {
kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
flinkPod, kubernetesJobManagerParameters);
final List<HasMetadata> prePreparedResources =
this.kubernetesJobManagerSpecification.getPrePreparedResources();
assertThat(prePreparedResources).hasSize(0);
}
}