Skip to content

Commit

Permalink
[BugFix] Fix use wrong hash table when spill left/right outer join has other conjuncts (StarRocks#23302)
Browse files Browse the repository at this point in the history

Signed-off-by: stdpain <drfeng08@gmail.com>
  • Loading branch information
stdpain authored May 15, 2023
1 parent 2fc54d4 commit 03dfe31
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 22 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ fs_brokers/apache_hdfs_broker/jindosdk-4.6.2
dependency-reduced-pom.xml
test/conf/
test/conf/sr.conf
test/nosetests.xml
tags
.tags
.cache
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/hash_join_components.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ StatusOr<ChunkPtr> HashJoinProber::probe_chunk(RuntimeState* state, JoinHashTabl
if (!_current_probe_has_remain) {
_probe_chunk = nullptr;
}
RETURN_IF_ERROR(_hash_joiner.filter_probe_output_chunk(chunk));
RETURN_IF_ERROR(_hash_joiner.filter_probe_output_chunk(chunk, *hash_table));
TRY_CATCH_ALLOC_SCOPE_END()
return chunk;
}
Expand Down
28 changes: 13 additions & 15 deletions be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,64 +439,62 @@ void HashJoiner::_process_row_for_other_conjunct(ChunkPtr* chunk, size_t start_c
}
}

Status HashJoiner::_process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count) {
Status HashJoiner::_process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count,
JoinHashTable& hash_table) {
bool filter_all = false;
bool hit_all = false;
Filter filter;

RETURN_IF_ERROR(_calc_filter_for_other_conjunct(chunk, filter, filter_all, hit_all));
_process_row_for_other_conjunct(chunk, start_column, column_count, filter_all, hit_all, filter);

auto& ht = _hash_join_builder->hash_table();
ht.remove_duplicate_index(&filter);
hash_table.remove_duplicate_index(&filter);
(*chunk)->filter(filter);

return Status::OK();
}

Status HashJoiner::_process_semi_join_with_other_conjunct(ChunkPtr* chunk) {
Status HashJoiner::_process_semi_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table) {
bool filter_all = false;
bool hit_all = false;
Filter filter;

_calc_filter_for_other_conjunct(chunk, filter, filter_all, hit_all);
RETURN_IF_ERROR(_calc_filter_for_other_conjunct(chunk, filter, filter_all, hit_all));

auto& ht = _hash_join_builder->hash_table();
ht.remove_duplicate_index(&filter);
hash_table.remove_duplicate_index(&filter);

(*chunk)->filter(filter);

return Status::OK();
}

Status HashJoiner::_process_right_anti_join_with_other_conjunct(ChunkPtr* chunk) {
Status HashJoiner::_process_right_anti_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table) {
bool filter_all = false;
bool hit_all = false;
Filter filter;

_calc_filter_for_other_conjunct(chunk, filter, filter_all, hit_all);
auto& ht = _hash_join_builder->hash_table();
ht.remove_duplicate_index(&filter);
RETURN_IF_ERROR(_calc_filter_for_other_conjunct(chunk, filter, filter_all, hit_all));
hash_table.remove_duplicate_index(&filter);

(*chunk)->set_num_rows(0);

return Status::OK();
}

