Skip to content

Commit 3ed3a08

Browse files
authored
Pipe: add mark-as-general-write-request parameter in pipe to force forwarding event (#15572) (#15582)
1 parent d20c59d commit 3ed3a08

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,12 @@ public class PipeConnectorConstant {
250250
public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = "sink.mark-as-pipe-request";
251251
public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = true;
252252

253+
public static final String CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY =
254+
"connector.mark-as-general-write-request";
255+
public static final String SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY =
256+
"sink.mark-as-general-write-request";
257+
public static final boolean CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE = false;
258+
253259
public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif";
254260
public static final String SINK_SKIP_IF_KEY = "sink.skipif";
255261
public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges";

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@
101101
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
102102
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
103103
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
104+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE;
105+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY;
104106
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
105107
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
106108
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
@@ -130,6 +132,7 @@
130132
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
131133
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
132134
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
135+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY;
133136
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
134137
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
135138
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY;
@@ -225,6 +228,13 @@ public void validate(final PipeParameterValidator validator) throws Exception {
225228
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_KEY, SINK_IOTDB_BATCH_DELAY_KEY),
226229
false);
227230

231+
// Check coexistence of mark-as-pipe-request and mark-as-general-write-request
232+
validator.validateSynonymAttributes(
233+
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
234+
Arrays.asList(
235+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY),
236+
false);
237+
228238
username =
229239
parameters.getStringOrDefault(
230240
Arrays.asList(
@@ -379,10 +389,20 @@ public void customize(
379389
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
380390
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
381391

382-
shouldMarkAsPipeRequest =
392+
final boolean shouldMarkAsGeneralWriteRequest =
383393
parameters.getBooleanOrDefault(
384-
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
385-
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
394+
Arrays.asList(
395+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY,
396+
SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY),
397+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE);
398+
if (shouldMarkAsGeneralWriteRequest) {
399+
shouldMarkAsPipeRequest = false;
400+
} else {
401+
shouldMarkAsPipeRequest =
402+
parameters.getBooleanOrDefault(
403+
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
404+
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
405+
}
386406
LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);
387407

388408
final String connectorSkipIfValue =

0 commit comments

Comments
 (0)