Skip to content

Commit 1174ce7

Browse files
Pipe: Enable flushing storage engine after last terminate event reported (#15465) (#15481)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent 41dea10 commit 1174ce7

File tree

5 files changed

+98
-0
lines changed

5 files changed

+98
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iotdb.commons.utils.TestOnly;
3636
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3737
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
38+
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
3839
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
3940
import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner;
4041
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
@@ -89,6 +90,11 @@ public synchronized void start() throws StartupException {
8990
"PipeTaskAgent#restartAllStuckPipes",
9091
PipeDataNodeAgent.task()::restartAllStuckPipes,
9192
PipeConfig.getInstance().getPipeStuckRestartIntervalSeconds());
93+
registerPeriodicalJob(
94+
"PipeTaskAgent#flushDataRegionIfNeeded",
95+
PipeTerminateEvent::flushDataRegionIfNeeded,
96+
PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds());
97+
9298
pipePeriodicalJobExecutor.start();
9399

94100
if (PipeConfig.getInstance().getPipeEventReferenceTrackingEnabled()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/terminate/PipeTerminateEvent.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919

2020
package org.apache.iotdb.db.pipe.event.common.terminate;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
2223
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2324
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2425
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
26+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2527
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2628
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2729
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
2830
import org.apache.iotdb.db.pipe.agent.task.PipeDataNodeTask;
2931
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
32+
import org.apache.iotdb.db.storageengine.StorageEngine;
33+
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
3036

3137
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.atomic.AtomicLong;
3239

3340
/**
3441
* The {@link PipeTerminateEvent} is an {@link EnrichedEvent} that controls the termination of pipe,
@@ -37,6 +44,41 @@
3744
* be discarded.
3845
*/
3946
public class PipeTerminateEvent extends EnrichedEvent {
47+
48+
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTerminateEvent.class);
49+
50+
private static final AtomicLong PROGRESS_REPORT_COUNT = new AtomicLong(0);
51+
private static final AtomicLong LAST_PROGRESS_REPORT_TIME = new AtomicLong(0);
52+
53+
public static void flushDataRegionIfNeeded() {
54+
if (PROGRESS_REPORT_COUNT.get() > 0
55+
&& PROGRESS_REPORT_COUNT.get()
56+
> PipeConfig.getInstance().getPipeFlushAfterTerminateCount()) {
57+
flushDataRegion();
58+
return;
59+
}
60+
61+
if (LAST_PROGRESS_REPORT_TIME.get() > 0
62+
&& System.currentTimeMillis() - LAST_PROGRESS_REPORT_TIME.get()
63+
> PipeConfig.getInstance().getPipeFlushAfterLastTerminateSeconds() * 1000L) {
64+
flushDataRegion();
65+
}
66+
}
67+
68+
private static void flushDataRegion() {
69+
try {
70+
StorageEngine.getInstance().operateFlush(new TFlushReq());
71+
PROGRESS_REPORT_COUNT.set(0);
72+
LAST_PROGRESS_REPORT_TIME.set(0);
73+
LOGGER.info("Force flush all data regions because of last progress report time.");
74+
} catch (final Exception e) {
75+
LOGGER.warn(
76+
"Failed to flush all data regions, please check the error message: {}",
77+
e.getMessage(),
78+
e);
79+
}
80+
}
81+
4082
private final int dataRegionId;
4183

4284
public PipeTerminateEvent(
@@ -93,6 +135,9 @@ public boolean mayEventPathsOverlappedWithPattern() {
93135

94136
@Override
95137
public void reportProgress() {
138+
PROGRESS_REPORT_COUNT.incrementAndGet();
139+
LAST_PROGRESS_REPORT_TIME.set(System.currentTimeMillis());
140+
96141
// To avoid deadlock
97142
CompletableFuture.runAsync(
98143
() -> PipeDataNodeAgent.task().markCompleted(pipeName, dataRegionId));

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,8 @@ public class CommonConfig {
273273
private long pipeStuckRestartIntervalSeconds = 120;
274274
private long pipeStuckRestartMinIntervalMs = 5 * 60 * 1000L; // 5 minutes
275275
private boolean pipeEpochKeepTsFileAfterStuckRestartEnabled = false;
276+
private long pipeFlushAfterLastTerminateSeconds = 30;
277+
private long pipeFlushAfterTerminateCount = 30;
276278
private long pipeStorageEngineFlushTimeIntervalMs = Long.MAX_VALUE;
277279

278280
private int pipeMetaReportMaxLogNumPerRound = 10;
@@ -1478,6 +1480,31 @@ public void setPipeStorageEngineFlushTimeIntervalMs(long pipeStorageEngineFlushT
14781480
"pipeStorageEngineFlushTimeIntervalMs is set to {}", pipeStorageEngineFlushTimeIntervalMs);
14791481
}
14801482

1483+
public long getPipeFlushAfterLastTerminateSeconds() {
1484+
return pipeFlushAfterLastTerminateSeconds;
1485+
}
1486+
1487+
public void setPipeFlushAfterLastTerminateSeconds(long pipeFlushAfterLastTerminateSeconds) {
1488+
if (this.pipeFlushAfterLastTerminateSeconds == pipeFlushAfterLastTerminateSeconds) {
1489+
return;
1490+
}
1491+
this.pipeFlushAfterLastTerminateSeconds = pipeFlushAfterLastTerminateSeconds;
1492+
logger.info(
1493+
"pipeFlushAfterLastTerminateSeconds is set to {}", pipeFlushAfterLastTerminateSeconds);
1494+
}
1495+
1496+
public long getPipeFlushAfterTerminateCount() {
1497+
return pipeFlushAfterTerminateCount;
1498+
}
1499+
1500+
public void setPipeFlushAfterTerminateCount(long pipeFlushAfterTerminateCount) {
1501+
if (this.pipeFlushAfterTerminateCount == pipeFlushAfterTerminateCount) {
1502+
return;
1503+
}
1504+
this.pipeFlushAfterTerminateCount = pipeFlushAfterTerminateCount;
1505+
logger.info("pipeFlushAfterTerminateCount is set to {}", pipeFlushAfterTerminateCount);
1506+
}
1507+
14811508
public int getPipeMetaReportMaxLogNumPerRound() {
14821509
return pipeMetaReportMaxLogNumPerRound;
14831510
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,14 @@ public boolean isPipeEpochKeepTsFileAfterStuckRestartEnabled() {
303303
return COMMON_CONFIG.isPipeEpochKeepTsFileAfterStuckRestartEnabled();
304304
}
305305

306+
public long getPipeFlushAfterTerminateCount() {
307+
return COMMON_CONFIG.getPipeFlushAfterTerminateCount();
308+
}
309+
310+
public long getPipeFlushAfterLastTerminateSeconds() {
311+
return COMMON_CONFIG.getPipeFlushAfterLastTerminateSeconds();
312+
}
313+
306314
public long getPipeStorageEngineFlushTimeIntervalMs() {
307315
return COMMON_CONFIG.getPipeStorageEngineFlushTimeIntervalMs();
308316
}
@@ -531,6 +539,8 @@ public void printAllConfigs() {
531539
LOGGER.info(
532540
"PipeEpochKeepTsFileAfterStuckRestartEnabled: {}",
533541
isPipeEpochKeepTsFileAfterStuckRestartEnabled());
542+
LOGGER.info("PipeFlushAfterTerminateCount: {}", getPipeFlushAfterTerminateCount());
543+
LOGGER.info("PipeFlushAfterLastTerminateSeconds: {}", getPipeFlushAfterLastTerminateSeconds());
534544
LOGGER.info(
535545
"PipeStorageEngineFlushTimeIntervalMs: {}", getPipeStorageEngineFlushTimeIntervalMs());
536546

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,16 @@ public static void loadPipeInternalConfig(CommonConfig config, TrimProperties pr
431431
properties.getProperty(
432432
"pipe_stuck_restart_min_interval_ms",
433433
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));
434+
config.setPipeFlushAfterLastTerminateSeconds(
435+
Long.parseLong(
436+
properties.getProperty(
437+
"pipe_flush_after_last_terminate_seconds",
438+
String.valueOf(config.getPipeFlushAfterLastTerminateSeconds()))));
439+
config.setPipeFlushAfterTerminateCount(
440+
Long.parseLong(
441+
properties.getProperty(
442+
"pipe_flush_after_terminate_count",
443+
String.valueOf(config.getPipeFlushAfterTerminateCount()))));
434444
config.setPipeEpochKeepTsFileAfterStuckRestartEnabled(
435445
Boolean.parseBoolean(
436446
properties.getProperty(

0 commit comments

Comments
 (0)