Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 committed Feb 24, 2025
1 parent b663dd9 commit 09d9a0b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeout() : timeoutS;
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode,
txnTimeout);
LOG.info("[xxx LoadingTaskPlanner] isPartialUpdate={}, partialUpdateInputColumns={}, column size={}",
isPartialUpdate, partialUpdateInputColumns, partialUpdateInputColumns.size());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);

olapTableSink.complete(analyzer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ private LogicalPlan completeQueryPlan(ConnectContext ctx, BulkLoadDataDesc dataD
tvfLogicalPlan = new LogicalProject<>(selectLists, tvfLogicalPlan);
checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
&& properties.getOrDefault("partial_columns", "false").equalsIgnoreCase("true");
LOG.info("[xxx completeQueryPlan] properties={}, isPartialUpdate={}", properties, isPartialUpdate);
&& sinkCols.size() < olapTable.getColumns().size();
return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,22 @@
3 3 3 3 3

-- !sql --
1 1 123 876 1
2 2 345 678 2
1 \N \N \N 99
2 2 2 2 2
3 3 3 3 3
4 \N \N \N 88

-- !sql --
1 \N \N \N 99
2 2 2 2 2
3 333 \N \N \N
4 \N \N \N 88
5 555 \N \N \N

-- !sql --
1 \N 123 876 99
2 2 345 678 2
3 333 \N \N \N
4 \N \N \N 88
5 555 \N \N \N

Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,57 @@ suite("test_partial_update_s3_load", "p0") {
sql "sync;"
qt_sql "select * from ${tableName} order by k1;"


def label = "test_pu" + UUID.randomUUID().toString().replace("-", "_")
logger.info("test_primary_key_partial_update, label: $label")
// 1,99
// 4,88
sql """
LOAD LABEL $label (
DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_s3_1.csv")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ","
(k1,c4)
) WITH S3 (
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"provider" = "${getS3Provider()}"
);
"""
waitForBrokerLoadDone(label)
qt_sql "select * from ${tableName} order by k1;"


label = "test_pu" + UUID.randomUUID().toString().replace("-", "_")
logger.info("test_primary_key_partial_update, label: $label")
// 3,333
// 5,555
sql """
LOAD LABEL $label (
DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/row_s3_2.csv")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ","
(k1,c1)
) WITH S3 (
"AWS_ACCESS_KEY" = "${getS3AK()}",
"AWS_SECRET_KEY" = "${getS3SK()}",
"AWS_ENDPOINT" = "${getS3Endpoint()}",
"AWS_REGION" = "${getS3Region()}",
"provider" = "${getS3Provider()}"
) properties(
"partial_columns" = "false"
);
"""
waitForBrokerLoadDone(label)
qt_sql "select * from ${tableName} order by k1;"


label = "test_pu" + UUID.randomUUID().toString().replace("-", "_")
logger.info("test_primary_key_partial_update, label: $label")
// 1,123,876
// 2,345,678

sql """
LOAD LABEL $label (
DATA INFILE("s3://${getS3BucketName()}/regression/unqiue_with_mow_p0/partial_update/pu_s3.csv")
Expand All @@ -60,8 +105,6 @@ suite("test_partial_update_s3_load", "p0") {
"partial_columns" = "true"
);
"""

waitForBrokerLoadDone(label)

qt_sql "select * from ${tableName} order by k1;"
}

0 comments on commit 09d9a0b

Please sign in to comment.