Skip to content

Commit e3dda34

Browse files
authored
Subscription: fully managed tsfile parsing process for tsfile format topic (#15524) (#15529)
1 parent 7d3efb2 commit e3dda34

File tree

4 files changed

+36
-15
lines changed

4 files changed

+36
-15
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
5050
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
52-
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
5352
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
5453
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_DEFAULT_VALUE;
5554
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
@@ -146,15 +145,7 @@ public PipeDataNodeTask build() {
146145
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
147146
CONNECTOR_FORMAT_HYBRID_VALUE)
148147
.equals(CONNECTOR_FORMAT_TABLET_VALUE),
149-
PipeType.SUBSCRIPTION.equals(pipeType)
150-
&&
151-
// should not skip parsing when the format is tsfile
152-
!pipeStaticMeta
153-
.getConnectorParameters()
154-
.getStringOrDefault(
155-
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
156-
CONNECTOR_FORMAT_HYBRID_VALUE)
157-
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
148+
PipeType.SUBSCRIPTION.equals(pipeType));
158149

159150
return new PipeDataNodeTask(
160151
pipeStaticMeta.getPipeName(), regionId, extractorStage, processorStage, connectorStage);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
134134
return;
135135
}
136136

137-
if (!forceTabletFormat && !sourceEvent.shouldParseTimeOrPattern()) {
137+
if (!forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
138138
collectEvent(sourceEvent);
139139
return;
140140
}
@@ -148,6 +148,10 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
148148
}
149149
}
150150

151+
public static boolean canSkipParsing4TsFileEvent(final PipeTsFileInsertionEvent sourceEvent) {
152+
return !sourceEvent.shouldParseTimeOrPattern();
153+
}
154+
151155
private void collectParsedRawTableEvent(final PipeRawTabletInsertionEvent parsedEvent) {
152156
if (!parsedEvent.hasNoNeedParsingAndIsEmpty()) {
153157
hasNoGeneratedEvent = false;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.iotdb.db.subscription.broker;
2121

22+
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2223
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
24+
import org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector;
2325
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2426
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
2527
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -245,6 +247,10 @@ public SubscriptionEvent pollTsFile(
245247

246248
@Override
247249
protected boolean onEvent(final TsFileInsertionEvent event) {
250+
if (!PipeEventCollector.canSkipParsing4TsFileEvent((PipeTsFileInsertionEvent) event)) {
251+
return batches.onEvent((EnrichedEvent) event, this::prefetchEvent);
252+
}
253+
248254
final SubscriptionCommitContext commitContext = generateSubscriptionCommitContext();
249255
final SubscriptionEvent ev =
250256
new SubscriptionEvent(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
24+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2425
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
2526
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
2627
import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFileBatchEvents;
@@ -81,10 +82,29 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
8182

8283
@Override
8384
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
84-
LOGGER.warn(
85-
"SubscriptionPipeTsFileEventBatch {} ignore TsFileInsertionEvent {} when batching.",
86-
this,
87-
event);
85+
// TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch
86+
try {
87+
for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) {
88+
if (!((PipeRawTabletInsertionEvent) parsedEvent)
89+
.increaseReferenceCount(this.getClass().getName())) {
90+
LOGGER.warn(
91+
"SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.",
92+
((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
93+
} else {
94+
try {
95+
batch.onEvent(parsedEvent);
96+
} catch (final Exception ignored) {
97+
// no exceptions will be thrown
98+
}
99+
}
100+
}
101+
} finally {
102+
try {
103+
event.close();
104+
} catch (final Exception ignored) {
105+
// no exceptions will be thrown
106+
}
107+
}
88108
}
89109

90110
@Override

0 commit comments

Comments
 (0)