@@ -27,6 +27,7 @@
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
@@ -304,6 +305,34 @@ public int getAsyncConnectorRetryEventQueueSize() {
         : 0;
   }
 
+  public int getPendingHandlersSize() {
+    return outputPipeConnector instanceof IoTDBDataRegionAsyncConnector
+        ? ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getPendingHandlersSize()
+        : 0;
+  }
+
+  public int getBatchSize() {
+    if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
+      return ((IoTDBDataRegionAsyncConnector) outputPipeConnector).getBatchSize();
+    }
+    if (outputPipeConnector instanceof IoTDBDataRegionSyncConnector) {
+      return ((IoTDBDataRegionSyncConnector) outputPipeConnector).getBatchSize();
+    }
+    return 0;
+  }
+
+  public double getTotalUncompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalUncompressedSize()
+        : 0;
+  }
+
+  public double getTotalCompressedSize() {
+    return outputPipeConnector instanceof IoTDBConnector
+        ? ((IoTDBConnector) outputPipeConnector).getTotalCompressedSize()
+        : 0;
+  }
+
   //////////////////////////// Error report ////////////////////////////
 
   @Override
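
The getters added above are plain instanceof-guarded pass-throughs, so a metrics collector can poll them on every scrape without touching the subtask's hot path. The sketch below illustrates that polling pattern in isolation; `SubtaskMetricsBinder`, `ConnectorSubtaskView`, and the gauge names are assumptions made for this example, not types or APIs from this PR or the IoTDB metric framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SubtaskMetricsBinder {

  // Hypothetical read-only view of the getters added to the connector subtask in this diff.
  public interface ConnectorSubtaskView {
    int getAsyncConnectorRetryEventQueueSize();

    int getPendingHandlersSize();

    int getBatchSize();

    double getTotalUncompressedSize();

    double getTotalCompressedSize();
  }

  private final Map<String, Supplier<Number>> gauges = new LinkedHashMap<>();

  // Register one gauge per getter; each gauge re-reads the live value on demand.
  public void bind(final String subtaskId, final ConnectorSubtaskView subtask) {
    gauges.put(subtaskId + ".retryQueueSize", subtask::getAsyncConnectorRetryEventQueueSize);
    gauges.put(subtaskId + ".pendingHandlers", subtask::getPendingHandlersSize);
    gauges.put(subtaskId + ".batchSize", subtask::getBatchSize);
    gauges.put(subtaskId + ".uncompressedBytes", subtask::getTotalUncompressedSize);
    gauges.put(subtaskId + ".compressedBytes", subtask::getTotalCompressedSize);
  }

  // Called by a reporter thread; every value is read fresh at scrape time.
  public Map<String, Number> scrape() {
    final Map<String, Number> snapshot = new LinkedHashMap<>();
    gauges.forEach((name, supplier) -> snapshot.put(name, supplier.get()));
    return snapshot;
  }
}
```

Because each gauge is a `Supplier`, nothing is cached: whatever the connector reports at scrape time is what gets exported.
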
@@ -104,6 +104,7 @@ public synchronized String register(
       connectorNum = 1;
       attributeSortedString = "schema_" + attributeSortedString;
     }
+    environment.setAttributeSortedString(attributeSortedString);
 
     if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
       final List<PipeConnectorSubtaskLifeCycle> pipeConnectorSubtaskLifeCycleList =
@@ -46,7 +46,7 @@
 
 public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventPlainBatch.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
 
   private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
   private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -36,10 +36,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_HYBRID_VALUE;
 import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
@@ -74,7 +74,8 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable {
   // If the leader cache is enabled, the batch will be divided by the leader endpoint,
   // each endpoint has a batch.
   // This is only used in plain batch since tsfile does not return redirection info.
-  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch = new HashMap<>();
+  private final Map<TEndPoint, PipeTabletEventPlainBatch> endPointToBatch =
+      new ConcurrentHashMap<>();
 
   public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
     final boolean usingTsFileBatch =
@@ -186,6 +187,21 @@ public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final
     endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId));
   }
 
+  public int size() {
+    try {
+      return defaultBatch.events.size()
+          + endPointToBatch.values().stream()
+              .map(batch -> batch.events.size())
+              .reduce(0, Integer::sum);
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Failed to get the size of PipeTransferBatchReqBuilder, return 0. Exception: {}",
+          e.getMessage(),
+          e);
+      return 0;
+    }
+  }
+
   @Override
   public synchronized void close() {
     defaultBatch.close();
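
The switch from `HashMap` to `ConcurrentHashMap` goes hand in hand with the new `size()` method: unlike the other members, `size()` is not `synchronized`, so it may be called from a metrics thread while the sending thread is still creating and filling per-endpoint batches. A minimal, self-contained sketch of that access pattern (illustrative names only, not code from this PR):

```java
// One thread keeps adding per-endpoint batches while another thread sums their
// sizes for a metric. ConcurrentHashMap allows the lock-free, weakly consistent
// iteration in size(); a plain HashMap would not be safe to read here.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class EndpointBatchSizeDemo {

  static final class Batch {
    final AtomicInteger eventCount = new AtomicInteger();
  }

  private final Map<String, Batch> endPointToBatch = new ConcurrentHashMap<>();

  // Called from the sending thread: create the endpoint's batch on demand and grow it.
  void onEvent(final String endPoint) {
    endPointToBatch.computeIfAbsent(endPoint, e -> new Batch()).eventCount.incrementAndGet();
  }

  // Called from a metrics thread: no lock, weakly consistent snapshot of the total size.
  int size() {
    return endPointToBatch.values().stream().mapToInt(b -> b.eventCount.get()).sum();
  }

  public static void main(String[] args) throws InterruptedException {
    final EndpointBatchSizeDemo demo = new EndpointBatchSizeDemo();
    final Thread producer =
        new Thread(
            () -> {
              for (int i = 0; i < 10_000; i++) {
                demo.onEvent("endpoint-" + (i % 4));
              }
            });
    producer.start();
    while (producer.isAlive()) {
      System.out.println("current batched events: " + demo.size());
      Thread.sleep(10);
    }
    producer.join();
    System.out.println("final batched events: " + demo.size());
  }
}
```
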
@@ -34,6 +34,7 @@
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -304,4 +305,13 @@ protected byte[] getTransferMultiFilePieceBytes(
       final String fileName, final long position, final byte[] payLoad) throws IOException {
     return PipeTransferTsFilePieceWithModReq.toTPipeTransferBytes(fileName, position, payLoad);
   }
+
+  @Override
+  protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(reqInBytes);
+  }
 }
@@ -44,6 +44,7 @@
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -413,6 +414,15 @@ private void transferBatchedEventsIfNecessary()
     }
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   //////////////////////////// Leader cache update ////////////////////////////
 
   public void updateLeaderCache(final String deviceId, final TEndPoint endPoint) {
@@ -682,6 +692,14 @@ public int getRetryEventQueueSize() {
     return retryEventQueue.size();
  }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
+  public int getPendingHandlersSize() {
+    return pendingHandlers.size();
+  }
+
   //////////////////////// APIs provided for PipeTransferTrackableHandler ////////////////////////
 
   public boolean isClosed() {
@@ -22,7 +22,6 @@
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
-import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -63,11 +62,7 @@ public PipeTransferTabletBatchEventHandler(
     pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
 
     final TPipeTransferReq uncompressedReq = batch.toTPipeTransferReq();
-    req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    req = connector.compressIfNeeded(uncompressedReq);
     reqCompressionRatio = (double) req.getBody().length / uncompressedReq.getBody().length;
   }

@@ -22,7 +22,6 @@
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.utils.RetryUtils;
@@ -181,11 +180,7 @@ public void transfer(
            ? PipeTransferTsFileSealWithModReq.toTPipeTransferReq(
                modFile.getName(), modFile.length(), tsFile.getName(), tsFile.length())
            : PipeTransferTsFileSealReq.toTPipeTransferReq(tsFile.getName(), tsFile.length());
-    final TPipeTransferReq req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
@@ -212,11 +207,7 @@ public void transfer(
                    currentFile.getName(), position, payload)
                : PipeTransferTsFilePieceReq.toTPipeTransferReq(
                    currentFile.getName(), position, payload);
-    final TPipeTransferReq req =
-        connector.isRpcCompressionEnabled()
-            ? PipeTransferCompressedReq.toTPipeTransferReq(
-                uncompressedReq, connector.getCompressors())
-            : uncompressedReq;
+    final TPipeTransferReq req = connector.compressIfNeeded(uncompressedReq);
 
     pipeName2WeightMap.forEach(
         (pipePair, weight) ->
@@ -43,6 +43,7 @@
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -497,11 +498,24 @@ private void doTransfer(
     LOGGER.info("Successfully transferred file {}.", tsFile);
   }
 
+  @Override
+  public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
+    if (Objects.isNull(compressionTimer)) {
+      compressionTimer =
+          PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
+    }
+    return super.compressIfNeeded(req);
+  }
+
   @Override
   public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) {
     tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId);
   }
 
+  public int getBatchSize() {
+    return tabletBatchBuilder.size();
+  }
+
   @Override
   public void close() {
     if (tabletBatchBuilder != null) {
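
The same `compressIfNeeded` override appears in the sync connector, the async connector, and the TsFile transfer path above: each lazily resolves its compression timer on first use (keyed by `attributeSortedString`) and then delegates to the parent implementation. Below is a condensed, standalone sketch of that shape, assuming GZIP as a stand-in codec and a plain map as a stand-in for `PipeDataRegionConnectorMetrics`; none of these types are the actual IoTDB classes.

```java
// Lazy-initialization-plus-delegation sketch: the base class performs the actual
// compression and records elapsed time only if a timer has been injected; the
// subclass resolves the timer on the first call, keyed by its attribute string.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;

class LazyCompressionTimerSketch {

  static final class Timer {
    final AtomicLong totalNanos = new AtomicLong();

    void record(final long nanos) {
      totalNanos.addAndGet(nanos);
    }
  }

  // Stand-in for a metrics registry such as PipeDataRegionConnectorMetrics.
  static final Map<String, Timer> TIMER_REGISTRY = new ConcurrentHashMap<>();

  abstract static class BaseConnector {
    protected Timer compressionTimer; // stays null until a subclass resolves it

    protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
      final long start = System.nanoTime();
      final byte[] compressed = gzip(reqInBytes);
      if (compressionTimer != null) {
        compressionTimer.record(System.nanoTime() - start);
      }
      return compressed;
    }

    private static byte[] gzip(final byte[] input) throws IOException {
      final ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (final GZIPOutputStream gzipStream = new GZIPOutputStream(bos)) {
        gzipStream.write(input);
      }
      return bos.toByteArray();
    }
  }

  static final class RegionConnector extends BaseConnector {
    private final String attributeSortedString;

    RegionConnector(final String attributeSortedString) {
      this.attributeSortedString = attributeSortedString;
    }

    @Override
    protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
      // Lazy lookup: the timer may not exist yet when the connector is constructed.
      if (compressionTimer == null) {
        compressionTimer =
            TIMER_REGISTRY.computeIfAbsent(attributeSortedString, k -> new Timer());
      }
      return super.compressIfNeeded(reqInBytes);
    }
  }
}
```

The lazy lookup is presumably there because the metric entry may not be registered yet when the connector is constructed; deferring the lookup to the first compression sidesteps any ordering constraint between connector startup and metric registration.
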