Skip to content

Commit

Permalink
[FLINK-17869][task][checkpointing] Ignore out of order checkpoints in…
Browse files Browse the repository at this point in the history
… SubtaskCheckpointCoordinator

Check (by task thread) whether the current checkpoint was already aborted in the following scenario:
1. on checkpoint barrier ThreadSafeUnaligner sends a mail to start checkpointing (netty thread)
2. on cancellation marker CheckpointBarrierUnaligner aborts it (task thread)
3. task thread processes a mail to start checkpointing
  • Loading branch information
rkhachatryan authored and zhijiangW committed Jun 10, 2020
1 parent 2b51cda commit 64ff676
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;

import org.slf4j.Logger;
Expand All @@ -57,10 +56,12 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -183,6 +184,16 @@ private static ChannelStateWriter openChannelStateWriter(String taskName, Checkp
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause, OperatorChain<?, ?> operatorChain) throws IOException {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, taskName);
lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
Iterator<Long> iterator = abortedCheckpointIds.iterator();
while (iterator.hasNext()) {
long next = iterator.next();
if (next < lastCheckpointId) {
iterator.remove();
} else {
break;
}
}

checkpointStorage.clearCacheFor(checkpointId);

Expand Down Expand Up @@ -221,9 +232,14 @@ public void checkpointState(
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments

if (lastCheckpointId >= metadata.getCheckpointId()) {
LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
checkAndClearAbortedStatus(metadata.getCheckpointId());
return;
}

// Step (0): Record the last triggered checkpointId.
Preconditions.checkArgument(lastCheckpointId < metadata.getCheckpointId(), String.format(
"Unexpected current checkpoint-id: %s vs last checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
Expand Down

0 comments on commit 64ff676

Please sign in to comment.