Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
Expand Down Expand Up @@ -64,9 +63,6 @@ public class PipeConnectorSubtask extends PipeAbstractConnectorSubtask {
// when no event can be pulled.
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS =
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds() * 1000;
private long lastHeartbeatEventInjectTime = System.currentTimeMillis();

public PipeConnectorSubtask(
final String taskID,
Expand Down Expand Up @@ -105,12 +101,8 @@ protected boolean executeOnce() {
}

try {
if (System.currentTimeMillis() - lastHeartbeatEventInjectTime
> CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_MILLISECONDS) {
transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
}

if (Objects.isNull(event)) {
transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
return false;
}

Expand Down Expand Up @@ -187,8 +179,6 @@ private void transferHeartbeatEvent(final PipeHeartbeatEvent event) {
e);
}

lastHeartbeatEventInjectTime = System.currentTimeMillis();

event.onTransferred();
PipeDataRegionConnectorMetrics.getInstance().markPipeHeartbeatEvent(taskID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_FORMAT_TS_FILE_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE;
Expand Down Expand Up @@ -93,14 +93,13 @@ public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
final Integer requestMaxDelayInMillis =
parameters.getIntByKeys(CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY, SINK_IOTDB_BATCH_DELAY_MS_KEY);
if (Objects.isNull(requestMaxDelayInMillis)) {
final int requestMaxDelayInSeconds =
final int requestMaxDelayConfig =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY),
usingTsFileBatch
? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE
: CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE);
requestMaxDelayInMs =
requestMaxDelayInSeconds < 0 ? Integer.MAX_VALUE : requestMaxDelayInSeconds * 1000;
? CONNECTOR_IOTDB_TS_FILE_BATCH_DELAY_DEFAULT_VALUE * 1000
: CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE);
requestMaxDelayInMs = requestMaxDelayConfig < 0 ? Integer.MAX_VALUE : requestMaxDelayConfig;
} else {
requestMaxDelayInMs =
requestMaxDelayInMillis < 0 ? Integer.MAX_VALUE : requestMaxDelayInMillis;
Expand All @@ -122,20 +121,18 @@ public PipeTransferBatchReqBuilder(final PipeParameters parameters) {
* duplicated.
*
* @param event the given {@link Event}
* @return {@link Pair}<{@link TEndPoint}, {@link PipeTabletEventPlainBatch}> not null means this
* {@link PipeTabletEventPlainBatch} can be transferred. the first element is the leader
* endpoint to transfer to (might be null), the second element is the batch to be transferred.
*/
public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
final TabletInsertionEvent event) throws IOException, WALPipeException {
public synchronized void onEvent(final TabletInsertionEvent event)
throws IOException, WALPipeException {
if (!(event instanceof EnrichedEvent)) {
LOGGER.warn(
"Unsupported event {} type {} when building transfer request", event, event.getClass());
return null;
return;
}

if (!useLeaderCache) {
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
defaultBatch.onEvent(event);
return;
}

String deviceId = null;
Expand All @@ -146,35 +143,38 @@ public synchronized Pair<TEndPoint, PipeTabletEventBatch> onEvent(
}

if (Objects.isNull(deviceId)) {
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
defaultBatch.onEvent(event);
return;
}

final TEndPoint endPoint =
IoTDBDataNodeCacheLeaderClientManager.LEADER_CACHE_MANAGER.getLeaderEndPoint(deviceId);
if (Objects.isNull(endPoint)) {
return defaultBatch.onEvent(event) ? new Pair<>(null, defaultBatch) : null;
defaultBatch.onEvent(event);
return;
}

final PipeTabletEventPlainBatch batch =
endPointToBatch.computeIfAbsent(
endPointToBatch
.computeIfAbsent(
endPoint,
k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes));
return batch.onEvent(event) ? new Pair<>(endPoint, batch) : null;
k -> new PipeTabletEventPlainBatch(requestMaxDelayInMs, requestMaxBatchSizeInBytes))
.onEvent(event);
}

/** Get all batches that have at least 1 event. */
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>> getAllNonEmptyBatches() {
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyBatches = new ArrayList<>();
if (!defaultBatch.isEmpty()) {
nonEmptyBatches.add(new Pair<>(null, defaultBatch));
public synchronized List<Pair<TEndPoint, PipeTabletEventBatch>>
getAllNonEmptyAndShouldEmitBatches() {
final List<Pair<TEndPoint, PipeTabletEventBatch>> nonEmptyAndShouldEmitBatches =
new ArrayList<>();
if (!defaultBatch.isEmpty() && defaultBatch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(null, defaultBatch));
}
endPointToBatch.forEach(
(endPoint, batch) -> {
if (!batch.isEmpty()) {
nonEmptyBatches.add(new Pair<>(endPoint, batch));
if (!batch.isEmpty() && batch.shouldEmit()) {
nonEmptyAndShouldEmitBatches.add(new Pair<>(endPoint, batch));
}
});
return nonEmptyBatches;
return nonEmptyAndShouldEmitBatches;
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_DELAY_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_DELAY_MS_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_BATCH_SIZE_KEY;
Expand Down Expand Up @@ -94,7 +94,7 @@ protected PipeConsensusTransferBatchReqBuilder(
final long requestMaxBatchSizeInBytes =
parameters.getLongOrDefault(
Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY),
CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE);
CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE);

allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,16 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
}

if (isTabletBatchModeEnabled) {
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
tabletBatchBuilder.onEvent(tabletInsertionEvent);
transferInBatchWithoutCheck(endPointAndBatch);
tabletBatchBuilder.onEvent(tabletInsertionEvent);
transferBatchedEventsIfNecessary();
} else {
transferInEventWithoutCheck(tabletInsertionEvent);
}
}

private void transferInBatchWithoutCheck(
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch)
throws IOException, WriteProcessException, InterruptedException {
throws IOException, WriteProcessException {
if (Objects.isNull(endPointAndBatch)) {
return;
}
Expand Down Expand Up @@ -402,14 +401,13 @@ public void transfer(final Event event) throws Exception {
}

/** Try its best to commit data in order. Flush can also be a trigger to transfer batched data. */
private void transferBatchedEventsIfNecessary()
throws IOException, WriteProcessException, InterruptedException {
private void transferBatchedEventsIfNecessary() throws IOException, WriteProcessException {
if (!isTabletBatchModeEnabled || tabletBatchBuilder.isEmpty()) {
return;
}

for (final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch :
tabletBatchBuilder.getAllNonEmptyBatches()) {
tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
transferInBatchWithoutCheck(endPointAndBatch);
}
}
Expand Down Expand Up @@ -535,7 +533,8 @@ private void transferQueuedEventsIfNecessary(final boolean forced) {
private void retryTransfer(final TabletInsertionEvent tabletInsertionEvent) {
if (isTabletBatchModeEnabled) {
try {
transferInBatchWithoutCheck(tabletBatchBuilder.onEvent(tabletInsertionEvent));
tabletBatchBuilder.onEvent(tabletInsertionEvent);
transferBatchedEventsIfNecessary();
if (tabletInsertionEvent instanceof EnrichedEvent) {
((EnrichedEvent) tabletInsertionEvent)
.decreaseReferenceCount(IoTDBDataRegionAsyncConnector.class.getName(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,8 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc

try {
if (isTabletBatchModeEnabled) {
final Pair<TEndPoint, PipeTabletEventBatch> endPointAndBatch =
tabletBatchBuilder.onEvent(tabletInsertionEvent);
if (Objects.nonNull(endPointAndBatch)) {
doTransferWrapper(endPointAndBatch);
}
tabletBatchBuilder.onEvent(tabletInsertionEvent);
doTransferWrapper();
} else {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
doTransferWrapper((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent);
Expand Down Expand Up @@ -182,9 +179,9 @@ public void transfer(final Event event) throws Exception {
}

private void doTransferWrapper() throws IOException, WriteProcessException {
for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyBatch :
tabletBatchBuilder.getAllNonEmptyBatches()) {
doTransferWrapper(nonEmptyBatch);
for (final Pair<TEndPoint, PipeTabletEventBatch> nonEmptyAndShouldEmitBatch :
tabletBatchBuilder.getAllNonEmptyAndShouldEmitBatches()) {
doTransferWrapper(nonEmptyAndShouldEmitBatch);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,14 @@ public class CommonConfig {
private int pipeDataStructureTabletSizeInBytes = 2097152;
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.2;
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.2;
private double pipeDataStructureWalMemoryProportion = 0.2;
private double PipeDataStructureBatchMemoryProportion = 0.2;
private double pipeDataStructureWalMemoryProportion = 0.3;
private double PipeDataStructureBatchMemoryProportion = 0.1;
private double pipeTotalFloatingMemoryProportion = 0.2;

private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 50;

private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,12 @@ public class PipeConnectorConstant {

public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY = "connector.batch.max-delay-ms";
public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY = "sink.batch.max-delay-ms";
public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 10;

public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = "connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY = "sink.batch.size-bytes";
public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
public static final long CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE = MB;
public static final long CONNECTOR_IOTDB_CONSENSUS_BATCH_SIZE_DEFAULT_VALUE = 16 * MB;
public static final long CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE = 80 * MB;

public static final String CONNECTOR_IOTDB_USER_KEY = "connector.user";
Expand Down
Loading