Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,15 @@ public UnalignedCheckpointITCase(

@Test
public void execute() throws Exception {
// NOTE(review): this span is a PR-diff scrape with +/- markers stripped.
// The next line is the PRE-change body; the lines after it are its
// replacement. Both appear verbatim — only one belongs in the final file.
execute(settings);
// Phase 1: Run with WAIT_FOR_CHECKPOINT_AND_CANCEL to produce a checkpoint
settings.setCheckpointGenerationMode(
CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL);
// super.execute returns the path of the checkpoint produced in phase 1;
// it is fed back in below as the restore point for phase 2.
String checkpointPath = super.execute(settings);

// Phase 2: Restore from the checkpoint and run normally
settings.setCheckpointGenerationMode(CheckpointGenerationMode.NONE);
settings.setRestoreCheckpoint(checkpointPath);
super.execute(settings);
}

protected void checkCounters(JobExecutionResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
import static org.apache.flink.util.Preconditions.checkState;
Expand Down Expand Up @@ -93,7 +94,7 @@ public void create(
0,
sourceSleepMillis,
val -> true);
addFailingSink(source, minCheckpoints, slotSharing);
addFailingSink(source, minCheckpoints, slotSharing, expectedRestarts);
}
},

Expand Down Expand Up @@ -132,7 +133,7 @@ public void create(
slotSharing ? "default" : ("min" + inputIndex));
}

addFailingSink(combinedSource, minCheckpoints, slotSharing);
addFailingSink(combinedSource, minCheckpoints, slotSharing, expectedRestarts);
}
},

Expand Down Expand Up @@ -174,7 +175,7 @@ public void create(
.process(new TestKeyedCoProcessFunction())
.setParallelism(parallelism);

addFailingSink(connected, minCheckpoints, slotSharing);
addFailingSink(connected, minCheckpoints, slotSharing, expectedRestarts);
}
},

Expand Down Expand Up @@ -204,7 +205,7 @@ public void create(
combinedSource = combinedSource == null ? source : combinedSource.union(source);
}

addFailingSink(combinedSource, minCheckpoints, slotSharing);
addFailingSink(combinedSource, minCheckpoints, slotSharing, expectedRestarts);
}
},

Expand Down Expand Up @@ -254,7 +255,7 @@ public void create(
.process(new TestBroadcastProcessFunction())
.setParallelism(2 * parallelism);

addFailingSink(joined, minCheckpoints, slotSharing);
addFailingSink(joined, minCheckpoints, slotSharing, expectedRestarts);
}
},

Expand Down Expand Up @@ -330,7 +331,7 @@ public void create(
.process(new TestKeyedBroadcastProcessFunction())
.setParallelism(parallelism + 2);

addFailingSink(joined, minCheckpoints, slotSharing);
addFailingSink(joined, minCheckpoints, slotSharing, expectedRestarts);
}
},
CUSTOM_PARTITIONER {
Expand Down Expand Up @@ -367,7 +368,7 @@ public String map(Long value) throws Exception {
})
.name("long-to-string-map")
.uid("long-to-string-map")
.map(getFailingMapper(minCheckpoints))
.map(getFailingMapper(minCheckpoints, expectedRestarts))
.name("failing-map")
.uid("failing-map")
.setParallelism(parallelism)
Expand All @@ -385,10 +386,13 @@ private String buildString(long partition, long index) {
};

