Skip to content

Commit 2c6ad83

Browse files
authored
Pipe: Fixed the bug that the insertion for newer tsFile in one region may report progress beyond the older tsFile when it is not flushed (#15515) (#15596)
1 parent 02708ec commit 2c6ad83

File tree

7 files changed

+67
-61
lines changed

7 files changed

+67
-61
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
3131
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
3232
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
33-
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
33+
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
3434
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
3535
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
3636
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -342,11 +342,8 @@ protected void reportProgress() {
342342

343343
public void eliminateProgressIndex() {
344344
if (Objects.isNull(overridingProgressIndex)) {
345-
PipeTimePartitionProgressIndexKeeper.getInstance()
346-
.eliminateProgressIndex(
347-
resource.getDataRegionId(),
348-
resource.getTimePartition(),
349-
resource.getMaxProgressIndexAfterClose());
345+
PipeTsFileEpochProgressIndexKeeper.getInstance()
346+
.eliminateProgressIndex(resource.getDataRegionId(), resource.getTsFilePath());
350347
}
351348
}
352349

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
130130
return event.internallyDecreaseResourceReferenceCount(holderMessage);
131131
}
132132

133+
@Override
134+
public void bindProgressIndex(final ProgressIndex progressIndex) {
135+
event.bindProgressIndex(progressIndex);
136+
}
137+
133138
@Override
134139
public ProgressIndex getProgressIndex() {
135140
return event.getProgressIndex();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3030
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
3131
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
32+
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
3233
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
3334
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
3435
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
@@ -471,6 +472,8 @@ private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
471472
switch (state) {
472473
case USING_TSFILE:
473474
// If the state is USING_TSFILE, discard the event and poll the next one.
475+
PipeTsFileEpochProgressIndexKeeper.getInstance()
476+
.eliminateProgressIndex(dataRegionId, event.getTsFileEpoch().getFilePath());
474477
return null;
475478
case EMPTY:
476479
case USING_TABLET:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
2727
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
2828
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
29+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2930
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3031
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
3132
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
@@ -61,7 +62,7 @@ public class PipeDataRegionAssigner implements Closeable {
6162

6263
private int counter = 0;
6364

64-
private final AtomicReference<ProgressIndex> maxProgressIndexForTsFileInsertionEvent =
65+
private final AtomicReference<ProgressIndex> maxProgressIndexForRealtimeEvent =
6566
new AtomicReference<>(MinimumProgressIndex.INSTANCE);
6667

6768
private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
@@ -170,7 +171,11 @@ private void assignToExtractor(
170171
(PipeTsFileInsertionEvent) innerEvent;
171172
tsFileInsertionEvent.disableMod4NonTransferPipes(
172173
extractor.isShouldTransferModFile());
173-
bindOrUpdateProgressIndexForTsFileInsertionEvent(tsFileInsertionEvent);
174+
}
175+
176+
if (innerEvent instanceof PipeTsFileInsertionEvent
177+
|| innerEvent instanceof PipeInsertNodeTabletInsertionEvent) {
178+
bindOrUpdateProgressIndexForRealtimeEvent(copiedEvent);
174179
}
175180

176181
if (!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
@@ -183,25 +188,34 @@ private void assignToExtractor(
183188
});
184189
}
185190

186-
private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
187-
final PipeTsFileInsertionEvent event) {
188-
if (PipeTimePartitionProgressIndexKeeper.getInstance()
191+
private void bindOrUpdateProgressIndexForRealtimeEvent(final PipeRealtimeEvent event) {
192+
if (PipeTsFileEpochProgressIndexKeeper.getInstance()
189193
.isProgressIndexAfterOrEquals(
190-
dataRegionId, event.getTimePartitionId(), event.forceGetProgressIndex())) {
191-
event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
194+
dataRegionId,
195+
event.getTsFileEpoch().getFilePath(),
196+
getProgressIndex4RealtimeEvent(event))) {
197+
event.bindProgressIndex(maxProgressIndexForRealtimeEvent.get());
192198
if (LOGGER.isDebugEnabled()) {
193199
LOGGER.debug(
194200
"Data region {} bind {} to event {} because it was flushed prematurely.",
195201
dataRegionId,
196-
maxProgressIndexForTsFileInsertionEvent,
202+
maxProgressIndexForRealtimeEvent,
197203
event.coreReportMessage());
198204
}
199205
} else {
200-
maxProgressIndexForTsFileInsertionEvent.updateAndGet(
201-
index -> index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
206+
maxProgressIndexForRealtimeEvent.updateAndGet(
207+
index ->
208+
index.updateToMinimumEqualOrIsAfterProgressIndex(
209+
getProgressIndex4RealtimeEvent(event)));
202210
}
203211
}
204212

213+
private ProgressIndex getProgressIndex4RealtimeEvent(final PipeRealtimeEvent event) {
214+
return event.getEvent() instanceof PipeTsFileInsertionEvent
215+
? ((PipeTsFileInsertionEvent) event.getEvent()).forceGetProgressIndex()
216+
: event.getProgressIndex();
217+
}
218+
205219
public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
206220
matcher.register(extractor);
207221
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,61 +21,44 @@
2121

2222
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2323

24-
import org.apache.tsfile.utils.Pair;
25-
2624
import java.util.Map;
2725
import java.util.Map.Entry;
2826
import java.util.Objects;
2927
import java.util.concurrent.ConcurrentHashMap;
3028

31-
public class PipeTimePartitionProgressIndexKeeper {
29+
public class PipeTsFileEpochProgressIndexKeeper {
3230

33-
// data region id -> (time partition id, <max progress index, is valid>)
34-
private final Map<String, Map<Long, Pair<ProgressIndex, Boolean>>> progressIndexKeeper =
31+
// data region id -> (tsFile path, max progress index)
32+
private final Map<String, Map<String, ProgressIndex>> progressIndexKeeper =
3533
new ConcurrentHashMap<>();
3634

3735
public synchronized void updateProgressIndex(
38-
final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
36+
final String dataRegionId, final String tsFileName, final ProgressIndex progressIndex) {
3937
progressIndexKeeper
4038
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
4139
.compute(
42-
timePartitionId,
43-
(k, v) -> {
44-
if (v == null) {
45-
return new Pair<>(progressIndex, true);
46-
}
47-
return new Pair<>(
48-
v.getLeft().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex), true);
49-
});
40+
tsFileName,
41+
(k, v) ->
42+
v == null
43+
? progressIndex
44+
: v.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
5045
}
5146

5247
public synchronized void eliminateProgressIndex(
53-
final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
48+
final String dataRegionId, final String filePath) {
5449
progressIndexKeeper
5550
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
56-
.compute(
57-
timePartitionId,
58-
(k, v) -> {
59-
if (v == null) {
60-
return null;
61-
}
62-
if (v.getRight() && !v.getLeft().isAfter(progressIndex)) {
63-
return new Pair<>(v.getLeft(), false);
64-
}
65-
return v;
66-
});
51+
.remove(filePath);
6752
}
6853

6954
public synchronized boolean isProgressIndexAfterOrEquals(
70-
final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
55+
final String dataRegionId, final String tsFilePath, final ProgressIndex progressIndex) {
7156
return progressIndexKeeper
7257
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
7358
.entrySet()
7459
.stream()
75-
.filter(entry -> entry.getKey() != timePartitionId)
60+
.filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
7661
.map(Entry::getValue)
77-
.filter(pair -> pair.right)
78-
.map(Pair::getLeft)
7962
.filter(Objects::nonNull)
8063
.anyMatch(index -> !index.isAfter(progressIndex));
8164
}
@@ -84,19 +67,19 @@ public synchronized boolean isProgressIndexAfterOrEquals(
8467

8568
private static class PipeTimePartitionProgressIndexKeeperHolder {
8669

87-
private static final PipeTimePartitionProgressIndexKeeper INSTANCE =
88-
new PipeTimePartitionProgressIndexKeeper();
70+
private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
71+
new PipeTsFileEpochProgressIndexKeeper();
8972

9073
private PipeTimePartitionProgressIndexKeeperHolder() {
9174
// empty constructor
9275
}
9376
}
9477

95-
public static PipeTimePartitionProgressIndexKeeper getInstance() {
96-
return PipeTimePartitionProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
78+
public static PipeTsFileEpochProgressIndexKeeper getInstance() {
79+
return PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
9780
}
9881

99-
private PipeTimePartitionProgressIndexKeeper() {
82+
private PipeTsFileEpochProgressIndexKeeper() {
10083
// empty constructor
10184
}
10285
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ public class TsFileEpoch {
3434
dataRegionExtractor2State;
3535
private final AtomicLong insertNodeMinTime;
3636

37-
public TsFileEpoch(String filePath) {
37+
public TsFileEpoch(final String filePath) {
3838
this.filePath = filePath;
3939
this.dataRegionExtractor2State = new ConcurrentHashMap<>();
4040
this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
4141
}
4242

43-
public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) {
43+
public TsFileEpoch.State getState(final PipeRealtimeDataRegionExtractor extractor) {
4444
return dataRegionExtractor2State
4545
.computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
4646
.get();
4747
}
4848

4949
public void migrateState(
50-
PipeRealtimeDataRegionExtractor extractor, TsFileEpochStateMigrator visitor) {
50+
final PipeRealtimeDataRegionExtractor extractor, final TsFileEpochStateMigrator visitor) {
5151
dataRegionExtractor2State
5252
.computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
5353
.getAndUpdate(visitor::migrate);
@@ -60,14 +60,18 @@ public void setExtractorsRecentProcessedTsFileEpochState() {
6060
.setRecentProcessedTsFileEpochState(extractor.getTaskID(), state.get()));
6161
}
6262

63-
public void updateInsertNodeMinTime(long newComingMinTime) {
63+
public void updateInsertNodeMinTime(final long newComingMinTime) {
6464
insertNodeMinTime.updateAndGet(recordedMinTime -> Math.min(recordedMinTime, newComingMinTime));
6565
}
6666

6767
public long getInsertNodeMinTime() {
6868
return insertNodeMinTime.get();
6969
}
7070

71+
public String getFilePath() {
72+
return filePath;
73+
}
74+
7175
@Override
7276
public String toString() {
7377
return "TsFileEpoch{"
@@ -90,7 +94,7 @@ public enum State {
9094

9195
private final int id;
9296

93-
State(int id) {
97+
State(final int id) {
9498
this.id = id;
9599
}
96100

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.iotdb.db.conf.IoTDBConfig;
2929
import org.apache.iotdb.db.conf.IoTDBDescriptor;
3030
import org.apache.iotdb.db.exception.load.PartitionViolationException;
31-
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
31+
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
3232
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
3333
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
3434
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCompactionCandidateStatus;
@@ -1200,8 +1200,8 @@ public void updateProgressIndex(ProgressIndex progressIndex) {
12001200
? progressIndex
12011201
: maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
12021202

1203-
PipeTimePartitionProgressIndexKeeper.getInstance()
1204-
.updateProgressIndex(getDataRegionId(), getTimePartition(), maxProgressIndex);
1203+
PipeTsFileEpochProgressIndexKeeper.getInstance()
1204+
.updateProgressIndex(getDataRegionId(), getTsFilePath(), maxProgressIndex);
12051205
}
12061206

12071207
public void setProgressIndex(ProgressIndex progressIndex) {
@@ -1211,8 +1211,8 @@ public void setProgressIndex(ProgressIndex progressIndex) {
12111211

12121212
maxProgressIndex = progressIndex;
12131213

1214-
PipeTimePartitionProgressIndexKeeper.getInstance()
1215-
.updateProgressIndex(getDataRegionId(), getTimePartition(), maxProgressIndex);
1214+
PipeTsFileEpochProgressIndexKeeper.getInstance()
1215+
.updateProgressIndex(getDataRegionId(), getTsFilePath(), maxProgressIndex);
12161216
}
12171217

12181218
public ProgressIndex getMaxProgressIndexAfterClose() throws IllegalStateException {

0 commit comments

Comments
 (0)