[Enhancement] Enable partial update to be used with condition update (#30242) (#30489)

Partial update can in fact be used together with condition update, but we currently reject the combination. This PR enables it.

Signed-off-by: zhangqiang <qiangzh95@gmail.com>
sevev authored Sep 6, 2023
1 parent 2d8e34a commit a368a71
Showing 12 changed files with 196 additions and 15 deletions.
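
Before the diffs, here is a minimal, self-contained sketch of the check the FE now performs: the merge-condition column is resolved case-insensitively against the columns actually carried by the load (for a partial update, just the updated columns) rather than against the full table schema. The names below (SimpleColumn, resolveConditionColumn) are illustrative stand-ins, not StarRocks APIs.

import java.util.List;
import java.util.Optional;

public class MergeConditionCheckSketch {
    // Illustrative stand-in for com.starrocks.catalog.Column.
    record SimpleColumn(String name, boolean isKey) {}

    // Resolve the merge-condition column against the columns supplied by the
    // load job; for a partial update this is the subset of updated columns,
    // not the full table schema.
    static SimpleColumn resolveConditionColumn(String mergeCondition, List<SimpleColumn> columns) {
        Optional<SimpleColumn> conditionCol = columns.stream()
                .filter(c -> c.name().equalsIgnoreCase(mergeCondition))
                .findFirst();
        if (!conditionCol.isPresent()) {
            throw new IllegalArgumentException("Merge condition column " + mergeCondition
                    + " does not exist in the given update columns");
        }
        if (conditionCol.get().isKey()) {
            throw new IllegalArgumentException("Merge condition column " + mergeCondition
                    + " should not be primary key!");
        }
        return conditionCol.get();
    }

    public static void main(String[] args) {
        // A partial update that carries only k1, k2 and v1.
        List<SimpleColumn> destColumns = List.of(
                new SimpleColumn("k1", true),
                new SimpleColumn("k2", true),
                new SimpleColumn("v1", false));
        System.out.println(resolveConditionColumn("v1", destColumns)); // accepted
        // resolveConditionColumn("v2", destColumns); // rejected: not among the update columns
        // resolveConditionColumn("k1", destColumns); // rejected: primary key
    }
}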
3 changes: 3 additions & 0 deletions be/src/storage/delta_writer.cpp
@@ -223,6 +223,9 @@ Status DeltaWriter::_init() {
                                            sort_key_idxes.begin(), sort_key_idxes.end())) {
             _partial_schema_with_sort_key = true;
         }
+        if (!_opt.merge_condition.empty()) {
+            writer_context.merge_condition = _opt.merge_condition;
+        }
         writer_context.tablet_schema = writer_context.partial_update_tablet_schema.get();
         writer_context.partial_update_mode = _opt.partial_update_mode;
     } else {
3 changes: 1 addition & 2 deletions be/src/storage/rowset_update_state.cpp
@@ -581,8 +581,7 @@ Status RowsetUpdateState::apply(Tablet* tablet, Rowset* rowset, uint32_t rowset_
                                 EditVersion latest_applied_version, const PrimaryIndex& index,
                                 std::unique_ptr<Column>& delete_pks, int64_t* append_column_size) {
     const auto& rowset_meta_pb = rowset->rowset_meta()->get_meta_pb();
-    if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0 ||
-        rowset_meta_pb.txn_meta().has_merge_condition()) {
+    if (!rowset_meta_pb.has_txn_meta() || rowset->num_segments() == 0) {
        return Status::OK();
    }
    const auto& txn_meta = rowset_meta_pb.txn_meta();
19 changes: 11 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/load/Load.java
@@ -97,6 +97,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;

@@ -124,7 +125,7 @@ public Load() {
      * @return
      * @throws UserException
      */
-    public static void checkMergeCondition(String mergeCondition, OlapTable table,
+    public static void checkMergeCondition(String mergeCondition, OlapTable table, List<Column> columns,
                                            boolean missAutoIncrementColumn) throws DdlException {
         if (mergeCondition == null || mergeCondition.isEmpty()) {
             return;
@@ -134,15 +135,20 @@ public static void checkMergeCondition(String mergeCondition, OlapTable table,
             throw new DdlException("Conditional update only support primary key table " + table.getName());
         }

-        if (table.getColumn(mergeCondition) != null) {
-            if (table.getColumn(mergeCondition).isKey()) {
+        Optional<Column> conditionCol = columns.stream().filter(c -> c.getName().equalsIgnoreCase(mergeCondition)).findFirst();
+        if (!conditionCol.isPresent()) {
+            throw new DdlException("Merge condition column " + mergeCondition +
+                    " does not exist. If you are doing partial update with condition update, please check condition column" +
+                    " is in the given update columns. Otherwise please check condition column is in table " + table.getName());
+        } else {
+            if (conditionCol.get().isKey()) {
                 throw new DdlException("Merge condition column " + mergeCondition
                         + " should not be primary key!");
             }
-            if (missAutoIncrementColumn && table.getColumn(mergeCondition).isAutoIncrement()) {
+            if (missAutoIncrementColumn && conditionCol.get().isAutoIncrement()) {
                 throw new DdlException("Merge condition column can not be auto increment column in partial update");
             }
-            switch (table.getColumn(mergeCondition).getPrimitiveType()) {
+            switch (conditionCol.get().getPrimitiveType()) {
                 case CHAR:
                 case VARCHAR:
                 case PERCENTILE:
@@ -160,9 +166,6 @@ public static void checkMergeCondition(String mergeCondition, OlapTable table,
                 default:
                     return;
             }
-        } else {
-            throw new DdlException("Merge condition column " + mergeCondition
-                    + " should be a column of the table " + table.getName());
         }
     }

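As a usage note, a hypothetical FE call site for the new four-argument signature could look like the following; the imports are the real FE classes touched by this diff, but the surrounding method is invented for illustration.

import java.util.List;

import com.starrocks.catalog.Column;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.DdlException;
import com.starrocks.load.Load;

// Hypothetical call site, not part of the PR: destColumns holds only the
// columns carried by a partial update, e.g. [k1, k2, v1].
class MergeConditionCallSite {
    static void validate(OlapTable table, List<Column> destColumns) throws DdlException {
        Load.checkMergeCondition("v2", table, destColumns, false);
        // Throws DdlException: "Merge condition column v2 does not exist. If you are
        // doing partial update with condition update, please check condition column
        // is in the given update columns. ..."
    }
}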
@@ -212,6 +212,7 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis
             slotDesc.setIsNullable(false);
         }

+        Load.checkMergeCondition(mergeConditionStr, table, destColumns, false);
         if (Config.enable_shuffle_load && needShufflePlan()) {
             if (Config.eliminate_shuffle_load_by_replicated_storage) {
                 buildDirectPlan(loadId, fileStatusesList, filesAdded, true);
@@ -250,7 +251,6 @@ public void buildDirectPlan(TUniqueId loadId, List<List<TBrokerFileStatus>> file
                 table.writeQuorum(), forceReplicatedStorage ? true : table.enableReplicatedStorage(),
                 checkNullExprInAutoIncrement(), enableAutomaticPartition);
         olapTableSink.init(loadId, txnId, dbId, timeoutS);
-        Load.checkMergeCondition(mergeConditionStr, table, false);
         olapTableSink.setPartialUpdateMode(partialUpdateMode);
         olapTableSink.complete(mergeConditionStr);

@@ -332,7 +332,6 @@ public void buildShufflePlan(TUniqueId loadId, List<List<TBrokerFileStatus>> fil
                 table.writeQuorum(), table.enableReplicatedStorage(),
                 checkNullExprInAutoIncrement(), enableAutomaticPartition);
         olapTableSink.init(loadId, txnId, dbId, timeoutS);
-        Load.checkMergeCondition(mergeConditionStr, table, false);
         olapTableSink.setPartialUpdateMode(partialUpdateMode);
         olapTableSink.complete(mergeConditionStr);

@@ -431,7 +431,7 @@ public static KafkaRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) thr
         try {
             unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
             Table table = db.getTable(stmt.getTableName());
-            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, false);
+            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, table.getFullSchema(), false);
             tableId = table.getId();
         } finally {
             db.readUnlock();
@@ -411,7 +411,7 @@ public static PulsarRoutineLoadJob fromCreateStmt(CreateRoutineLoadStmt stmt) th
         try {
             unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
             Table table = db.getTable(stmt.getTableName());
-            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, false);
+            Load.checkMergeCondition(stmt.getMergeConditionStr(), (OlapTable) table, table.getFullSchema(), false);
             tableId = table.getId();
         } finally {
             db.readUnlock();
@@ -200,7 +200,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
             olapTableSink.setMissAutoIncrementColumn();
         }
         olapTableSink.init(loadId, streamLoadInfo.getTxnId(), db.getId(), streamLoadInfo.getTimeout());
-        Load.checkMergeCondition(streamLoadInfo.getMergeConditionStr(), destTable, olapTableSink.missAutoIncrementColumn());
+        Load.checkMergeCondition(streamLoadInfo.getMergeConditionStr(), destTable, destColumns, olapTableSink.missAutoIncrementColumn());
         olapTableSink.setPartialUpdateMode(streamLoadInfo.getPartialUpdateMode());
         olapTableSink.complete(streamLoadInfo.getMergeConditionStr());

@@ -144,6 +144,7 @@ public class LoadStmt extends DdlStmt {
             .add(LOG_REJECTED_RECORD_NUM)
             .add(PARTIAL_UPDATE_MODE)
             .add(SPARK_LOAD_SUBMIT_TIMEOUT)
+            .add(MERGE_CONDITION)
             .build();

     public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, BrokerDesc brokerDesc,
1 change: 1 addition & 0 deletions test/common/data/stream_load/sr_condition_update_1.csv
@@ -0,0 +1 @@
100,k2_100,111,100,100,v4_100,v5_100
2 changes: 2 additions & 0 deletions test/common/data/stream_load/sr_condition_update_2.csv
@@ -0,0 +1,2 @@
100,k2_100,100
200,k2_200,222
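
The two CSV files above feed the stream-load tests below. As a client-side illustration of the feature this PR enables, here is a sketch of the same combined partial update + condition update request written with Java's built-in HttpClient instead of curl; the host, port, database, and label are assumed values, and the headers mirror those used in the tests.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.util.Base64;

public class ConditionUpdateStreamLoadSketch {
    public static void main(String[] args) throws Exception {
        // Assumed FE address and label; database and table match the tests below.
        String url = "http://127.0.0.1:8030/api/test_condition_update_2fdsiou12s/tab1/_stream_load";
        String auth = "Basic " + Base64.getEncoder().encodeToString("root:".getBytes());

        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.ALWAYS) // the FE redirects the load to a BE
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", auth)
                .header("label", "stream_load_condition_update_example")
                .header("column_separator", ",")
                .header("partial_update", "true")  // update only the listed columns
                .header("columns", "k1,k2,v1")
                .header("merge_condition", "v1")   // conditional update keyed on column v1
                .PUT(HttpRequest.BodyPublishers.ofFile(Path.of("sr_condition_update_2.csv")))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // {"Status": "Success", ...} when accepted
    }
}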
125 changes: 125 additions & 0 deletions test/sql/test_condition_update/R/test_condition_update
@@ -0,0 +1,125 @@
-- name: test_condition_update
show backends;
create database test_condition_update_2fdsiou12s;
-- result:
-- !result
use test_condition_update_2fdsiou12s;
-- result:
-- !result
CREATE table tab1 (
k1 INTEGER,
k2 VARCHAR(50),
v1 INTEGER,
v2 INTEGER,
v3 INTEGER,
v4 varchar(50),
v5 varchar(50)
)
ENGINE=OLAP
PRIMARY KEY(`k1`,`k2`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
insert into tab1 values (100, "k2_100", 100, 100, 100, "v4_100", "v5_100");
-- result:
-- !result
insert into tab1 values (200, "k2_200", 200, 200, 200, "v4_200", "v5_200");
-- result:
-- !result
insert into tab1 values (300, "k3_300", 300, 300, 300, "v4_300", "v5_300");
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 100 100 100 v4_100 v5_100
200 k2_200 200 200 200 v4_200 v5_200
300 k3_300 300 300 300 v4_300 v5_300
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123432 -H column_separator:, -H merge_condition:k1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column k1 should not be primary key!"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
100 k2_100 100 100 100 v4_100 v5_100
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123433 -H column_separator:, -H merge_condition:v6 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column v6 does not exist. If you are doing partial update with condition update, please check condition column is in the given update columns. Otherwise please check condition column is in table tab1"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 100 100 100 v4_100 v5_100
200 k2_200 200 200 200 v4_200 v5_200
300 k3_300 300 300 300 v4_300 v5_300
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123434 -H column_separator:, -H merge_condition:v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 111 100 100 v4_100 v5_100
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123435 -H column_separator:, -H merge_condition:v2 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Fail",
"Message": "Merge condition column v2 does not exist. If you are doing partial update with condition update, please check condition column is in the given update columns. Otherwise please check condition column is in table tab1"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 200 200 200 v4_200 v5_200
100 k2_100 111 100 100 v4_100 v5_100
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123436 -H column_separator:, -H merge_condition:v1 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
sync;
-- result:
-- !result
select * from tab1;
-- result:
100 k2_100 111 100 100 v4_100 v5_100
300 k3_300 300 300 300 v4_300 v5_300
200 k2_200 222 200 200 v4_200 v5_200
-- !result
48 changes: 48 additions & 0 deletions test/sql/test_condition_update/T/test_condition_update
@@ -0,0 +1,48 @@
-- name: test_condition_update
show backends;
create database test_condition_update_2fdsiou12s;
use test_condition_update_2fdsiou12s;

CREATE table tab1 (
k1 INTEGER,
k2 VARCHAR(50),
v1 INTEGER,
v2 INTEGER,
v3 INTEGER,
v4 varchar(50),
v5 varchar(50)
)
ENGINE=OLAP
PRIMARY KEY(`k1`,`k2`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);

insert into tab1 values (100, "k2_100", 100, 100, 100, "v4_100", "v5_100");
insert into tab1 values (200, "k2_200", 200, 200, 200, "v4_200", "v5_200");
insert into tab1 values (300, "k3_300", 300, 300, 300, "v4_300", "v5_300");
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123432 -H column_separator:, -H merge_condition:k1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123433 -H column_separator:, -H merge_condition:v6 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;

shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_1.csv -XPUT -H label:stream_load_condition_update_123434 -H column_separator:, -H merge_condition:v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;


shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123435 -H column_separator:, -H merge_condition:v2 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;


shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_condition_update_2.csv -XPUT -H label:stream_load_condition_update_123436 -H column_separator:, -H merge_condition:v1 -H partial_update:true -H columns:k1,k2,v1 ${url}/api/test_condition_update_2fdsiou12s/tab1/_stream_load
sync;
select * from tab1;
