Skip to content

Commit

Permalink
[FLINK-28829] Support prepreparing K8S resources before JM creation
Browse files Browse the repository at this point in the history
In this PR, we introduces several things below:
1. We introduce a new interface func for all decorators to support
   create pre-prepared k8s resources.
2. We extend a attribute for JM spec for storing the resources list.
3. Extending the ability for supporting refresh the preprepared
   resource's ownerreference based on original code logic.
4. Add the ability for creating K8S resource before JM deployment
   creation in Fabric client.
  • Loading branch information
bzhaoopenstack committed Sep 29, 2022
1 parent e1e9ee8 commit 211d0ef
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public Fabric8FlinkKubeClient(
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List<HasMetadata> accompanyingResources = kubernetesJMSpec.getAccompanyingResources();
final List<HasMetadata> prePreparedResources = kubernetesJMSpec.getPrePreparedResources();

// before create Deployment
this.internalClient.resourceList(prePreparedResources).createOrReplace();

// create Deployment
LOG.debug(
Expand All @@ -121,6 +125,8 @@ public void createJobManagerComponent(KubernetesJobManagerSpecification kubernet
final Deployment createdDeployment =
this.internalClient.apps().deployments().create(deployment);

// Add all prepared AccompanyingResources to refresh owner reference
accompanyingResources.addAll(prePreparedResources);
// Note that we should use the uid of the created Deployment for the OwnerReference.
setOwnerReference(createdDeployment, accompanyingResources);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ public class KubernetesJobManagerSpecification {

private List<HasMetadata> accompanyingResources;

private List<HasMetadata> prePreparedResources;

public KubernetesJobManagerSpecification(
Deployment deployment, List<HasMetadata> accompanyingResources) {
Deployment deployment,
List<HasMetadata> accompanyingResources,
List<HasMetadata> prePreparedResources) {
this.deployment = deployment;
this.accompanyingResources = accompanyingResources;
this.prePreparedResources = prePreparedResources;
}

public Deployment getDeployment() {
Expand All @@ -43,4 +48,8 @@ public Deployment getDeployment() {
public List<HasMetadata> getAccompanyingResources() {
return accompanyingResources;
}

public List<HasMetadata> getPrePreparedResources() {
return prePreparedResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,9 @@ public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
return Collections.emptyList();
}

@Override
public List<HasMetadata> buildPrePreparedResources() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ public interface KubernetesStepDecorator {
* feature. This could only be applicable on the client-side submission process.
*/
List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException;

/** Build the Kubernetes resources before Flink Job Manager deployment creation. */
List<HasMetadata> buildPrePreparedResources();
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi
throws IOException {
FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
List<HasMetadata> accompanyingResources = new ArrayList<>();
List<HasMetadata> prePreparedResources = new ArrayList<>();

final KubernetesStepDecorator[] stepDecorators =
new KubernetesStepDecorator[] {
Expand All @@ -76,14 +77,16 @@ public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecifi
};

for (KubernetesStepDecorator stepDecorator : stepDecorators) {
prePreparedResources.addAll(stepDecorator.buildPrePreparedResources());
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
}

final Deployment deployment =
createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);

return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
return new KubernetesJobManagerSpecification(
deployment, accompanyingResources, prePreparedResources);
}

private static Deployment createJobManagerDeployment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,4 +630,37 @@ private KubernetesConfigMap buildTestingConfigMap() {
.withData(data)
.build());
}

@Test
void testMockPrePreparedResources() throws Exception {
// regenerate a test JM spec
Deployment testDeployment = kubernetesJobManagerSpecification.getDeployment();
List<HasMetadata> testAccompanyingResources =
kubernetesJobManagerSpecification.getAccompanyingResources();
List<HasMetadata> mockPrePreparedResources = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Pod mockPod =
new PodBuilder()
.editOrNewMetadata()
.withName(String.format("mock-pod-preprepared-resource%d", i))
.endMetadata()
.editOrNewSpec()
.endSpec()
.build();
mockPrePreparedResources.add(mockPod);
}
KubernetesJobManagerSpecification testKubernetesJobManagerSpecification =
new KubernetesJobManagerSpecification(
testDeployment, testAccompanyingResources, mockPrePreparedResources);
flinkKubeClient.createJobManagerComponent(testKubernetesJobManagerSpecification);

// check the preprepared resources had been created.
final List<Pod> resultPods = kubeClient.pods().inNamespace(NAMESPACE).list().getItems();
assertThat(resultPods).hasSize(3);

final List<Deployment> resultedDeployments =
kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems();
// check resource owner reference had been refreshed.
testOwnerReferenceSetting(resultedDeployments.get(0), resultPods);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,15 @@ void testSetJobManagerDeploymentReplicas() throws Exception {
assertThat(kubernetesJobManagerSpecification.getDeployment().getSpec().getReplicas())
.isEqualTo(JOBMANAGER_REPLICAS);
}

@Test
void testPrePreparedResourcesWontAffectOriginalProcess() throws IOException {
kubernetesJobManagerSpecification =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
flinkPod, kubernetesJobManagerParameters);
final List<HasMetadata> prePreparedResources =
this.kubernetesJobManagerSpecification.getPrePreparedResources();
assertThat(prePreparedResources).hasSize(0);
assertThat(prePreparedResources).isEmpty();
}
}

0 comments on commit 211d0ef

Please sign in to comment.