Status HashJoiner::_process_other_conjunct(ChunkPtr* chunk) {
Status HashJoiner::_process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table) {
SCOPED_TIMER(probe_metrics().other_join_conjunct_evaluate_timer);
switch (_join_type) {
case TJoinOp::LEFT_OUTER_JOIN:
case TJoinOp::FULL_OUTER_JOIN:
return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count);
return _process_outer_join_with_other_conjunct(chunk, _probe_column_count, _build_column_count, hash_table);
case TJoinOp::RIGHT_OUTER_JOIN:
case TJoinOp::LEFT_SEMI_JOIN:
case TJoinOp::LEFT_ANTI_JOIN:
case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
case TJoinOp::RIGHT_SEMI_JOIN:
return _process_semi_join_with_other_conjunct(chunk);
return _process_semi_join_with_other_conjunct(chunk, hash_table);
case TJoinOp::RIGHT_ANTI_JOIN:
return _process_right_anti_join_with_other_conjunct(chunk);
return _process_right_anti_join_with_other_conjunct(chunk, hash_table);
default:
// The other join conjuncts of an inner join are converted to other predicates,
// so this branch cannot be reached.
Expand Down
13 changes: 7 additions & 6 deletions be/src/exec/hash_joiner.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,13 @@ class HashJoiner final : public pipeline::ContextWithDependency {
HashJoinProber* new_prober(ObjectPool* pool) { return _hash_join_prober->clone_empty(pool); }
HashJoinBuilder* new_builder(ObjectPool* pool) { return _hash_join_builder->clone_empty(pool); }

Status filter_probe_output_chunk(ChunkPtr& chunk) {
Status filter_probe_output_chunk(ChunkPtr& chunk, JoinHashTable& hash_table) {
// Probe in JoinHashMap is divided into probe with other_conjuncts and probe without other_conjuncts.
// Probe without other_conjuncts directly labels the hash table as hit, while _process_other_conjunct()
// only keeps the rows which have not hit the hash table before. Therefore, _process_other_conjunct must
// not be called when other_conjuncts is empty.
if (chunk && !chunk->is_empty() && !_other_join_conjunct_ctxs.empty()) {
RETURN_IF_ERROR(_process_other_conjunct(&chunk));
RETURN_IF_ERROR(_process_other_conjunct(&chunk, hash_table));
}

// TODO(satanson): _conjunct_ctxs shouldn't include local runtime in-filters.
Expand Down Expand Up @@ -365,10 +365,11 @@ class HashJoiner final : public pipeline::ContextWithDependency {
static void _process_row_for_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count,
bool filter_all, bool hit_all, const Filter& filter);

Status _process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count);
Status _process_semi_join_with_other_conjunct(ChunkPtr* chunk);
Status _process_right_anti_join_with_other_conjunct(ChunkPtr* chunk);
Status _process_other_conjunct(ChunkPtr* chunk);
Status _process_outer_join_with_other_conjunct(ChunkPtr* chunk, size_t start_column, size_t column_count,
JoinHashTable& hash_table);
Status _process_semi_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_right_anti_join_with_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_other_conjunct(ChunkPtr* chunk, JoinHashTable& hash_table);
Status _process_where_conjunct(ChunkPtr* chunk);

Status _create_runtime_in_filters(RuntimeState* state);
Expand Down
60 changes: 60 additions & 0 deletions test/sql/test_spill/R/test_spill_hash_join
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- name: test_spill_hash_join
set enable_spill=true;
-- result:
-- !result
set spill_mode="force";
-- result:
-- !result
set pipeline_dop=1;
-- result:
-- !result
create table t0 (
c0 INT,
c1 BIGINT
) DUPLICATE KEY(c0) DISTRIBUTED BY HASH(c0) BUCKETS 1 PROPERTIES('replication_num' = '1');
-- result:
-- !result
insert into t0 SELECT generate_series, 4096 - generate_series FROM TABLE(generate_series(1, 4096));
-- result:
-- !result
insert into t0 select * from t0;
-- result:
-- !result
create table t1 like t0;
-- result:
-- !result
insert into t1 SELECT generate_series, 4096 - generate_series FROM TABLE(generate_series(4096, 8192));
-- result:
-- !result
select count(l.c0), avg(l.c0), count(l.c1), count(l.c0), count(r.c1) from t0 l left join [broadcast] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
-- result:
8192 2048.5 8192 8192 0
-- !result
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left semi join [broadcast] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
-- result:
0 None 0
-- !result
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left semi join [broadcast] t1 r on l.c0 = r.c0 and l.c1 >= r.c1;
-- result:
2 4096.0 2
-- !result
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left anti join [broadcast] t1 r on l.c0 = r.c0 and l.c1 >= r.c1;
-- result:
8190 2048.0 8190
-- !result
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right semi join [bucket] t1 r on l.c0 = r.c0;
-- result:
1 4096.0 1
-- !result
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right semi join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
-- result:
0 None 0
-- !result
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right anti join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
-- result:
4097 6144.0 4097
-- !result
select count(l.c0), avg(l.c0), count(l.c1), count(l.c0), count(r.c1) from t0 l right join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
-- result:
0 None 0 0 4097
-- !result
22 changes: 22 additions & 0 deletions test/sql/test_spill/T/test_spill_hash_join
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- name: test_spill_hash_join
set enable_spill=true;
set spill_mode="force";
set pipeline_dop=1;
--
create table t0 (
c0 INT,
c1 BIGINT
) DUPLICATE KEY(c0) DISTRIBUTED BY HASH(c0) BUCKETS 1 PROPERTIES('replication_num' = '1');
insert into t0 SELECT generate_series, 4096 - generate_series FROM TABLE(generate_series(1, 4096));
insert into t0 select * from t0;
create table t1 like t0;
insert into t1 SELECT generate_series, 4096 - generate_series FROM TABLE(generate_series(4096, 8192));

select count(l.c0), avg(l.c0), count(l.c1), count(l.c0), count(r.c1) from t0 l left join [broadcast] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left semi join [broadcast] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left semi join [broadcast] t1 r on l.c0 = r.c0 and l.c1 >= r.c1;
select count(l.c0), avg(l.c0), count(l.c1) from t0 l left anti join [broadcast] t1 r on l.c0 = r.c0 and l.c1 >= r.c1;
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right semi join [bucket] t1 r on l.c0 = r.c0;
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right semi join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
select count(r.c0), avg(r.c0), count(r.c1) from t0 l right anti join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;
select count(l.c0), avg(l.c0), count(l.c1), count(l.c0), count(r.c1) from t0 l right join [bucket] t1 r on l.c0 = r.c0 and l.c1 < r.c1;

0 comments on commit 03dfe31

Please sign in to comment.