Skip to content

Commit

Permalink
[fix](join) output all values from the non-null side of outer join (#13655)
Browse files Browse the repository at this point in the history
* [fix](join) output all values from the non-null side of outer join

* add regression test
  • Loading branch information
starocean999 authored Oct 27, 2022
1 parent 2697f72 commit c874931
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 8 deletions.
46 changes: 38 additions & 8 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,20 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process(HashTableType&
while (probe_index < probe_rows) {
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
_items_counts[probe_index++] = (uint32_t)0;
if constexpr (probe_all) {
_items_counts[probe_index++] = (uint32_t)1;
// only full outer / left outer need insert the data of right table
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
} else {
_items_counts[probe_index++] = (uint32_t)0;
}
all_match_one = false;
continue;
}
Expand Down Expand Up @@ -472,6 +485,8 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
using KeyGetter = typename HashTableType::State;
using Mapped = typename HashTableType::Mapped;
if constexpr (std::is_same_v<Mapped, RowRefListWithFlags>) {
constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN;
KeyGetter key_getter(probe_raw_ptrs, _join_node->_probe_key_sz, nullptr);

int right_col_idx = _join_node->_left_table_data_types.size();
Expand All @@ -494,7 +509,22 @@ Status ProcessHashTableProbe<JoinOpType, ignore_null>::do_process_with_other_joi
// ignore null rows
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
_items_counts[probe_index++] = (uint32_t)0;
if constexpr (probe_all) {
_items_counts[probe_index++] = (uint32_t)1;
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right table
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
} else {
_items_counts[probe_index++] = (uint32_t)0;
}
all_match_one = false;
continue;
}
Expand Down Expand Up @@ -1048,6 +1078,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
std::vector<int> res_col_ids(probe_expr_ctxs_sz);
RETURN_IF_ERROR(_do_evaluate(_probe_block, _probe_expr_ctxs, *_probe_expr_call_timer,
res_col_ids));
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_probe_column_convert_to_null = _convert_block_to_null(_probe_block);
}
// TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
// so we have to initialize this flag by the first probe block.
if (!_has_set_need_null_map_for_probe) {
Expand Down Expand Up @@ -1075,9 +1108,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
_hash_table_variants);

RETURN_IF_ERROR(st);
if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_probe_column_convert_to_null = _convert_block_to_null(_probe_block);
}
}
}

Expand Down Expand Up @@ -1397,6 +1427,9 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
ColumnUInt8::MutablePtr null_map_val;
std::vector<int> res_col_ids(_build_expr_ctxs.size());
RETURN_IF_ERROR(_do_evaluate(block, _build_expr_ctxs, *_build_expr_call_timer, res_col_ids));
if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
}
// TODO: Now we are not sure whether a column is nullable only by ExecNode's `row_desc`
// so we have to initialize this flag by the first build block.
if (!_has_set_need_null_map_for_build) {
Expand Down Expand Up @@ -1441,9 +1474,6 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin
make_bool_variant(_build_unique), make_bool_variant(has_runtime_filter),
make_bool_variant(_need_null_map_for_build));

if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
}
return st;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select1 --
2

-- !select2 --
2

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: LEFT JOIN where the left (probe) table contains a row whose
// join key is NULL. It creates two small tables, loads three rows in total,
// runs two count(*) queries over the join, and drops the tables again.
suite("test_outer_join_with_null_value") {
// Remove any leftover tables so the CREATE statements below cannot collide.
sql """
drop table if exists outer_table_a;
"""

sql """
drop table if exists outer_table_b;
"""

// outer_table_a: every column is declared NOT NULL.
sql """
create table outer_table_a
(
PROJECT_ID VARCHAR(32) not null,
SO_NO VARCHAR(32) not null,
ORG_ID VARCHAR(32) not null
)ENGINE = OLAP
DUPLICATE KEY(PROJECT_ID)
DISTRIBUTED BY HASH(PROJECT_ID) BUCKETS 30
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// outer_table_b: same layout, except SO_NO is nullable. This is the column
// that carries the NULL join key in the queries below.
sql """
create table outer_table_b
(
PROJECT_ID VARCHAR(32) not null,
SO_NO VARCHAR(32),
ORG_ID VARCHAR(32) not null
)ENGINE = OLAP
DUPLICATE KEY(PROJECT_ID)
DISTRIBUTED BY HASH(PROJECT_ID) BUCKETS 30
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

// One fully populated row on the right-hand side of the join.
sql """
insert into outer_table_a values('1','1','1');
"""

// Two rows on the left-hand side: one that matches outer_table_a on all three
// keys, and one whose SO_NO is NULL and therefore cannot match on equality.
sql """
insert into outer_table_b values('1','1','1'),('1',null,'1');
"""

// Equality-only join condition. A LEFT JOIN must keep every row of
// outer_table_b, so both rows (including the NULL-key one) should be counted.
// NOTE(review): presumably compared against the `select1` section of the
// generated .out file, which should hold 2 -- confirm.
qt_select1 """
select
count(*)
FROM
outer_table_b WSA
LEFT JOIN outer_table_a WBWD ON WBWD.ORG_ID = WSA.ORG_ID
AND WBWD.PROJECT_ID = WSA.PROJECT_ID
AND WBWD.SO_NO = WSA.SO_NO;
"""

// Same join plus an extra non-equality predicate (`>=`) in the ON clause.
// The NULL-key row of outer_table_b must still be preserved by the LEFT JOIN.
// NOTE(review): presumably this is meant to exercise the code path that
// evaluates additional (non-equi) join conjuncts -- confirm against the BE.
qt_select2 """
select
count(*)
FROM
outer_table_b WSA
LEFT JOIN outer_table_a WBWD ON WBWD.ORG_ID = WSA.ORG_ID
AND WBWD.PROJECT_ID = WSA.PROJECT_ID
AND WBWD.SO_NO = WSA.SO_NO
AND WBWD.SO_NO >= WSA.SO_NO;
"""

// Clean up so the suite leaves no tables behind.
sql """
drop table if exists outer_table_a;
"""

sql """
drop table if exists outer_table_b;
"""
}

0 comments on commit c874931

Please sign in to comment.