Skip to content

Commit

Permalink
[feature](merge-cloud) Set the txn timeout seconds of TDataSink (#30558)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Jan 30, 2024
1 parent 35064dc commit b5a87fa
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ public void analyze(Analyzer analyzer) throws UserException {
boolean isInsertStrict = analyzer.getContext().getSessionVariable().getEnableInsertStrict()
&& !isFromDeleteOrUpdateStmt;
sink.init(loadId, transactionId, db.getId(), timeoutSecond,
sendBatchParallelism, false, isInsertStrict);
sendBatchParallelism, false, isInsertStrict, timeoutSecond);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
Config.enable_single_replica_load);
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode);
long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeout() : timeoutS;
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode,
txnTimeout);
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);

olapTableSink.complete(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,13 @@ public void finalizeSink(DataSink sink, boolean isPartialUpdate, boolean isFromI
&& isFromInsert;
try {
// TODO refactor this to avoid call legacy planner's function
int timeout = ctx.getExecTimeout();
olapTableSink.init(ctx.queryId(), txnId, database.getId(),
ctx.getExecTimeout(),
timeout,
ctx.getSessionVariable().getSendBatchParallelism(),
false,
isStrictMode);
isStrictMode,
timeout);
olapTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
if (!allowAutoPartition) {
olapTableSink.setAutoPartition(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<L
}

public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, int sendBatchParallelism,
boolean loadToSingleTablet, boolean isStrictMode) throws AnalysisException {
boolean loadToSingleTablet, boolean isStrictMode, long txnExpirationS) throws AnalysisException {
TOlapTableSink tSink = new TOlapTableSink();
tSink.setLoadId(loadId);
tSink.setTxnId(txnId);
Expand All @@ -134,6 +134,7 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou
"if load_to_single_tablet set to true," + " the olap table must be with random distribution");
}
tSink.setLoadToSingleTablet(loadToSingleTablet);
tSink.setTxnTimeoutS(txnExpirationS);
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setOlapTableSink(tSink);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.task.StreamLoadTask;
Expand Down Expand Up @@ -266,8 +267,9 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), txnTimeout);
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);

Expand Down Expand Up @@ -492,8 +494,10 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(),
txnTimeout);
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testSinglePartition() throws UserException {
new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
dstTable.getPartitionInfo().setIsMutable(partition.getId(), true);
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, 5);
sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
Expand Down Expand Up @@ -148,7 +148,7 @@ public void testRangePartition(
};

OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, 5);
try {
sink.complete(null);
} catch (UserException e) {
Expand All @@ -173,7 +173,7 @@ public void testRangeUnknownPartition(
};

OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, 5);
sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
Expand Down Expand Up @@ -212,7 +212,7 @@ public void testListPartition(
};

OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, 5);
try {
sink.complete(null);
} catch (UserException e) {
Expand Down

0 comments on commit b5a87fa

Please sign in to comment.