void addFailingSink(
DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
DataStream<Long> combinedSource,
long minCheckpoints,
boolean slotSharing,
int expectedRestarts) {
combinedSource
.shuffle()
.map(getFailingMapper(minCheckpoints))
.map(getFailingMapper(minCheckpoints, expectedRestarts))
.name("failing-map")
.uid("failing-map")
.slotSharingGroup(slotSharing ? "default" : "failing-map")
Expand All @@ -408,13 +412,21 @@ void addFailingSink(
/**
* Creates a FailingMapper that only fails during snapshot operations.
*
* <p>Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND
* runNumber == 0. After job failovers internally, runNumber becomes attemptNumber > 0, so
* failure condition is no longer satisfied. This ensures the mapper fails exactly once
* during initial run to trigger job failover, but never fails again after failing over and
* recovery from checkpoint.
* <p>When {@code expectedRestarts <= 0}, returns a no-op FailingMapper that never fails.
* This is used in phases where no failure is expected (e.g., checkpoint-and-cancel phase).
*
* <p>When {@code expectedRestarts > 0}, fails during snapshotState() when
* completedCheckpoints >= minCheckpoints/2 AND runNumber == 0. After job failovers
* internally, runNumber becomes attemptNumber > 0, so failure condition is no longer
* satisfied. This ensures the mapper fails exactly once during initial run to trigger job
* failover, but never fails again after recovery from checkpoint.
*/
private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) {
private static <T> FailingMapper<T> getFailingMapper(
long minCheckpoints, int expectedRestarts) {
if (expectedRestarts <= 0) {
return new FailingMapper<>(
state -> false, state -> false, state -> false, state -> false);
}
return new FailingMapper<>(
state -> false,
state ->
Expand Down Expand Up @@ -549,7 +561,8 @@ public static Object[][] getScaleFactors() {
// captured in-flight records, see FLINK-31963.
Object[][] parameters =
new Object[][] {
new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
// Disable CUSTOM_PARTITIONER since it does not work well, see FLINK-39162
// new Object[] {"downscale", Topology.CUSTOM_PARTITIONER, 3, 2, 0L},
new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 12, 7, 0L},
new Object[] {"upscale", Topology.KEYED_DIFFERENT_PARALLELISM, 7, 12, 0L},
new Object[] {"downscale", Topology.KEYED_DIFFERENT_PARALLELISM, 5, 3, 5L},
Expand Down Expand Up @@ -628,25 +641,45 @@ public UnalignedCheckpointRescaleITCase(
*/
@Test
public void shouldRescaleUnalignedCheckpoint() throws Exception {
// NOTE(review): this span is a PR-diff scrape with +/- markers stripped, so
// removed lines sit next to their replacements. Residue lines are flagged
// below; only one variant of each pair belongs in the final file.
// Phase 1: prescale - generate initial checkpoint (unchanged)
final UnalignedSettings prescaleSettings =
new UnalignedSettings(topology)
.setParallelism(oldParallelism)
.setExpectedFailures(1)
.setSourceSleepMs(sourceSleepMs)
.setExpectedFinalJobStatus(JobStatus.FAILED);
// NOTE(review): the next three lines are the PRE-change version
// (setGenerateCheckpoint / checkpointDir); the lines after them are the
// post-change replacement using CheckpointGenerationMode / checkpointDir1.
prescaleSettings.setGenerateCheckpoint(true);
final String checkpointDir = super.execute(prescaleSettings);
assertThat(checkpointDir)
prescaleSettings.setCheckpointGenerationMode(CheckpointGenerationMode.WAIT_FOR_JOB_RESULT);
final String checkpointDir1 = super.execute(prescaleSettings);
assertThat(checkpointDir1)
.as("First job must generate a checkpoint for rescale test to be valid.")
.isNotNull();
// NOTE(review): the next two lines ("// resume" and the postscaleSettings
// declaration) are PRE-change residue; phase2Settings below replaces them.
// resume
final UnalignedSettings postscaleSettings =

// Phase 2: postscale-checkpoint - recover from checkpoint1 and generate new checkpoint
// expectedFailures defaults to 0, so expectedRestarts passed to create() is also 0,
// which causes getFailingMapper to return a no-op mapper that never fails.
final UnalignedSettings phase2Settings =
new UnalignedSettings(topology)
.setParallelism(newParallelism)
.setCheckpointGenerationMode(
CheckpointGenerationMode.WAIT_FOR_CHECKPOINT_AND_CANCEL)
.setRestoreCheckpoint(checkpointDir1)
.setSourceSleepMs(sourceSleepMs);
final String checkpointDir2 = super.execute(phase2Settings);
assertThat(checkpointDir2)
.as("Phase 2 must generate a checkpoint for phase 3 to be valid.")
.isNotNull();

// Phase 3: recovery - recover from checkpoint2 and run to completion
// Randomly choose parallelism from oldParallelism or newParallelism
int phase3Parallelism =
ThreadLocalRandom.current().nextBoolean() ? oldParallelism : newParallelism;
final UnalignedSettings phase3Settings =
new UnalignedSettings(topology)
.setParallelism(phase3Parallelism)
.setExpectedFailures(1)
.setRestoreCheckpoint(checkpointDir2)
.setExpectedFinalJobStatus(JobStatus.FINISHED);
// NOTE(review): the next two lines (postscaleSettings.setRestoreCheckpoint /
// super.execute(postscaleSettings)) are PRE-change residue; the
// super.execute(phase3Settings) call after them is the replacement.
postscaleSettings.setRestoreCheckpoint(checkpointDir);
super.execute(postscaleSettings);
super.execute(phase3Settings);
}

protected void checkCounters(JobExecutionResult result) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,19 +121,32 @@ public void testRescaleFromUnalignedCheckpoint() throws Exception {

CommonTestUtils.waitForJobStatus(jobClient1, Collections.singletonList(JobStatus.RUNNING));
CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient1.getJobID(), false);
String checkpointPath =
String checkpointPath1 =
CommonTestUtils.waitForCheckpointWithInflightBuffers(
jobClient1.getJobID(), miniCluster);
jobClient1.cancel().get();

// Step 2: Restore the job with a different parallelism
JobClient jobClient2 =
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath));
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath1));

CommonTestUtils.waitForJobStatus(jobClient2, Collections.singletonList(JobStatus.RUNNING));
CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient2.getJobID(), false);
CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient2.getJobID(), miniCluster);
String checkpointPath2 =
CommonTestUtils.waitForCheckpointWithInflightBuffers(
jobClient2.getJobID(), miniCluster);
jobClient2.cancel().get();

// Step 3: Restore from Step 2's checkpoint with random parallelism. This validates
// that a checkpoint produced after recovery can be used for another recovery.
JobClient jobClient3 =
executeJobViaEnv.executeJob(getUnalignedCheckpointEnv(checkpointPath2));

CommonTestUtils.waitForJobStatus(jobClient3, Collections.singletonList(JobStatus.RUNNING));
CommonTestUtils.waitForAllTaskRunning(miniCluster, jobClient3.getJobID(), false);
// Wait for at least one checkpoint to verify the recovery was successful
CommonTestUtils.waitForCheckpointWithInflightBuffers(jobClient3.getJobID(), miniCluster);
jobClient3.cancel().get();
}

private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String recoveryPath)
Expand Down
Loading