
[hotfix][test] Adds some additional logging to RescaleOnCheckpointITCase
XComp committed Sep 16, 2024
1 parent 0731f4f commit 621ce32
Showing 1 changed file with 34 additions and 4 deletions.
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -40,6 +41,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.Iterator;
@@ -51,6 +54,8 @@
 @ExtendWith(TestLoggerExtension.class)
 class RescaleOnCheckpointITCase {
 
+    private static final Logger LOG = LoggerFactory.getLogger(RescaleOnCheckpointITCase.class);
+
     // Scaling down is used here because scaling up is not supported by the NumberSequenceSource
     // that's used in this test.
     private static final int NUMBER_OF_SLOTS = 4;
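
A side note on the LOG field introduced above: it follows the standard SLF4J pattern, and the {} placeholders in the log calls of the next hunk are substituted lazily, so a disabled log level costs no string concatenation. A minimal, self-contained sketch of that pattern (the Example class is hypothetical, not part of this change):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class Example {
        private static final Logger LOG = LoggerFactory.getLogger(Example.class);

        void report(int parallelism) {
            // The {} placeholder is only rendered if INFO is enabled,
            // so this statement stays cheap when INFO logging is off.
            LOG.info("Job is running with parallelism {}.", parallelism);
        }
    }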
@@ -111,34 +116,59 @@ void testRescaleOnCheckpoint(
         assertThat(jobVertexIterator.hasNext())
                 .as("There needs to be at least one JobVertex.")
                 .isTrue();
+        final JobVertexID jobVertexId = jobVertexIterator.next().getID();
         final JobResourceRequirements jobResourceRequirements =
                 JobResourceRequirements.newBuilder()
-                        .setParallelismForJobVertex(
-                                jobVertexIterator.next().getID(), 1, AFTER_RESCALE_PARALLELISM)
+                        .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM)
                         .build();
         assertThat(jobVertexIterator.hasNext())
                 .as("This test expects to have only one JobVertex.")
                 .isFalse();
 
         restClusterClient.submitJob(jobGraph).join();
 
+        final JobID jobId = jobGraph.getJobID();
         try {
-            final JobID jobId = jobGraph.getJobID();
 
+            LOG.info(
+                    "Waiting for job {} to reach parallelism of {} for vertex {}.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    jobVertexId);
             waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
 
+            LOG.info(
+                    "Job {} reached parallelism of {} for vertex {}. Updating the vertex parallelism to {} next.",
+                    jobId,
+                    BEFORE_RESCALE_PARALLELISM,
+                    jobVertexId,
+                    AFTER_RESCALE_PARALLELISM);
             restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join();
 
             // timeout to allow any unexpected rescaling to happen anyway
             Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
 
+            // verify that the previous timeout didn't result in a change of parallelism
+            LOG.info(
+                    "Checking that job {} hasn't changed its parallelism yet, even after some delay.",
+                    jobId);
             waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);
 
             miniCluster.triggerCheckpoint(jobId);
 
+            LOG.info(
+                    "Waiting for job {} to reach parallelism of {} for vertex {}.",
+                    jobId,
+                    AFTER_RESCALE_PARALLELISM,
+                    jobVertexId);
             waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
 
-            waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM);
+            final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
+            LOG.info(
+                    "Waiting for {} slot(s) to become available due to the scale down.",
+                    expectedFreeSlotCount);
+            waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
+            LOG.info("{} free slot(s) detected. Finishing test.", expectedFreeSlotCount);
         } finally {
            restClusterClient.cancel(jobGraph.getJobID()).join();
         }
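
For readers skimming the commit, the instrumented test body boils down to the flow below. This is a condensed sketch rather than the full test: it assumes the same setup as the file (jobGraph, restClusterClient, miniCluster, and the parallelism constants), and waitForRunningTasks/waitForAvailableSlots are the test's own helpers.

    // Pin down the single JobVertex and request a lower parallelism range for it.
    final JobVertexID jobVertexId = jobGraph.getVertices().iterator().next().getID();
    final JobResourceRequirements requirements =
            JobResourceRequirements.newBuilder()
                    .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM)
                    .build();

    restClusterClient.submitJob(jobGraph).join();
    final JobID jobId = jobGraph.getJobID();

    // The job initially runs at its original parallelism.
    waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);

    // Updating the resource requirements alone must not rescale the job yet ...
    restClusterClient.updateJobResourceRequirements(jobId, requirements).join();
    Thread.sleep(REQUIREMENT_UPDATE_TO_CHECKPOINT_GAP.toMillis());
    waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM);

    // ... only the next checkpoint makes the scheduler apply the new parallelism.
    miniCluster.triggerCheckpoint(jobId);
    waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
    waitForAvailableSlots(restClusterClient, NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM);

The design point the new logging makes visible is that the requirement update is recorded immediately while the actual rescale is deferred to a checkpoint boundary, which is exactly what the BEFORE/AFTER assertions around triggerCheckpoint verify.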
