[Enhancement] Enable partial update to be used with condition update #30242

Merged
3 changes: 3 additions & 0 deletions be/src/storage/delta_writer.cpp
@@ -228,6 +228,9 @@ Status DeltaWriter::_init() {
                                   sort_key_idxes.begin(), sort_key_idxes.end())) {
             _partial_schema_with_sort_key = true;
         }
+        if (!_opt.merge_condition.empty()) {
+            writer_context.merge_condition = _opt.merge_condition;
+        }
         writer_context.tablet_schema = writer_context.partial_update_tablet_schema;
         _tablet_schema = writer_context.partial_update_tablet_schema;
         writer_context.partial_update_mode = _opt.partial_update_mode;
3 changes: 1 addition & 2 deletions be/src/storage/rowset_update_state.cpp
@@ -580,8 +580,7 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_
                                 EditVersion latest_applied_version, const PrimaryIndex& index,
                                 std::unique_ptr<Column>& delete_pks, int64_t* append_column_size) {
     const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb();
-    if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0 ||
-        rowset_meta_pb.txn_meta().has_merge_condition()) {
+    if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0) {
         return Status::OK();
     }
     const auto& txn_meta = rowset_meta_pb.txn_meta();
19 changes: 11 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/load/Load.java
@@ -97,6 +97,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -124,7 +125,7 @@ public Load() {
      * @return
      * @throws UserException
      */
-    public static void checkMergeCondition(String mergeCondition, OlapTable table,
+    public static void checkMergeCondition(String mergeCondition, OlapTable table, List<Column> columns,
                                            boolean missAutoIncrementColumn) throws DdlException {
         if (mergeCondition == null || mergeCondition.isEmpty()) {
             return;
@@ -134,15 +135,20 @@ public static void checkMergeCondition(String mergeCondition, OlapTable table,
             throw new DdlException("Conditional update only support primary key table " + table.getName());
         }

-        if (table.getColumn(mergeCondition) != null) {
-            if (table.getColumn(mergeCondition).isKey()) {
+        Optional<Column> conditionCol = columns.stream().filter(c -> c.getName().equalsIgnoreCase(mergeCondition)).findFirst();
+        if (!conditionCol.isPresent()) {
+            throw new DdlException("Merge condition column " + mergeCondition +
+                    " does not exist. If you are doing partial update with condition update, please check condition column" +
+                    " is in the given update columns. Otherwise please check condition column is in table " + table.getName());
+        } else {
+            if (conditionCol.get().isKey()) {
                 throw new DdlException("Merge condition column " + mergeCondition
                         + " should not be primary key!");
             }
-            if (missAutoIncrementColumn && table.getColumn(mergeCondition).isAutoIncrement()) {
+            if (missAutoIncrementColumn && conditionCol.get().isAutoIncrement()) {
                 throw new DdlException("Merge condition column can not be auto increment column in partial update");
             }
-            switch (table.getColumn(mergeCondition).getPrimitiveType()) {
+            switch (conditionCol.get().getPrimitiveType()) {
                 case CHAR:
                 case VARCHAR:
                 case PERCENTILE:
@@ -160,9 +166,6 @@ public static void checkMergeCondition(String mergeCondition, OlapTable table,
                 default:
                     return;
             }
-        } else {
-            throw new DdlException("Merge condition column " + mergeCondition
-                    + " should be a column of the table " + table.getName());
         }
     }

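The core change above: checkMergeCondition now resolves the merge condition against the column list supplied by the caller, with a case-insensitive name match, instead of looking it up in the full table schema. For a partial update the supplied list contains only the columns being written, which is what makes condition update usable there. Below is a minimal runnable sketch of the new lookup, using hypothetical stand-in types (the real code uses com.starrocks.catalog.Column and throws DdlException):

import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the FE's Column class, just enough for this sketch.
final class ColumnStub {
    final String name;
    final boolean isKey;

    ColumnStub(String name, boolean isKey) {
        this.name = name;
        this.isKey = isKey;
    }
}

public class MergeConditionCheckSketch {
    // Mirrors the new lookup in Load.checkMergeCondition: resolve the condition
    // column against the supplied column list (for a partial update, only the
    // updated columns), matching the name case-insensitively.
    static ColumnStub resolveConditionColumn(String mergeCondition, List<ColumnStub> columns) {
        Optional<ColumnStub> conditionCol = columns.stream()
                .filter(c -> c.name.equalsIgnoreCase(mergeCondition))
                .findFirst();
        if (!conditionCol.isPresent()) {
            throw new IllegalArgumentException("Merge condition column " + mergeCondition
                    + " does not exist in the given update columns");
        }
        if (conditionCol.get().isKey) {
            throw new IllegalArgumentException("Merge condition column " + mergeCondition
                    + " should not be primary key!");
        }
        return conditionCol.get();
    }

    public static void main(String[] args) {
        // Partial update writing only (k1, k2, v1): "v1" resolves, while "k1"
        // would be rejected as a key column, matching the stream load tests below.
        List<ColumnStub> updateColumns = List.of(
                new ColumnStub("k1", true),
                new ColumnStub("k2", true),
                new ColumnStub("v1", false));
        System.out.println(resolveConditionColumn("v1", updateColumns).name); // prints v1
    }
}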
@@ -212,6 +212,7 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
             slotDesc.setIsNullable(false);
         }

+        Load.checkMergeCondition(mergeConditionStr, table, destColumns, false);
         if (Config.enable_shuffle_load && needShufflePlan()) {
             if (Config.eliminate_shuffle_load_by_replicated_storage) {
                 buildDirectPlan(loadId, fileStatusesList, filesAdded, true);
@@ -250,7 +251,6 @@ public void buildDirectPlan(TUniqueId loadId, List<List<TBrokerFileStatus>> file
                 table.writeQuorum(), forceReplicatedStorage ? true : table.enableReplicatedStorage(),
                 checkNullExprInAutoIncrement(), enableAutomaticPartition);
         olapTableSink.init(loadId, txnId, dbId, timeoutS);
-        Load.checkMergeCondition(mergeConditionStr, table, false);
         olapTableSink.setPartialUpdateMode(partialUpdateMode);
         olapTableSink.complete(mergeConditionStr);

@@ -332,7 +332,6 @@ public void buildShufflePlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fil
                 table.writeQuorum(), table.enableReplicatedStorage(),
                 checkNullExprInAutoIncrement(), enableAutomaticPartition);
         olapTableSink.init(loadId, txnId, dbId, timeoutS);
-        Load.checkMergeCondition(mergeConditionStr, table, false);
         olapTableSink.setPartialUpdateMode(partialUpdateMode);
         olapTableSink.complete(mergeConditionStr);

@@ -430,7 +430,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
         try {
             unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
             Table table = db.getTable(stmt.getTableName());
-            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, false);
+            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, table.getFullSchema(), false);
             tableId = table.getId();
         } finally {
             db.readUnlock();
@@ -411,7 +411,7 @@ public static PulsarRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) th
         try {
             unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
             Table table = db.getTable(stmt.getTableName());
-            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, false);
+            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, table.getFullSchema(), false);
             tableId = table.getId();
         } finally {
             db.readUnlock();
@@ -200,7 +200,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
             olapTableSink.setMissAutoIncrementColumn();
         }
         olapTableSink.init(loadId, streamLoadInfo.getTxnId(), db.getId(), streamLoadInfo.getTimeout());
-        Load.checkMergeCondition(streamLoadInfo.getMergeConditionStr(), destTable, olapTableSink.missAutoIncrementColumn());
+        Load.checkMergeCondition(streamLoadInfo.getMergeConditionStr(), destTable, destColumns, olapTableSink.missAutoIncrementColumn());
         olapTableSink.setPartialUpdateMode(streamLoadInfo.getPartialUpdateMode());
         olapTableSink.complete(streamLoadInfo.getMergeConditionStr());

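Note which column list each call site now passes: this stream load planner passes destColumns (the columns actually written by the load), as does the broker load planner above, while the Kafka and Pulsar routine load jobs pass table.getFullSchema(). Continuing the hypothetical sketch after the Load.java diff, the practical effect is:

// Hypothetical continuation of MergeConditionCheckSketch.main() above.
List<ColumnStub> fullSchema = List.of(
        new ColumnStub("k1", true), new ColumnStub("k2", true),
        new ColumnStub("v1", false), new ColumnStub("v2", false));
List<ColumnStub> destColumns = fullSchema.subList(0, 3); // partial update writes k1, k2, v1

MergeConditionCheckSketch.resolveConditionColumn("v2", fullSchema);  // resolves: v2 is a table column
MergeConditionCheckSketch.resolveConditionColumn("v2", destColumns); // throws: v2 is not being updated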
@@ -144,6 +144,7 @@ public class LoadStmt extends DdlStmt {
             .add(LOG_REJECTED_RECORD_NUM)
             .add(PARTIAL_UPDATE_MODE)
             .add(SPARK_LOAD_SUBMIT_TIMEOUT)
+            .add(MERGE_CONDITION)
             .build();

     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, BrokerDesc brokerDesc,
1 change: 1 addition & 0 deletions test/common/data/stream_load/sr_condition_update_1.csv
@@ -0,0 +1 @@
100,k2_100,111,100,100,v4_100,v5_100
2 changes: 2 additions & 0 deletions test/common/data/stream_load/sr_condition_update_2.csv
@@ -0,0 +1,2 @@
100,k2_100,100
200,k2_200,222
125 changes: 125 additions & 0 deletions test/sql/test_condition_update/R/test_condition_update
@@ -0,0 +1,125 @@
-- name: test_condition_update
show backends;
create database test_condition_update_2fdsiou12s;
-- result:
-- !result
use test_condition_update_2fdsiou12s;
-- result:
-- !result
CREATE table tab1 (
k1 INTEGER,
k2 VARCHAR(50),
v1 INTEGER,
v2 INTEGER,
v3 INTEGER,
v4 varchar(50),
v5 varchar(50)
)
ENGINE=OLAP
PRIMARY KEY(`k1`,`k2`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
insert into tab1 values (100, "k2_100", 100, 100, 100, "v4_100", "v5_100");
-- result:
-- !result
insert into tab1 values (200, "k2_200", 200, 200, 200, "v4_200", "v5_200");
-- result:
-- !result
insert into tab1 values (300, "k3_300", 300, 300, 300, "v4_300", "v5_300");
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 100 100 100 v4_100 v5_100
200 k2_200 200 200 200 v4_200 v5_200
300 k3_300 300 300 300 v4_300 v5_300
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123432 -H column_separator:, -H merge_condition:k1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column k1 should not be primary key!"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
100 k2_100 100 100 100 v4_100 v5_100
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123433 -H column_separator:, -H merge_condition:v6 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column v6 does not exist. If you are doing partial update with condition update, please check condition column is in the given update columns. Otherwise please check condition column is in table tab1"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 100 100 100 v4_100 v5_100
200 k2_200 200 200 200 v4_200 v5_200
300 k3_300 300 300 300 v4_300 v5_300
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123434 -H column_separator:, -H merge_condition:v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 111 100 100 v4_100 v5_100
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123435 -H column_separator:, -H merge_condition:v2 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column v2 does not exist. If you are doing partial update with condition update, please check condition column is in the given update columns. Otherwise please check condition column is in table tab1"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
100 k2_100 111 100 100 v4_100 v5_100
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123436 -H column_separator:, -H merge_condition:v1 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 111 100 100 v4_100 v5_100
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 222 200 200 v4_200 v5_200
-- !result
48 changes: 48 additions & 0 deletions test/sql/test_condition_update/T/test_condition_update
@@ -0,0 +1,48 @@
-- name: test_condition_update
show backends;
create database test_condition_update_2fdsiou12s;
use test_condition_update_2fdsiou12s;

CREATE table tab1 (
k1 INTEGER,
k2 VARCHAR(50),
v1 INTEGER,
v2 INTEGER,
v3 INTEGER,
v4 varchar(50),
v5 varchar(50)
)
ENGINE=OLAP
PRIMARY KEY(`k1`,`k2`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);

insert into tab1 values (100, "k2_100", 100, 100, 100, "v4_100", "v5_100");
insert into tab1 values (200, "k2_200", 200, 200, 200, "v4_200", "v5_200");
insert into tab1 values (300, "k3_300", 300, 300, 300, "v4_300", "v5_300");
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123432 -H column_separator:, -H merge_condition:k1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123433 -H column_separator:, -H merge_condition:v6 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123434 -H column_separator:, -H merge_condition:v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;


shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123435 -H column_separator:, -H merge_condition:v2 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;


shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123436 -H column_separator:, -H merge_condition:v1 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;