Skip to content

Commit cb03ad8

Browse files
authored
Subscription: fix consumer infinite pulling event & fully managed tsfile parsing process & increase the reference count for subscribed parsed raw tablet event & disrupt parsing requests through the introduction of randomness & disable prefetch by default (#14856)
1 parent 3cc339b commit cb03ad8

File tree

13 files changed

+153
-40
lines changed

13 files changed

+153
-40
lines changed

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,12 @@ public Map<String, String> getAttributesWithProcessorPrefix() {
194194
/////////////////////////////// connector attributes mapping ///////////////////////////////
195195

196196
public Map<String, String> getAttributesWithSinkFormat() {
197-
return SINK_HYBRID_FORMAT_CONFIG; // default to hybrid
197+
// refer to
198+
// org.apache.iotdb.db.pipe.agent.task.connection.PipeEventCollector.parseAndCollectEvent(org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent)
199+
return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equalsIgnoreCase(
200+
attributes.getOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE))
201+
? SINK_TS_FILE_FORMAT_CONFIG
202+
: SINK_TABLET_FORMAT_CONFIG;
198203
}
199204

200205
public Map<String, String> getAttributesWithSinkPrefix() {

iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TabletsPayload.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public class TabletsPayload implements SubscriptionPollPayload {
4040
* <ul>
4141
* <li>If nextOffset is 1, it indicates that the current payload is the first payload (its
4242
* tablets are empty) and the fetching should continue.
43-
* <li>If nextOffset is negative, it indicates all tablets have been fetched, and -nextOffset
44-
* represents the total number of tablets.
43+
* <li>If nextOffset is negative (or zero), it indicates all tablets have been fetched, and
44+
* -nextOffset represents the total number of tablets.
4545
* </ul>
4646
*/
4747
private transient int nextOffset;

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,7 @@ private Optional<SubscriptionMessage> pollTabletsInternal(
935935

936936
int nextOffset = ((TabletsPayload) initialResponse.getPayload()).getNextOffset();
937937
while (true) {
938-
if (nextOffset < 0) {
938+
if (nextOffset <= 0) {
939939
if (!Objects.equals(tablets.size(), -nextOffset)) {
940940
final String errorMessage =
941941
String.format(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
5050
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
5151
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TABLET_VALUE;
52+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
5253
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_FORMAT_KEY;
5354

5455
public class PipeDataNodeTaskBuilder {
@@ -139,7 +140,16 @@ public PipeDataNodeTask build() {
139140
.getStringOrDefault(
140141
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
141142
CONNECTOR_FORMAT_HYBRID_VALUE)
142-
.equals(CONNECTOR_FORMAT_TABLET_VALUE));
143+
.equals(CONNECTOR_FORMAT_TABLET_VALUE),
144+
PipeType.SUBSCRIPTION.equals(pipeType)
145+
&&
146+
// should not skip parsing when the format is tsfile
147+
!pipeStaticMeta
148+
.getConnectorParameters()
149+
.getStringOrDefault(
150+
Arrays.asList(CONNECTOR_FORMAT_KEY, SINK_FORMAT_KEY),
151+
CONNECTOR_FORMAT_HYBRID_VALUE)
152+
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE));
143153

144154
return new PipeDataNodeTask(
145155
pipeStaticMeta.getPipeName(), regionId, extractorStage, processorStage, connectorStage);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class PipeEventCollector implements EventCollector {
5555

5656
private final boolean forceTabletFormat;
5757

58+
private final boolean skipParseTsFile;
59+
5860
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
5961
private boolean hasNoGeneratedEvent = true;
6062
private boolean isFailedToIncreaseReferenceCount = false;
@@ -63,11 +65,13 @@ public PipeEventCollector(
6365
final UnboundedBlockingPendingQueue<Event> pendingQueue,
6466
final long creationTime,
6567
final int regionId,
66-
final boolean forceTabletFormat) {
68+
final boolean forceTabletFormat,
69+
final boolean skipParseTsFile) {
6770
this.pendingQueue = pendingQueue;
6871
this.creationTime = creationTime;
6972
this.regionId = regionId;
7073
this.forceTabletFormat = forceTabletFormat;
74+
this.skipParseTsFile = skipParseTsFile;
7175
}
7276

7377
@Override
@@ -117,6 +121,11 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
117121
return;
118122
}
119123

124+
if (skipParseTsFile) {
125+
collectEvent(sourceEvent);
126+
return;
127+
}
128+
120129
if (!forceTabletFormat
121130
&& (!sourceEvent.shouldParseTimeOrPattern()
122131
|| (sourceEvent.isTableModelEvent()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public PipeTaskProcessorStage(
6868
final UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
6969
final PipeProcessorSubtaskExecutor executor,
7070
final PipeTaskMeta pipeTaskMeta,
71-
final boolean forceTabletFormat) {
71+
final boolean forceTabletFormat,
72+
final boolean skipParseTsFile) {
7273
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
7374
new PipeTaskRuntimeConfiguration(
7475
new PipeTaskProcessorRuntimeEnvironment(
@@ -100,7 +101,11 @@ public PipeTaskProcessorStage(
100101
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
101102
final PipeEventCollector pipeConnectorOutputEventCollector =
102103
new PipeEventCollector(
103-
pipeConnectorOutputPendingQueue, creationTime, regionId, forceTabletFormat);
104+
pipeConnectorOutputPendingQueue,
105+
creationTime,
106+
regionId,
107+
forceTabletFormat,
108+
skipParseTsFile);
104109
this.pipeProcessorSubtask =
105110
new PipeProcessorSubtask(
106111
taskId,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,8 @@ public SubscriptionEvent pollInternal(final String consumerId) {
197197
return null;
198198
},
199199
SubscriptionAgent.receiver().remainingMs());
200-
} catch (final Exception ignored) {
200+
} catch (final Exception e) {
201+
LOGGER.warn("Exception {} occurred when {} execute receiver subtask", this, e, e);
201202
}
202203
}
203204

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public SubscriptionEvent pollTablets(
159159
String.format(
160160
"exception occurred when fetching next response: %s, consumer id: %s, commit context: %s, offset: %s, prefetching queue: %s",
161161
e, consumerId, commitContext, offset, this);
162-
LOGGER.warn(errorMessage);
162+
LOGGER.warn(errorMessage, e);
163163
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
164164
}
165165

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public SubscriptionEvent pollTsFile(
211211
String.format(
212212
"exception occurred when fetching next response: %s, consumer id: %s, commit context: %s, writing offset: %s, prefetching queue: %s",
213213
e, consumerId, commitContext, writingOffset, this);
214-
LOGGER.warn(errorMessage);
214+
LOGGER.warn(errorMessage, e);
215215
eventRef.set(generateSubscriptionPollErrorResponse(errorMessage));
216216
}
217217

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
3939

40-
import java.util.ArrayList;
4140
import java.util.Collections;
4241
import java.util.Iterator;
4342
import java.util.List;
@@ -61,7 +60,7 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
6160
private final Meter insertNodeTabletInsertionEventSizeEstimator;
6261
private final Meter rawTabletInsertionEventSizeEstimator;
6362

64-
private volatile List<EnrichedEvent> iteratedEnrichedEvents;
63+
private volatile SubscriptionPipeTabletIterationSnapshot iterationSnapshot;
6564
private final AtomicInteger referenceCount = new AtomicInteger();
6665

6766
private static final long ITERATED_COUNT_REPORT_FREQ =
@@ -88,7 +87,17 @@ public SubscriptionPipeTabletEventBatch(
8887
@Override
8988
public synchronized void ack() {
9089
referenceCount.decrementAndGet();
91-
// do nothing for iterated enriched events, see SubscriptionPipeTabletBatchEvents
90+
91+
// we decrease the reference count of events if and only if when the whole batch is consumed
92+
if (!hasNext() && referenceCount.get() == 0) {
93+
for (final EnrichedEvent enrichedEvent : enrichedEvents) {
94+
if (enrichedEvent instanceof PipeTsFileInsertionEvent) {
95+
// close data container in tsfile event
96+
((PipeTsFileInsertionEvent) enrichedEvent).close();
97+
}
98+
enrichedEvent.decreaseReferenceCount(this.getClass().getName(), true);
99+
}
100+
}
92101
}
93102

94103
@Override
@@ -200,26 +209,29 @@ private void updateEstimatedRawTabletInsertionEventSize(final long size) {
200209

201210
/////////////////////////////// iterator ///////////////////////////////
202211

203-
public List<EnrichedEvent> sendIterationSnapshot() {
204-
final List<EnrichedEvent> result = Collections.unmodifiableList(iteratedEnrichedEvents);
205-
iteratedEnrichedEvents = new ArrayList<>();
212+
public synchronized SubscriptionPipeTabletIterationSnapshot sendIterationSnapshot() {
213+
final SubscriptionPipeTabletIterationSnapshot result = iterationSnapshot;
214+
iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
206215
referenceCount.incrementAndGet();
207216
return result;
208217
}
209218

210-
public void resetForIteration() {
219+
public synchronized void resetForIteration() {
211220
currentEnrichedEventsIterator = enrichedEvents.iterator();
212221
currentTabletInsertionEventsIterator = null;
213222
currentTsFileInsertionEvent = null;
214223

215-
iteratedEnrichedEvents = new ArrayList<>();
224+
if (Objects.nonNull(iterationSnapshot)) {
225+
iterationSnapshot.clear();
226+
}
227+
iterationSnapshot = new SubscriptionPipeTabletIterationSnapshot();
216228
referenceCount.set(0);
217229

218230
iteratedCount.set(0);
219231
}
220232

221233
@Override
222-
public boolean hasNext() {
234+
public synchronized boolean hasNext() {
223235
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
224236
if (currentTabletInsertionEventsIterator.hasNext()) {
225237
return true;
@@ -245,7 +257,7 @@ public boolean hasNext() {
245257
}
246258

247259
@Override
248-
public List<Tablet> next() {
260+
public synchronized List<Tablet> next() {
249261
final List<Tablet> tablets = nextInternal();
250262
if (Objects.isNull(tablets)) {
251263
return null;
@@ -267,8 +279,16 @@ private List<Tablet> nextInternal() {
267279
if (currentTabletInsertionEventsIterator.hasNext()) {
268280
final TabletInsertionEvent tabletInsertionEvent =
269281
currentTabletInsertionEventsIterator.next();
282+
if (!((EnrichedEvent) tabletInsertionEvent)
283+
.increaseReferenceCount(this.getClass().getName())) {
284+
LOGGER.warn(
285+
"SubscriptionPipeTabletEventBatch: Failed to increase the reference count of event {}, skipping it.",
286+
((EnrichedEvent) tabletInsertionEvent).coreReportMessage());
287+
} else {
288+
iterationSnapshot.addParsedEnrichedEvent((EnrichedEvent) tabletInsertionEvent);
289+
}
270290
if (!currentTabletInsertionEventsIterator.hasNext()) {
271-
iteratedEnrichedEvents.add((EnrichedEvent) currentTsFileInsertionEvent);
291+
iterationSnapshot.addIteratedEnrichedEvent((EnrichedEvent) currentTsFileInsertionEvent);
272292
}
273293
return convertToTablets(tabletInsertionEvent);
274294
} else {
@@ -297,11 +317,13 @@ private List<Tablet> nextInternal() {
297317
currentTsFileInsertionEvent = tsFileInsertionEvent;
298318
currentTabletInsertionEventsIterator =
299319
tsFileInsertionEvent
300-
.toTabletInsertionEvents(SubscriptionAgent.receiver().remainingMs())
320+
.toTabletInsertionEvents(
321+
// disrupt parsing requests through the introduction of randomness
322+
(long) ((1 + Math.random()) * SubscriptionAgent.receiver().remainingMs()))
301323
.iterator();
302324
return next();
303325
} else if (enrichedEvent instanceof TabletInsertionEvent) {
304-
iteratedEnrichedEvents.add(enrichedEvent);
326+
iterationSnapshot.addIteratedEnrichedEvent(enrichedEvent);
305327
return convertToTablets((TabletInsertionEvent) enrichedEvent);
306328
} else {
307329
LOGGER.warn(

0 commit comments

Comments
 (0)