Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,30 @@ private boolean isSnapshotMode(final PipeParameters parameters) {
return isSnapshotMode;
}

///////////////////////// Shutdown Logic /////////////////////////

public void persistAllProgressIndexLocally() {
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
LOGGER.info(
"Pipe progress index persist disabled. Skipping persist all progress index locally.");
return;
}
if (!tryReadLockWithTimeOut(10)) {
LOGGER.info("Failed to persist all progress index locally because of timeout.");
return;
}
try {
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
pipeMeta.getRuntimeMeta().persistProgressIndex();
}
LOGGER.info("Persist all progress index locally successfully.");
} catch (final Exception e) {
LOGGER.warn("Failed to record all progress index locally, because {}.", e.getMessage(), e);
} finally {
releaseReadLock();
}
}

///////////////////////// Pipe Consensus /////////////////////////

public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final int consensusGroupId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public void run() {
triggerSnapshotForAllDataRegion();
}

// Persist progress index before shutdown to accurate recovery after restart
PipeDataNodeAgent.task().persistAllProgressIndexLocally();
// Shutdown all consensus pipe's receiver
PipeDataNodeAgent.receiver().pipeConsensus().closeReceiverExecutor();
// Shutdown pipe progressIndex background service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ public void setIsStoppedByRuntimeException(boolean isStoppedByRuntimeException)
this.isStoppedByRuntimeException.set(isStoppedByRuntimeException);
}

public void persistProgressIndex() {
// Iterate through all the task metas and persist their progress index
for (final PipeTaskMeta taskMeta : consensusGroupId2TaskMetaMap.values()) {
if (taskMeta.getProgressIndex() != null) {
taskMeta.persistProgressIndex();
}
}
}

/**
* We use negative regionId to identify the external pipe source, which is not a consensus group
* id. Then we can reuse the regionId to schedule the external pipe source and store the progress
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,34 +113,31 @@ public ProgressIndex getProgressIndex() {
public ProgressIndex updateProgressIndex(final ProgressIndex updateIndex) {
// only pipeTaskMeta that need to updateProgressIndex will persist progress index
// isRegisterPersistTask is used to avoid multiple threads registering persist task concurrently
if (Objects.nonNull(progressIndexPersistFile)
&& !isRegisterPersistTask.getAndSet(true)
if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
&& this.persistProgressIndexFuture == null
&& PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
&& !isRegisterPersistTask.getAndSet(true)) {
this.persistProgressIndexFuture =
PipePeriodicalJobExecutor.submitBackgroundJob(
() -> {
if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
persistProgressIndex();
}
},
this::persistProgressIndex,
0,
PipeConfig.getInstance().getPipeProgressIndexFlushIntervalMs());
}

progressIndex.updateAndGet(
index -> index.updateToMinimumEqualOrIsAfterProgressIndex(updateIndex));
if (Objects.nonNull(progressIndexPersistFile)
&& updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap
&& PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {

if (PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()
&& updateCount.incrementAndGet() - lastPersistCount.get() > checkPointGap) {
persistProgressIndex();
}

return progressIndex.get();
}

private synchronized void persistProgressIndex() {
if (lastPersistCount.get() == updateCount.get()) {
// in case of multiple threads calling updateProgressIndex at the same time
public synchronized void persistProgressIndex() {
if (Objects.isNull(progressIndexPersistFile)
// in case of multiple threads calling updateProgressIndex at the same time
|| lastPersistCount.get() == updateCount.get()) {
return;
}

Expand Down
Loading