Skip to content

Commit

Permalink
[Bug](join) corner case cause the mark join + null aware left join co…
Browse files Browse the repository at this point in the history
…re dump in regression test in pipeline query engine (apache#25087)
  • Loading branch information
HappenLee committed Oct 13, 2023
1 parent 1728f59 commit 04e3270
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
65 changes: 33 additions & 32 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ Status HashJoinNode::close(RuntimeState* state) {

bool HashJoinNode::need_more_input_data() const {
return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows()) && !_probe_eos &&
(!_short_circuit_for_probe || _is_mark_join);
!_short_circuit_for_probe;
}

void HashJoinNode::prepare_for_next() {
Expand All @@ -530,45 +530,46 @@ void HashJoinNode::prepare_for_next() {
Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_probe) {
/// If `_short_circuit_for_probe` is true, this indicates no rows
// If we use a short-circuit strategy, should return empty block directly.
*eos = true;
return Status::OK();
}

if (_short_circuit_for_null_in_probe_side && _is_mark_join) {
/// If `_short_circuit_for_null_in_probe_side` is true, this indicates no rows
/// match the join condition, and this is 'mark join', so we need to create a column as mark
/// with all rows set to 0.
if (_is_mark_join) {
auto block_rows = _probe_block.rows();
if (block_rows == 0) {
*eos = _probe_eos;
return Status::OK();
}

Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(_probe_block.get_by_position(i));
}
}
auto mark_column = ColumnUInt8::create(block_rows, 0);
temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});
auto block_rows = _probe_block.rows();
if (block_rows == 0) {
*eos = _probe_eos;
return Status::OK();
}

{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns()));
Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(_probe_block.get_by_position(i));
}
}
auto mark_column = ColumnUInt8::create(block_rows, 0);
temp_block.insert({std::move(mark_column), std::make_shared<DataTypeUInt8>(), ""});

RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
temp_block.clear();
release_block_memory(_probe_block);
reached_limit(output_block, eos);
return Status::OK();
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
VExprContext::filter_block(_conjuncts, &temp_block, temp_block.columns()));
}
// If we use a short-circuit strategy, should return empty block directly.
*eos = true;

RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
temp_block.clear();
release_block_memory(_probe_block);
reached_limit(output_block, eos);
return Status::OK();
}

//TODO: this short circuit maybe could refactor, no need to check at here.
if (_short_circuit_for_probe_and_additional_data) {
if (_empty_right_table_need_probe_dispose) {
// when build table rows is 0 and not have other_join_conjunct and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by add additional null data.
Expand Down Expand Up @@ -753,8 +754,8 @@ Status HashJoinNode::push(RuntimeState* /*state*/, vectorized::Block* input_bloc
Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());

if (_short_circuit_for_probe && !_is_mark_join) {
// If we use a short-circuit strategy, should return empty block directly.
// If we use a short-circuit strategy, should return empty block directly.
if (_short_circuit_for_probe) {
*eos = true;
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/join/vhash_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class HashJoinNode final : public VJoinNodeBase {
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
(_short_circuit_for_null_in_probe_side &&
_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN && !_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN && !_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
Expand All @@ -274,7 +274,7 @@ class HashJoinNode final : public VJoinNodeBase {

//when build table rows is 0 and not have other_join_conjunct and not _is_mark_join and join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_short_circuit_for_probe_and_additional_data =
_empty_right_table_need_probe_dispose =
(_build_blocks->empty() && !_have_other_join_conjunct && !_is_mark_join) &&
(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN);
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class VJoinNodeBase : public ExecNode {

virtual void _init_short_circuit_for_probe() {
_short_circuit_for_probe = false;
_short_circuit_for_probe_and_additional_data = false;
_empty_right_table_need_probe_dispose = false;
}

TJoinOp::type _join_op;
Expand Down Expand Up @@ -128,7 +128,7 @@ class VJoinNodeBase : public ExecNode {
bool _short_circuit_for_probe = false;

// for some join, when build side rows is empty, we could return directly by add some additional null data in probe table.
bool _short_circuit_for_probe_and_additional_data = false;
bool _empty_right_table_need_probe_dispose = false;
std::unique_ptr<RowDescriptor> _output_row_desc;
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
Expand Down

0 comments on commit 04e3270

Please sign in to comment.