From ea039265aa5e007620f50771563309f2915586be Mon Sep 17 00:00:00 2001 From: meegoo Date: Thu, 26 Sep 2024 09:13:15 +0800 Subject: [PATCH] [BugFix] Fix stream load fail when online optimize double write (#51289) Signed-off-by: meegoo --- be/src/exec/multi_olap_table_sink.cpp | 7 + be/src/exec/multi_olap_table_sink.h | 14 +- test/common/data/stream_load/sr_8486.csv | 3 + .../test_optimize_table/R/test_optimize_table | 318 ++++++++++++++++++ .../test_optimize_table/T/test_optimize_table | 60 ++++ 5 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 test/common/data/stream_load/sr_8486.csv diff --git a/be/src/exec/multi_olap_table_sink.cpp b/be/src/exec/multi_olap_table_sink.cpp index f75cde3eecf16e..4397584a7e82f0 100644 --- a/be/src/exec/multi_olap_table_sink.cpp +++ b/be/src/exec/multi_olap_table_sink.cpp @@ -38,6 +38,13 @@ Status MultiOlapTableSink::prepare(RuntimeState* state) { return Status::OK(); } +Status MultiOlapTableSink::send_chunk(RuntimeState* state, Chunk* chunk) { + for (auto& sink : _sinks) { + RETURN_IF_ERROR(sink->send_chunk(state, chunk)); + } + return Status::OK(); +} + Status MultiOlapTableSink::open(RuntimeState* state) { for (auto& sink : _sinks) { RETURN_IF_ERROR(sink->open(state)); diff --git a/be/src/exec/multi_olap_table_sink.h b/be/src/exec/multi_olap_table_sink.h index 025df202d555a9..3214655a56ac31 100644 --- a/be/src/exec/multi_olap_table_sink.h +++ b/be/src/exec/multi_olap_table_sink.h @@ -102,7 +102,7 @@ class MultiOlapTableSink : public AsyncDataSink { * @brief Sends a chunk of data to the MultiOlapTableSink. * * This method is called to send a chunk of data to the sink. - * if is_full() return false, send_chunk() will not block + * If is_full() returns false, send_chunk_nonblocking() will not block * * @param state The runtime state. * @param chunk The chunk of data to send. 
@@ -110,6 +110,18 @@ class MultiOlapTableSink : public AsyncDataSink { */ Status send_chunk_nonblocking(RuntimeState* state, Chunk* chunk) override; + /* + * @brief Sends a chunk of data to the MultiOlapTableSink synchronously. + * + * This method is called to send a chunk of data to the sink. + * send_chunk() will block until the chunk is sent. + * + * @param state The runtime state. + * @param chunk The chunk of data to send. + * @return The status of the send operation. + */ + Status send_chunk(RuntimeState* state, Chunk* chunk) override; + /** * @brief Checks if the MultiOlapTableSink is full. * diff --git a/test/common/data/stream_load/sr_8486.csv b/test/common/data/stream_load/sr_8486.csv new file mode 100644 index 00000000000000..986079b46229c9 --- /dev/null +++ b/test/common/data/stream_load/sr_8486.csv @@ -0,0 +1,3 @@ +4 2020-06-01 +5 2020-07-01 +6 2020-08-01 diff --git a/test/sql/test_optimize_table/R/test_optimize_table b/test/sql/test_optimize_table/R/test_optimize_table index 19859f0f54df4b..222e27141797cf 100644 --- a/test/sql/test_optimize_table/R/test_optimize_table +++ b/test/sql/test_optimize_table/R/test_optimize_table @@ -94,6 +94,8 @@ PROPERTIES ( -- !result + + -- name: test_alter_key_buckets CREATE TABLE demo2_alter_0 ( `user_name` VARCHAR(32) DEFAULT '', @@ -115,6 +117,8 @@ None -- !result + + -- name: test_online_optimize_table create table t(k int, k1 date) PARTITION BY RANGE(`k1`) ( @@ -320,6 +324,8 @@ select count(*) from t; -- result: 63 -- !result + + -- name: test_online_optimize_table_pk create table t(k int) primary key(k) distributed by hash(k) buckets 10; -- result: @@ -538,4 +544,316 @@ select * from t; 18 19 20 +-- !result + +-- name: test_online_optimize_table_stream_load +create database db_${uuid0}; +-- result: +-- !result +use db_${uuid0}; +-- result: +-- !result +create table t(k int, k1 date) PARTITION BY RANGE(`k1`) +( + PARTITION `p202006` VALUES LESS THAN ("2020-07-01"), + PARTITION `p202007` VALUES LESS THAN 
("2020-08-01"), + PARTITION `p202008` VALUES LESS THAN ("2020-09-01") +) distributed by hash(k) buckets 10; +-- result: +-- !result +insert into t values(1, '2020-06-01'),(2, '2020-07-01'),(3, '2020-08-01'); +-- result: +-- !result +show create table t; +-- result: +t CREATE TABLE `t` ( + `k` int(11) NULL COMMENT "", + `k1` date NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`k`, `k1`) +PARTITION BY RANGE(`k1`) +(PARTITION p202006 VALUES [("0000-01-01"), ("2020-07-01")), +PARTITION p202007 VALUES [("2020-07-01"), ("2020-08-01")), +PARTITION p202008 VALUES [("2020-08-01"), ("2020-09-01"))) +DISTRIBUTED BY HASH(`k`) BUCKETS 10 +PROPERTIES ( +"compression" = "LZ4", +"fast_schema_evolution" = "true", +"replicated_storage" = "true", +"replication_num" = "3" +); +-- !result +alter table t distributed by hash(k); +-- result: +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT 
${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result 
+shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 +{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +-- result: +0 
+{ + "Status": "Success", + "Message": "OK" +} +-- !result +select sleep(1); +-- result: +1 +-- !result +select count(*) from t; +-- result: +63 +-- !result +function: wait_optimize_table_finish() +-- result: +None +-- !result +show create table t; +-- result: +t CREATE TABLE `t` ( + `k` int(11) NULL COMMENT "", + `k1` date NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`k`, `k1`) +PARTITION BY RANGE(`k1`) +(PARTITION p202006 VALUES [("0000-01-01"), ("2020-07-01")), +PARTITION p202007 VALUES [("2020-07-01"), ("2020-08-01")), +PARTITION p202008 VALUES [("2020-08-01"), ("2020-09-01"))) +DISTRIBUTED BY HASH(`k`) +PROPERTIES ( +"compression" = "LZ4", +"fast_schema_evolution" = "true", +"replicated_storage" = "true", +"replication_num" = "3" +); +-- !result +select count(*) from t; +-- result: +63 -- !result \ No newline at end of file diff --git a/test/sql/test_optimize_table/T/test_optimize_table b/test/sql/test_optimize_table/T/test_optimize_table index 0d704f3a1fd7a2..36ec5a4653ce52 100644 --- a/test/sql/test_optimize_table/T/test_optimize_table +++ b/test/sql/test_optimize_table/T/test_optimize_table @@ -224,3 +224,63 @@ select * from t; function: wait_optimize_table_finish() show create table t; select * from t; + +-- name: test_online_optimize_table_stream_load +create database db_${uuid0}; +use db_${uuid0}; +create table t(k int, k1 date) PARTITION BY RANGE(`k1`) +( + PARTITION `p202006` VALUES LESS THAN ("2020-07-01"), + PARTITION `p202007` VALUES LESS THAN ("2020-08-01"), + PARTITION `p202008` VALUES LESS THAN ("2020-09-01") +) distributed by hash(k) buckets 10; +insert into t values(1, '2020-06-01'),(2, '2020-07-01'),(3, '2020-08-01'); + +show create table t; +alter table t distributed by hash(k); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT 
${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); 
+shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load +select sleep(1); +-- show partitions from t; +select count(*) from t; +function: wait_optimize_table_finish() +show create table t; +-- show partitions from t; +select count(*) from t; \ No newline at end of file