Skip to content

Commit 1489c84

Browse files
Pipe: discard batched events before restarting pipes (#13238)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent 5c5c922 commit 1489c84

File tree

7 files changed

+42
-8
lines changed

7 files changed

+42
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
2121

2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
23+
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
2324
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
2425
import org.apache.iotdb.pipe.api.event.Event;
2526
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -115,7 +116,23 @@ public synchronized void close() {
115116
events.clear();
116117
}
117118

118-
public void decreaseEventsReferenceCount(final String holderMessage, final boolean shouldReport) {
119+
/**
120+
* Discard all events of the given pipe. This method only clears the reference count of the events
121+
* and discard them, but do not modify other objects (such as buffers) for simplicity.
122+
*/
123+
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
124+
events.removeIf(
125+
event -> {
126+
if (pipeNameToDrop.equals(event.getPipeName())) {
127+
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
128+
return true;
129+
}
130+
return false;
131+
});
132+
}
133+
134+
public synchronized void decreaseEventsReferenceCount(
135+
final String holderMessage, final boolean shouldReport) {
119136
events.forEach(event -> event.decreaseReferenceCount(holderMessage, shouldReport));
120137
}
121138

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@ public boolean isEmpty() {
184184
&& endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
185185
}
186186

187+
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
188+
defaultBatch.discardEventsOfPipe(pipeNameToDrop);
189+
endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop));
190+
}
191+
187192
@Override
188193
public synchronized void close() {
189194
defaultBatch.close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -515,11 +515,9 @@ public synchronized void clearRetryEventsReferenceCount() {
515515

516516
//////////////////////////// Operations for close ////////////////////////////
517517

518-
/**
519-
* When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard
520-
* its queued events in the output pipe connector.
521-
*/
518+
@Override
522519
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
520+
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
523521
retryEventQueue.removeIf(
524522
event -> {
525523
if (event instanceof EnrichedEvent

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,11 @@ private void doTransfer(
492492
LOGGER.info("Successfully transferred file {}.", tsFile);
493493
}
494494

495+
@Override
496+
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
497+
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
498+
}
499+
495500
@Override
496501
public void close() {
497502
if (tabletBatchBuilder != null) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
2323
import org.apache.iotdb.commons.pipe.config.PipeConfig;
24+
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
2425
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2526
import org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
2627
import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
@@ -268,8 +269,8 @@ public void discardEventsOfPipe(final String pipeNameToDrop) {
268269
}
269270
}
270271

271-
if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
272-
((IoTDBDataRegionAsyncConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
272+
if (outputPipeConnector instanceof IoTDBConnector) {
273+
((IoTDBConnector) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
273274
}
274275
}
275276

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public synchronized void register() {
8989
* the {@link PipeConnectorSubtask} should never be used again
9090
* @throws IllegalStateException if {@link PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
9191
*/
92-
public synchronized boolean deregister(String pipeNameToDeregister) {
92+
public synchronized boolean deregister(final String pipeNameToDeregister) {
9393
if (registeredTaskCount <= 0) {
9494
throw new IllegalStateException("registeredTaskCount <= 0");
9595
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,14 @@ public void rateLimitIfNeeded(
452452
GLOBAL_RATE_LIMITER.acquire(bytesLength);
453453
}
454454

455+
/**
456+
* When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard
457+
* its batched or queued events in the output pipe connector.
458+
*/
459+
public synchronized void discardEventsOfPipe(final String pipeName) {
460+
// Do nothing by default
461+
}
462+
455463
public PipeReceiverStatusHandler statusHandler() {
456464
return receiverStatusHandler;
457465
}

0 commit comments

Comments
 (0)