Skip to content

[To dev/1.3] Pipe: Added rate limiter for tsFile sending (#15765) #15872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: dev/1.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
// Do nothing
}

@Override
protected boolean mayNeedHandshakeWhenFail() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad);
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
// Do nothing
}

@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_QUERY_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.SOURCE_REALTIME_ENABLE_KEY;

public class PipeDataNodeTaskBuilder {

Expand Down Expand Up @@ -181,10 +184,6 @@ private void checkConflict(
extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_SNAPSHOT_VALUE)
|| extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);

if (!insertionDeletionListeningOptionPair.right
&& !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
return;
}
} catch (final IllegalPathException e) {
LOGGER.warn(
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}",
Expand All @@ -193,29 +192,52 @@ private void checkConflict(
return;
}

final Boolean isRealtime =
connectorParameters.getBooleanByKeys(
PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
if (isRealtime == null) {
connectorParameters.addAttribute(PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
if (insertionDeletionListeningOptionPair.right) {
LOGGER.info(
"PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion.");
} else {
LOGGER.info(
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion.");
if (insertionDeletionListeningOptionPair.right
|| shouldTerminatePipeOnAllHistoricalEventsConsumed) {
final Boolean isRealtime =
connectorParameters.getBooleanByKeys(
PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeConnectorConstant.SINK_REALTIME_FIRST_KEY);
if (isRealtime == null) {
connectorParameters.addAttribute(
PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
if (insertionDeletionListeningOptionPair.right) {
LOGGER.info(
"PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion.");
} else {
LOGGER.info(
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion.");
}
} else if (isRealtime) {
if (insertionDeletionListeningOptionPair.right) {
LOGGER.warn(
"PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion.");
} else {
LOGGER.warn(
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion.");
}
}
return;
}

if (isRealtime) {
if (insertionDeletionListeningOptionPair.right) {
LOGGER.warn(
"PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion.");
} else {
final boolean isRealtimeEnabled =
extractorParameters.getBooleanOrDefault(
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);

if (isRealtimeEnabled && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
final Boolean enableSendTsFileLimit =
connectorParameters.getBooleanByKeys(
PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);

if (enableSendTsFileLimit == null) {
connectorParameters.addAttribute(
PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
LOGGER.info(
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
} else if (!enableSendTsFileLimit) {
LOGGER.warn(
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion.");
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, not enabling the rate limiter in sending tsfile may introduce delay for realtime sending.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
Expand All @@ -34,9 +35,12 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
Expand All @@ -49,13 +53,32 @@

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;

public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector {

private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataRegionAirGapConnector.class);

private boolean enableSendTsFileLimit;

@Override
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
super.customize(parameters, configuration);

enableSendTsFileLimit =
parameters.getBooleanOrDefault(
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
}

@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
Expand Down Expand Up @@ -294,6 +317,14 @@ private void doTransfer(
}
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
if (enableSendTsFileLimit) {
TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
}
}

@Override
protected byte[] getTransferSingleFilePieceBytes(
final String fileName, final long position, final byte[] payLoad) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ private void doTransfer(
}
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
// Do nothing
}

@Override
protected byte[] getTransferSingleFilePieceBytes(
final String fileName, final long position, final byte[] payLoad) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
Expand Down Expand Up @@ -118,6 +121,8 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();

private boolean enableSendTsFileLimit;

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
super.validate(validator);
Expand Down Expand Up @@ -173,6 +178,11 @@ public void customize(
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}

enableSendTsFileLimit =
parameters.getBooleanOrDefault(
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
}

@Override
Expand Down Expand Up @@ -669,6 +679,10 @@ public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> events) {
events.forEach(this::addFailureEventToRetryQueue);
}

public boolean isEnableSendTsFileLimit() {
return enableSendTsFileLimit;
}

//////////////////////////// Operations for close ////////////////////////////

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
Expand All @@ -32,6 +33,7 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
Expand Down Expand Up @@ -159,6 +161,10 @@ public void transfer(
client.setShouldReturnSelf(false);
client.setTimeoutDynamically(clientManager.getConnectionTimeout());

PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
if (connector.isEnableSendTsFileLimit()) {
TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
}
final int readLength = reader.read(readBuffer);

if (readLength == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.RetryUtils;
Expand All @@ -43,6 +44,7 @@
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
Expand All @@ -65,16 +67,22 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;

public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncConnector.class);

private PipeTransferBatchReqBuilder tabletBatchBuilder;
private boolean enableSendTsFileLimit;

@Override
public void customize(
Expand All @@ -86,6 +94,11 @@ public void customize(
if (isTabletBatchModeEnabled) {
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
}

enableSendTsFileLimit =
parameters.getBooleanOrDefault(
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
}

@Override
Expand All @@ -100,6 +113,14 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad);
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
if (enableSendTsFileLimit) {
TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
}
}

@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
// PipeProcessor can change the type of TabletInsertionEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,9 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
final String fileName, final long position, final byte[] payLoad) throws IOException {
return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad);
}

@Override
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
// Do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,30 @@ public void validate(final PipeParameterValidator validator) throws Exception {
}

// Validate source.start-time and source.end-time
if (validator.getParameters().hasAnyAttributes(SOURCE_START_TIME_KEY, SOURCE_END_TIME_KEY)
if (validator
.getParameters()
.hasAnyAttributes(
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY)
&& validator
.getParameters()
.hasAnyAttributes(
EXTRACTOR_HISTORY_ENABLE_KEY,
EXTRACTOR_REALTIME_ENABLE_KEY,
SOURCE_HISTORY_ENABLE_KEY,
SOURCE_REALTIME_ENABLE_KEY)) {
SOURCE_HISTORY_START_TIME_KEY,
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY,
EXTRACTOR_HISTORY_END_TIME_KEY)) {
LOGGER.warn(
"When {}, {}, {} or {} is specified, specifying {}, {}, {} and {} is invalid.",
"When {}, {}, {} or {} is specified, specifying {}, {}, {}, {}, {} and {} is invalid.",
SOURCE_START_TIME_KEY,
EXTRACTOR_START_TIME_KEY,
SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY,
SOURCE_HISTORY_ENABLE_KEY,
EXTRACTOR_HISTORY_ENABLE_KEY,
SOURCE_HISTORY_START_TIME_KEY,
EXTRACTOR_HISTORY_START_TIME_KEY,
SOURCE_HISTORY_END_TIME_KEY,
Expand Down
Loading
Loading