Skip to content

Commit

Permalink
[BugFix] Fix stream load fail when online optimize double write (Star…
Browse files Browse the repository at this point in the history
…Rocks#51289)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
  • Loading branch information
meegoo authored Sep 26, 2024
1 parent 2992463 commit ea03926
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 1 deletion.
7 changes: 7 additions & 0 deletions be/src/exec/multi_olap_table_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ Status MultiOlapTableSink::prepare(RuntimeState* state) {
return Status::OK();
}

// Synchronous counterpart of send_chunk_nonblocking(): hands `chunk` to every
// sink held in `_sinks`, one after another, in container order.
// A failing sink short-circuits the loop through RETURN_IF_ERROR, so the
// remaining sinks are not invoked for this chunk.
Status MultiOlapTableSink::send_chunk(RuntimeState* state, Chunk* chunk) {
    for (auto it = _sinks.begin(); it != _sinks.end(); ++it) {
        RETURN_IF_ERROR((*it)->send_chunk(state, chunk));
    }
    return Status::OK();
}

Status MultiOlapTableSink::open(RuntimeState* state) {
for (auto& sink : _sinks) {
RETURN_IF_ERROR(sink->open(state));
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/multi_olap_table_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,26 @@ class MultiOlapTableSink : public AsyncDataSink {
* @brief Sends a chunk of data to the MultiOlapTableSink.
*
* This method is called to send a chunk of data to the sink.
* if is_full() return false, send_chunk() will not block
* If is_full() returns false, send_chunk_nonblocking() will not block.
*
* @param state The runtime state.
* @param chunk The chunk of data to send.
* @return The status of the send operation.
*/
Status send_chunk_nonblocking(RuntimeState* state, Chunk* chunk) override;

/**
* @brief Sends a chunk of data to the MultiOlapTableSink synchronously.
*
* Blocking counterpart of send_chunk_nonblocking(): send_chunk() will block
* until the chunk is sent. The implementation forwards the chunk to each
* underlying sink in turn and stops at the first sink whose send fails
* (propagated through RETURN_IF_ERROR).
*
* @param state The runtime state.
* @param chunk The chunk of data to send.
* @return The status of the send operation; Status::OK() once every sink
*         has accepted the chunk.
*/
Status send_chunk(RuntimeState* state, Chunk* chunk) override;

/**
* @brief Checks if the MultiOlapTableSink is full.
*
Expand Down
3 changes: 3 additions & 0 deletions test/common/data/stream_load/sr_8486.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
4 2020-06-01
5 2020-07-01
6 2020-08-01
318 changes: 318 additions & 0 deletions test/sql/test_optimize_table/R/test_optimize_table
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ PROPERTIES (
-- !result




-- name: test_alter_key_buckets
CREATE TABLE demo2_alter_0 (
`user_name` VARCHAR(32) DEFAULT '',
Expand All @@ -115,6 +117,8 @@ None
-- !result




-- name: test_online_optimize_table
create table t(k int, k1 date) PARTITION BY RANGE(`k1`)
(
Expand Down Expand Up @@ -320,6 +324,8 @@ select count(*) from t;
-- result:
63
-- !result


-- name: test_online_optimize_table_pk
create table t(k int) primary key(k) distributed by hash(k) buckets 10;
-- result:
Expand Down Expand Up @@ -538,4 +544,316 @@ select * from t;
18
19
20
-- !result

-- name: test_online_optimize_table_stream_load
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
create table t(k int, k1 date) PARTITION BY RANGE(`k1`)
(
PARTITION `p202006` VALUES LESS THAN ("2020-07-01"),
PARTITION `p202007` VALUES LESS THAN ("2020-08-01"),
PARTITION `p202008` VALUES LESS THAN ("2020-09-01")
) distributed by hash(k) buckets 10;
-- result:
-- !result
insert into t values(1, '2020-06-01'),(2, '2020-07-01'),(3, '2020-08-01');
-- result:
-- !result
show create table t;
-- result:
t CREATE TABLE `t` (
`k` int(11) NULL COMMENT "",
`k1` date NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k`, `k1`)
PARTITION BY RANGE(`k1`)
(PARTITION p202006 VALUES [("0000-01-01"), ("2020-07-01")),
PARTITION p202007 VALUES [("2020-07-01"), ("2020-08-01")),
PARTITION p202008 VALUES [("2020-08-01"), ("2020-09-01")))
DISTRIBUTED BY HASH(`k`) BUCKETS 10
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "3"
);
-- !result
alter table t distributed by hash(k);
-- result:
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
shell: curl --location-trusted -u root: -T ${root_path}/lib/../common/data/stream_load/sr_8486.csv -XPUT ${url}/api/db_${uuid0}/t/_stream_load
-- result:
0
{
"Status": "Success",
"Message": "OK"
}
-- !result
select sleep(1);
-- result:
1
-- !result
select count(*) from t;
-- result:
63
-- !result
function: wait_optimize_table_finish()
-- result:
None
-- !result
show create table t;
-- result:
t CREATE TABLE `t` (
`k` int(11) NULL COMMENT "",
`k1` date NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`k`, `k1`)
PARTITION BY RANGE(`k1`)
(PARTITION p202006 VALUES [("0000-01-01"), ("2020-07-01")),
PARTITION p202007 VALUES [("2020-07-01"), ("2020-08-01")),
PARTITION p202008 VALUES [("2020-08-01"), ("2020-09-01")))
DISTRIBUTED BY HASH(`k`)
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "true",
"replicated_storage" = "true",
"replication_num" = "3"
);
-- !result
select count(*) from t;
-- result:
63
-- !result
Loading

0 comments on commit ea03926

Please sign in to comment.