diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index a64011b14db12d..d1ed0fc3230e67 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -211,9 +211,9 @@ Status ExecNode::prepare(RuntimeState* state) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); if (_vconjunct_ctx_ptr) { - RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, row_desc())); + RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _row_descriptor)); } - RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, row_desc())); + RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _row_descriptor)); // TODO(zc): // AddExprCtxsToFree(_conjunct_ctxs); diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index 527b2946326c86..101636f0e863ce 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -179,7 +179,7 @@ class ExecNode { int id() const { return _id; } TPlanNodeType::type type() const { return _type; } - const RowDescriptor& row_desc() const { return _row_descriptor; } + virtual const RowDescriptor& row_desc() const { return _row_descriptor; } int64_t rows_returned() const { return _num_rows_returned; } int64_t limit() const { return _limit; } bool reached_limit() const { return _limit != -1 && _num_rows_returned >= _limit; } diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp index e7a972a37ca1c2..982ddf9fdd56d8 100644 --- a/be/src/vec/columns/column_nullable.cpp +++ b/be/src/vec/columns/column_nullable.cpp @@ -482,4 +482,11 @@ ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) { return ColumnNullable::create(column, ColumnUInt8::create(column->size(), is_nullable ? 1 : 0)); } +ColumnPtr remove_nullable(const ColumnPtr& column) { + if (is_column_nullable(*column)) { + return reinterpret_cast(column.get())->get_nested_column_ptr(); + } + return column; +} + } // namespace doris::vectorized diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h index 3e342eeac1d6e9..1fa9a50e1061f3 100644 --- a/be/src/vec/columns/column_nullable.h +++ b/be/src/vec/columns/column_nullable.h @@ -301,5 +301,6 @@ class ColumnNullable final : public COWHelper { }; ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable = false); +ColumnPtr remove_nullable(const ColumnPtr& column); } // namespace doris::vectorized diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 41d4dfaf4045b5..498ccfb6c9162c 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -601,7 +601,7 @@ void filter_block_internal(Block* block, const IColumn::Filter& filter, uint32_t auto count = count_bytes_in_filter(filter); if (count == 0) { for (size_t i = 0; i < column_to_keep; ++i) { - std::move(*block->get_by_position(i).column).mutate()->clear(); + std::move(*block->get_by_position(i).column).assume_mutable()->clear(); } } else { if (count != block->rows()) { @@ -651,7 +651,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee bool ret = const_column->get_bool(0); if (!ret) { for (size_t i = 0; i < column_to_keep; ++i) { - std::move(*block->get_by_position(i).column).mutate()->clear(); + std::move(*block->get_by_position(i).column).assume_mutable()->clear(); } } } else { diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index 84ebb070334471..61f3be16bfa2f8 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -509,7 +509,7 @@ struct ProcessHashTableProbe { typeid_cast( std::move(*output_block->get_by_position(j + right_col_idx) .column) - .mutate() + .assume_mutable() .get()) ->get_null_map_data()[i] = true; } @@ -602,8 +602,10 @@ struct ProcessHashTableProbe { hash_table_ctx.init_once(); auto& mcol = mutable_block.mutable_columns(); + bool right_semi_anti_without_other = + _join_node->_is_right_semi_anti && !_join_node->_have_other_join_conjunct; int right_col_idx = - _join_node->_is_right_semi_anti ? 0 : _join_node->_left_table_data_types.size(); + right_semi_anti_without_other ? 0 : _join_node->_left_table_data_types.size(); int right_col_len = _join_node->_right_table_data_types.size(); auto& iter = hash_table_ctx.iter; @@ -628,9 +630,16 @@ struct ProcessHashTableProbe { } } + // just resize the left table column in case with other conjunct to make block size is not zero + if (_join_node->_is_right_semi_anti && _join_node->_have_other_join_conjunct) { + auto target_size = mcol[right_col_idx]->size(); + for (int i = 0; i < right_col_idx; ++i) { + mcol[i]->resize(target_size); + } + } + // right outer join / full join need insert data of left table - if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN || - JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || + if constexpr (JoinOpType::value == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType::value == TJoinOp::FULL_OUTER_JOIN) { for (int i = 0; i < right_col_idx; ++i) { for (int j = 0; j < block_size; ++j) { @@ -640,7 +649,8 @@ struct ProcessHashTableProbe { } *eos = iter == hash_table_ctx.hash_table.end(); - output_block->swap(mutable_block.to_block()); + output_block->swap( + mutable_block.to_block(right_semi_anti_without_other ? right_col_idx : 0)); return Status::OK(); } @@ -680,7 +690,11 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Descr _is_outer_join(_match_all_build || _match_all_probe), _hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids ? tnode.hash_join_node.hash_output_slot_ids - : std::vector {}) { + : std::vector {}), + _intermediate_row_desc( + descs, tnode.hash_join_node.vintermediate_tuple_id_list, + std::vector(tnode.hash_join_node.vintermediate_tuple_id_list.size())), + _output_row_desc(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}) { _runtime_filter_descs = tnode.runtime_filters; init_join_op(); @@ -704,8 +718,8 @@ void HashJoinNode::init_join_op() { //do nothing break; } - return; } + Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); DCHECK(tnode.__isset.hash_join_node); @@ -721,15 +735,15 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _match_all_probe || _build_unique || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; const std::vector& eq_join_conjuncts = tnode.hash_join_node.eq_join_conjuncts; - for (int i = 0; i < eq_join_conjuncts.size(); ++i) { + for (const auto& eq_join_conjunct : eq_join_conjuncts) { VExprContext* ctx = nullptr; - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].left, &ctx)); + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left, &ctx)); _probe_expr_ctxs.push_back(ctx); - RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjuncts[i].right, &ctx)); + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.right, &ctx)); _build_expr_ctxs.push_back(ctx); - bool null_aware = eq_join_conjuncts[i].__isset.opcode && - eq_join_conjuncts[i].opcode == TExprOpcode::EQ_FOR_NULL; + bool null_aware = eq_join_conjunct.__isset.opcode && + eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL; _is_null_safe_eq_join.push_back(null_aware); // if is null aware, build join column and probe join column both need dispose null value @@ -753,6 +767,13 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { _have_other_join_conjunct = true; } + const auto& output_exprs = tnode.hash_join_node.srcExprList; + for (const auto& expr : output_exprs) { + VExprContext* ctx = nullptr; + RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, expr, &ctx)); + _output_expr_ctxs.push_back(ctx); + } + for (const auto& filter_desc : _runtime_filter_descs) { RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter( RuntimeFilterRole::PRODUCER, filter_desc, state->query_options())); @@ -780,9 +801,28 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) { } Status HashJoinNode::prepare(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::prepare(state)); + DCHECK(_runtime_profile.get() != nullptr); + _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT); + _rows_returned_rate = runtime_profile()->add_derived_counter( + ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND, + std::bind(&RuntimeProfile::units_per_second, _rows_returned_counter, + runtime_profile()->total_time_counter()), + ""); + _mem_tracker = std::make_unique("ExecNode:" + _runtime_profile->name(), nullptr, + _runtime_profile.get()); SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + if (_vconjunct_ctx_ptr) { + RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->prepare(state, _intermediate_row_desc)); + } + RETURN_IF_ERROR(Expr::prepare(_conjunct_ctxs, state, _intermediate_row_desc)); + + // TODO(zc): + // AddExprCtxsToFree(_conjunct_ctxs); + for (int i = 0; i < _children.size(); ++i) { + RETURN_IF_ERROR(_children[i]->prepare(state)); + } + // Build phase auto build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true); runtime_profile()->add_child(build_phase_profile, false, nullptr); @@ -812,15 +852,17 @@ Status HashJoinNode::prepare(RuntimeState* state) { // _vother_join_conjuncts are evaluated in the context of the rows produced by this node if (_vother_join_conjunct_ptr) { - RETURN_IF_ERROR( - (*_vother_join_conjunct_ptr)->prepare(state, _row_desc_for_other_join_conjunt)); + RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->prepare(state, _intermediate_row_desc)); } + RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state, _intermediate_row_desc)); + // right table data types _right_table_data_types = VectorizedUtils::get_data_types(child(1)->row_desc()); _left_table_data_types = VectorizedUtils::get_data_types(child(0)->row_desc()); // Hash Table Init _hash_table_init(); + _construct_mutable_join_block(); _build_block_offsets.resize(state->batch_size()); _build_block_rows.resize(state->batch_size()); @@ -835,9 +877,9 @@ Status HashJoinNode::close(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "ashJoinNode::close"); VExpr::close(_build_expr_ctxs, state); VExpr::close(_probe_expr_ctxs, state); - if (_vother_join_conjunct_ptr) { - (*_vother_join_conjunct_ptr)->close(state); - } + + if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state); + VExpr::close(_output_expr_ctxs, state); return ExecNode::close(state); } @@ -854,17 +896,7 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo size_t probe_rows = _probe_block.rows(); if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) { _probe_index = 0; - // clear_column_data of _probe_block - { - if (!_probe_column_disguise_null.empty()) { - for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { - auto column_to_erase = _probe_column_disguise_null[i]; - _probe_block.erase(column_to_erase - i); - } - _probe_column_disguise_null.clear(); - } - release_block_memory(_probe_block); - } + _prepare_probe_block(); do { SCOPED_TIMER(_probe_next_timer); @@ -875,6 +907,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo probe_rows = _probe_block.rows(); if (probe_rows != 0) { COUNTER_UPDATE(_probe_rows_counter, probe_rows); + if (_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { + _probe_column_convert_to_null = _convert_block_to_null(_probe_block); + } int probe_expr_ctxs_sz = _probe_expr_ctxs.size(); _probe_columns.resize(probe_expr_ctxs_sz); @@ -888,9 +923,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { auto& null_map_val = _null_map_column->get_data(); - return extract_probe_join_column(_probe_block, null_map_val, - _probe_columns, _probe_ignore_null, - *_probe_expr_call_timer); + return _extract_probe_join_column(_probe_block, null_map_val, + _probe_columns, _probe_ignore_null, + *_probe_expr_call_timer); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -903,6 +938,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } Status st; + _join_block.clear_column_data(); + MutableBlock mutable_join_block(&_join_block); + Block temp_block; if (_probe_index < _probe_block.rows()) { std::visit( @@ -911,33 +949,22 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo using HashTableCtxType = std::decay_t; using JoinOpType = std::decay_t; if constexpr (have_other_join_conjunct) { - MutableBlock mutable_block( - VectorizedUtils::create_empty_columnswithtypename( - _row_desc_for_other_join_conjunt)); - if constexpr (!std::is_same_v) { ProcessHashTableProbe process_hashtable_ctx(this, state->batch_size(), probe_rows); st = process_hashtable_ctx.do_process_with_other_join_conjunts( - arg, &_null_map_column->get_data(), mutable_block, - output_block); + arg, &_null_map_column->get_data(), mutable_join_block, + &temp_block); } else { LOG(FATAL) << "FATAL: uninited hash table"; } } else { - MutableBlock mutable_block = - output_block->mem_reuse() - ? MutableBlock(output_block) - : MutableBlock( - VectorizedUtils::create_empty_columnswithtypename( - row_desc())); - if constexpr (!std::is_same_v) { ProcessHashTableProbe process_hashtable_ctx(this, state->batch_size(), probe_rows); st = process_hashtable_ctx.do_process(arg, &_null_map_column->get_data(), - mutable_block, output_block); + mutable_join_block, &temp_block); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -948,8 +975,6 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo make_bool_variant(_probe_ignore_null)); } else if (_probe_eos) { if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) { - MutableBlock mutable_block( - VectorizedUtils::create_empty_columnswithtypename(row_desc())); std::visit( [&](auto&& arg, auto&& join_op_variants) { using JoinOpType = std::decay_t; @@ -957,8 +982,8 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo if constexpr (!std::is_same_v) { ProcessHashTableProbe process_hashtable_ctx(this, state->batch_size(), probe_rows); - st = process_hashtable_ctx.process_data_in_hashtable(arg, mutable_block, - output_block, eos); + st = process_hashtable_ctx.process_data_in_hashtable( + arg, mutable_join_block, &temp_block, eos); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -973,12 +998,45 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } RETURN_IF_ERROR( - VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns())); + VExprContext::filter_block(_vconjunct_ctx_ptr, &temp_block, temp_block.columns())); + RETURN_IF_ERROR(_build_output_block(&temp_block, output_block)); reached_limit(output_block, eos); return st; } +void HashJoinNode::_prepare_probe_block() { + // clear_column_data of _probe_block + if (!_probe_column_disguise_null.empty()) { + for (int i = 0; i < _probe_column_disguise_null.size(); ++i) { + auto column_to_erase = _probe_column_disguise_null[i]; + _probe_block.erase(column_to_erase - i); + } + _probe_column_disguise_null.clear(); + } + + // remove add nullmap of probe columns + for (auto index : _probe_column_convert_to_null) { + auto& column_type = _probe_block.safe_get_by_position(index); + DCHECK(column_type.column->is_nullable()); + DCHECK(column_type.type->is_nullable()); + + column_type.column = remove_nullable(column_type.column); + column_type.type = remove_nullable(column_type.type); + } + release_block_memory(_probe_block); +} + +void HashJoinNode::_construct_mutable_join_block() { + const auto& mutable_block_desc = _intermediate_row_desc; + for (const auto tuple_desc : mutable_block_desc.tuple_descriptors()) { + for (const auto slot_desc : tuple_desc->slots()) { + auto type_ptr = slot_desc->get_data_type_ptr(); + _join_block.insert({type_ptr->create_column(), type_ptr, slot_desc->col_name()}); + } + } +} + Status HashJoinNode::open(RuntimeState* state) { START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::open"); SCOPED_TIMER(_runtime_profile->total_time_counter()); @@ -991,6 +1049,7 @@ Status HashJoinNode::open(RuntimeState* state) { if (_vother_join_conjunct_ptr) { RETURN_IF_ERROR((*_vother_join_conjunct_ptr)->open(state)); } + RETURN_IF_ERROR(VExpr::open(_output_expr_ctxs, state)); std::promise thread_status; std::thread([this, state, thread_status_p = &thread_status, @@ -1083,9 +1142,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState* state) { } // TODO:: unify the code of extract probe join column -Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map, - ColumnRawPtrs& raw_ptrs, bool& ignore_null, - RuntimeProfile::Counter& expr_call_timer) { +Status HashJoinNode::_extract_build_join_column(Block& block, NullMap& null_map, + ColumnRawPtrs& raw_ptrs, bool& ignore_null, + RuntimeProfile::Counter& expr_call_timer) { for (size_t i = 0; i < _build_expr_ctxs.size(); ++i) { int result_col_id = -1; // execute build column @@ -1121,9 +1180,9 @@ Status HashJoinNode::extract_build_join_column(Block& block, NullMap& null_map, return Status::OK(); } -Status HashJoinNode::extract_probe_join_column(Block& block, NullMap& null_map, - ColumnRawPtrs& raw_ptrs, bool& ignore_null, - RuntimeProfile::Counter& expr_call_timer) { +Status HashJoinNode::_extract_probe_join_column(Block& block, NullMap& null_map, + ColumnRawPtrs& raw_ptrs, bool& ignore_null, + RuntimeProfile::Counter& expr_call_timer) { for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) { int result_col_id = -1; // execute build column @@ -1169,6 +1228,9 @@ Status HashJoinNode::extract_probe_join_column(Block& block, NullMap& null_map, Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uint8_t offset) { SCOPED_TIMER(_build_table_timer); + if (_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) { + _convert_block_to_null(block); + } size_t rows = block.rows(); if (UNLIKELY(rows == 0)) { return Status::OK(); @@ -1186,8 +1248,8 @@ Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block, uin [&](auto&& arg) -> Status { using HashTableCtxType = std::decay_t; if constexpr (!std::is_same_v) { - return extract_build_join_column(block, null_map_val, raw_ptrs, has_null, - *_build_expr_call_timer); + return _extract_build_join_column(block, null_map_val, raw_ptrs, has_null, + *_build_expr_call_timer); } else { LOG(FATAL) << "FATAL: uninited hash table"; } @@ -1318,4 +1380,63 @@ void HashJoinNode::_hash_table_init() { _hash_table_variants.emplace(); } } + +std::vector HashJoinNode::_convert_block_to_null(Block& block) { + std::vector results; + for (int i = 0; i < block.columns(); ++i) { + if (auto& column_type = block.safe_get_by_position(i); !column_type.type->is_nullable()) { + DCHECK(!column_type.column->is_nullable()); + column_type.column = make_nullable(column_type.column); + column_type.type = make_nullable(column_type.type); + results.emplace_back(i); + } + } + return results; +} + +Status HashJoinNode::_build_output_block(Block* origin_block, Block* output_block) { + auto is_mem_reuse = output_block->mem_reuse(); + MutableBlock mutable_block = + is_mem_reuse ? MutableBlock(output_block) + : MutableBlock(VectorizedUtils::create_empty_columnswithtypename( + _output_row_desc)); + auto rows = origin_block->rows(); + // TODO: After FE plan support same nullable of output expr and origin block and mutable column + // we should repalce `insert_column_datas` by `insert_range_from` + + auto insert_column_datas = [](auto& to, const auto& from, size_t rows) { + if (to->is_nullable() && !from.is_nullable()) { + auto& null_column = reinterpret_cast(*to); + null_column.get_nested_column().insert_range_from(from, 0, rows); + null_column.get_null_map_column().get_data().resize_fill(rows, 0); + } else { + to->insert_range_from(from, 0, rows); + } + }; + if (rows != 0) { + auto& mutable_columns = mutable_block.mutable_columns(); + if (_output_expr_ctxs.empty()) { + DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); + for (int i = 0; i < mutable_columns.size(); ++i) { + insert_column_datas(mutable_columns[i], *origin_block->get_by_position(i).column, + rows); + } + } else { + DCHECK(mutable_columns.size() == _output_row_desc.num_materialized_slots()); + for (int i = 0; i < mutable_columns.size(); ++i) { + auto result_column_id = -1; + RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block, &result_column_id)); + auto column_ptr = origin_block->get_by_position(result_column_id) + .column->convert_to_full_column_if_const(); + insert_column_datas(mutable_columns[i], *column_ptr, rows); + } + } + + if (!is_mem_reuse) output_block->swap(mutable_block.to_block()); + DCHECK(output_block->rows() == rows); + } + + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 3fcd9ded695720..d3034e0bf0a4df 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -147,15 +147,17 @@ class HashJoinNode : public ::doris::ExecNode { HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~HashJoinNode() override; - virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; - virtual Status prepare(RuntimeState* state) override; - virtual Status open(RuntimeState* state) override; - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; - virtual Status get_next(RuntimeState* state, Block* block, bool* eos) override; - virtual Status close(RuntimeState* state) override; + Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; + Status prepare(RuntimeState* state) override; + Status open(RuntimeState* state) override; + Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + Status get_next(RuntimeState* state, Block* block, bool* eos) override; + Status close(RuntimeState* state) override; HashTableVariants& get_hash_table_variants() { return _hash_table_variants; } void init_join_op(); + const RowDescriptor& row_desc() const override { return _output_row_desc; } + private: using VExprContexts = std::vector; @@ -168,6 +170,8 @@ class HashJoinNode : public ::doris::ExecNode { VExprContexts _build_expr_ctxs; // other expr std::unique_ptr _vother_join_conjunct_ptr; + // output expr + VExprContexts _output_expr_ctxs; // mark the join column whether support null eq std::vector _is_null_safe_eq_join; @@ -178,6 +182,7 @@ class HashJoinNode : public ::doris::ExecNode { std::vector _probe_not_ignore_null; std::vector _probe_column_disguise_null; + std::vector _probe_column_convert_to_null; DataTypes _right_table_data_types; DataTypes _left_table_data_types; @@ -226,6 +231,7 @@ class HashJoinNode : public ::doris::ExecNode { bool _have_other_join_conjunct = false; RowDescriptor _row_desc_for_other_join_conjunt; + Block _join_block; std::vector _items_counts; std::vector _build_block_offsets; @@ -235,6 +241,9 @@ class HashJoinNode : public ::doris::ExecNode { std::vector _left_output_slot_flags; std::vector _right_output_slot_flags; + RowDescriptor _intermediate_row_desc; + RowDescriptor _output_row_desc; + private: void _hash_table_build_thread(RuntimeState* state, std::promise* status); @@ -242,15 +251,23 @@ class HashJoinNode : public ::doris::ExecNode { Status _process_build_block(RuntimeState* state, Block& block, uint8_t offset); - Status extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, - bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); + Status _extract_build_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, + bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); - Status extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, - bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); + Status _extract_probe_join_column(Block& block, NullMap& null_map, ColumnRawPtrs& raw_ptrs, + bool& ignore_null, RuntimeProfile::Counter& expr_call_timer); void _hash_table_init(); - static const int _MAX_BUILD_BLOCK_COUNT = 128; + static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128; + + void _prepare_probe_block(); + + void _construct_mutable_join_block(); + + Status _build_output_block(Block* origin_block, Block* output_block); + + static std::vector _convert_block_to_null(Block& block); template friend struct ProcessHashTableBuild; diff --git a/build-support/clang-format.sh b/build-support/clang-format.sh index df054d4ea2599e..005458372c8ee4 100755 --- a/build-support/clang-format.sh +++ b/build-support/clang-format.sh @@ -28,7 +28,6 @@ ROOT=`cd "$ROOT"; pwd` export DORIS_HOME=`cd "${ROOT}/.."; pwd` -#CLANG_FORMAT=${CLANG_FORMAT_BINARY:=$(which clang-format)} -CLANG_FORMAT=/mnt/disk1/liyifan/doris/ldb_toolchain/bin/clang-format +CLANG_FORMAT=${CLANG_FORMAT_BINARY:=$(which clang-format)} python ${DORIS_HOME}/build-support/run_clang_format.py "--clang-format-executable" "${CLANG_FORMAT}" "-r" "--style" "file" "--inplace" "true" "--extensions" "c,h,C,H,cpp,hpp,cc,hh,c++,h++,cxx,hxx" "--exclude" "none" "be/src be/test" diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 148d1e543a4b59..68d91c121517d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -37,7 +37,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.IdGenerator; import org.apache.doris.common.Pair; -import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.external.hudi.HudiTable; import org.apache.doris.external.hudi.HudiUtils; @@ -256,15 +255,14 @@ private static class GlobalState { private final Map> eqJoinConjuncts = Maps.newHashMap(); // set of conjuncts that have been assigned to some PlanNode - private Set assignedConjuncts = - Collections.newSetFromMap(new IdentityHashMap()); + private Set assignedConjuncts = Collections.newSetFromMap(new IdentityHashMap()); + + private Set inlineViewTupleIds = Sets.newHashSet(); // map from outer-joined tuple id, ie, one that is nullable in this select block, // to the last Join clause (represented by its rhs table ref) that outer-joined it private final Map outerJoinedTupleIds = Maps.newHashMap(); - private final Set outerJoinedMaterializedTupleIds = Sets.newHashSet(); - // Map of registered conjunct to the last full outer join (represented by its // rhs table ref) that outer joined it. public final Map fullOuterJoinedConjuncts = Maps.newHashMap(); @@ -794,18 +792,12 @@ public SlotDescriptor registerColumnRef(TableName tblName, String colName) throw String key = d.getAlias() + "." + col.getName(); SlotDescriptor result = slotRefMap.get(key); if (result != null) { - // this is a trick to set slot as nullable when slot is on inline view - // When analyze InlineViewRef, we first generate sMap and baseTblSmap and then analyze join. - // We have already registered column ref at that time, but we did not know - // whether inline view is outer joined. So we have to check it and set slot as nullable here. - if (isOuterJoined(d.getId())) { - result.setIsNullable(true); - } result.setMultiRef(true); return result; } result = globalState.descTbl.addSlotDescriptor(d); result.setColumn(col); + // TODO: need to remove this outer join' result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId())); slotRefMap.put(key, result); @@ -905,6 +897,10 @@ public SlotDescriptor copySlotDescriptor(SlotDescriptor srcSlotDesc, TupleDescri return result; } + public void registerInlineViewTupleId(TupleId tupleId) { + globalState.inlineViewTupleIds.add(tupleId); + } + /** * Register conjuncts that are outer joined by a full outer join. For a given * predicate, we record the last full outer join that outer-joined any of its @@ -956,57 +952,6 @@ public void registerOuterJoinedTids(List tids, TableRef rhsRef) { } } - public void registerOuterJoinedMaterilizeTids(List tids) { - globalState.outerJoinedMaterializedTupleIds.addAll(tids); - } - - /** - * The main function of this method is to set the column property on the nullable side of the outer join - * to nullable in the case of vectorization. - * For example: - * Query: select * from t1 left join t2 on t1.k1=t2.k1 - * Origin: t2.k1 not null - * Result: t2.k1 is nullable - * - * @throws VecNotImplException In some cases, it is not possible to directly modify the column property to nullable. - * It will report an error and fall back from vectorized mode to non-vectorized mode for execution. - * If the nullside column of the outer join is a column that must return non-null like count(*) - * then there is no way to force the column to be nullable. - * At this time, vectorization cannot support this situation, - * so it is necessary to fall back to non-vectorization for processing. - * For example: - * Query: select * from t1 left join - * (select k1, count(k2) as count_k2 from t2 group by k1) tmp on t1.k1=tmp.k1 - * Origin: tmp.k1 not null, tmp.count_k2 not null - * Result: throw VecNotImplException - */ - public void changeAllOuterJoinTupleToNull() throws VecNotImplException { - for (TupleId tid : globalState.outerJoinedTupleIds.keySet()) { - for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { - changeSlotToNull(slotDescriptor); - } - } - - for (TupleId tid : globalState.outerJoinedMaterializedTupleIds) { - for (SlotDescriptor slotDescriptor : getTupleDesc(tid).getSlots()) { - changeSlotToNull(slotDescriptor); - } - } - } - - private void changeSlotToNull(SlotDescriptor slotDescriptor) throws VecNotImplException { - if (slotDescriptor.getSourceExprs().isEmpty()) { - slotDescriptor.setIsNullable(true); - return; - } - for (Expr sourceExpr : slotDescriptor.getSourceExprs()) { - if (!sourceExpr.isNullable()) { - throw new VecNotImplException("The slot (" + slotDescriptor.toString() - + ") could not be changed to nullable"); - } - } - } - /** * Register the given tuple id as being the invisible side of a semi-join. */ @@ -1426,10 +1371,6 @@ public boolean isFullOuterJoined(TupleId tid) { return globalState.fullOuterJoinedTupleIds.containsKey(tid); } - public boolean isOuterMaterializedJoined(TupleId tid) { - return globalState.outerJoinedMaterializedTupleIds.contains(tid); - } - public boolean isFullOuterJoined(SlotId sid) { return isFullOuterJoined(getTupleId(sid)); } @@ -2243,6 +2184,10 @@ public boolean isOuterJoined(TupleId tid) { return globalState.outerJoinedTupleIds.containsKey(tid); } + public boolean isInlineView(TupleId tid) { + return globalState.inlineViewTupleIds.contains(tid); + } + public boolean containSubquery() { return globalState.containsSubquery; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 55b66593e960a3..fe2b6c3abf503e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -21,9 +21,11 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.IdGenerator; import org.apache.doris.thrift.TDescriptorTable; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; @@ -113,6 +115,21 @@ public TupleDescriptor getTupleDesc(TupleId id) { return tupleDescs.get(id); } + /** + * Return all tuple desc by idList. + */ + public List getTupleDesc(List idList) throws AnalysisException { + List result = Lists.newArrayList(); + for (TupleId tupleId : idList) { + TupleDescriptor tupleDescriptor = getTupleDesc(tupleId); + if (tupleDescriptor == null) { + throw new AnalysisException("Invalid tuple id:" + tupleId.toString()); + } + result.add(tupleDescriptor); + } + return result; + } + public SlotDescriptor getSlotDesc(SlotId id) { return slotDescs.get(id); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java index 966cfa7e0adcf3..42c52ea0958efa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExprSubstitutionMap.java @@ -20,6 +20,8 @@ package org.apache.doris.analysis; +import org.apache.doris.common.AnalysisException; + import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -88,13 +90,38 @@ public boolean containsMappingFor(Expr lhsExpr) { return lhs.contains(lhsExpr); } + /** + * Returns lhs if the smap contains a mapping for rhsExpr. + */ + public Expr mappingForRhsExpr(Expr rhsExpr) { + for (int i = 0; i < rhs.size(); ++i) { + if (rhs.get(i).equals(rhsExpr)) { + return lhs.get(i); + } + } + return null; + } + + public void removeByRhsExpr(Expr rhsExpr) { + for (int i = 0; i < rhs.size(); ++i) { + if (rhs.get(i).equals(rhsExpr)) { + lhs.remove(i); + rhs.remove(i); + break; + } + } + } + + public void updateLhsExprs(List lhsExprList) { + lhs = lhsExprList; + } + /** * Return a map which is equivalent to applying f followed by g, * i.e., g(f()). * Always returns a non-null map. */ - public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g, - Analyzer analyzer) { + public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutionMap g, Analyzer analyzer) { if (f == null && g == null) { return new ExprSubstitutionMap(); } @@ -130,11 +157,79 @@ public static ExprSubstitutionMap compose(ExprSubstitutionMap f, ExprSubstitutio return result; } + /** + * Returns the subtraction of two substitution maps. + * f [A.id, B.id] g [A.id, C.id] + * return: g-f [B,id, C,id] + */ + public static ExprSubstitutionMap subtraction(ExprSubstitutionMap f, ExprSubstitutionMap g) { + if (f == null && g == null) { + return new ExprSubstitutionMap(); + } + if (f == null) { + return g; + } + if (g == null) { + return f; + } + ExprSubstitutionMap result = new ExprSubstitutionMap(); + for (int i = 0; i < g.size(); i++) { + if (f.containsMappingFor(g.lhs.get(i))) { + result.put(f.get(g.lhs.get(i)), g.rhs.get(i)); + } else { + result.put(g.lhs.get(i), g.rhs.get(i)); + } + } + return result; + } + + /** + * Returns the replace of two substitution maps. + * f [A.id, B.id] [A.age, B.age] [A.name, B.name] g [A.id, C.id] [B.age, C.age] [A.address, C.address] + * return: [A.id, C,id] [A.age, C.age] [A.name, B.name] [A.address, C.address] + */ + public static ExprSubstitutionMap composeAndReplace(ExprSubstitutionMap f, ExprSubstitutionMap g, Analyzer analyzer) + throws AnalysisException { + if (f == null && g == null) { + return new ExprSubstitutionMap(); + } + if (f == null) { + return g; + } + if (g == null) { + return f; + } + ExprSubstitutionMap result = new ExprSubstitutionMap(); + // compose f and g + for (int i = 0; i < g.size(); i++) { + boolean findGMatch = false; + Expr gLhs = g.getLhs().get(i); + for (int j = 0; j < f.size(); j++) { + // case a->fn(b), b->c => a->fn(c) + Expr fRhs = f.getRhs().get(j); + if (fRhs.contains(gLhs)) { + Expr newRhs = fRhs.trySubstitute(g, analyzer, false); + result.put(f.getLhs().get(j), newRhs); + findGMatch = true; + } + } + if (!findGMatch) { + result.put(g.getLhs().get(i), g.getRhs().get(i)); + } + } + // add remaining f + for (int i = 0; i < f.size(); i++) { + if (!result.containsMappingFor(f.lhs.get(i))) { + result.put(f.lhs.get(i), f.rhs.get(i)); + } + } + return result; + } + /** * Returns the union of two substitution maps. Always returns a non-null map. */ - public static ExprSubstitutionMap combine(ExprSubstitutionMap f, - ExprSubstitutionMap g) { + public static ExprSubstitutionMap combine(ExprSubstitutionMap f, ExprSubstitutionMap g) { if (f == null && g == null) { return new ExprSubstitutionMap(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java index 7dd7b9698d8865..a985d092d9ff46 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FromClause.java @@ -125,10 +125,30 @@ public void analyze(Analyzer analyzer) throws AnalysisException, UserException { tblRef.analyze(analyzer); leftTblRef = tblRef; } + // Fix the problem of column nullable attribute error caused by inline view + outer join + changeTblRefToNullable(analyzer); analyzed = true; } + // set null-side inlinve view column + // For example: select * from (select a as k1 from t) tmp right join b on tmp.k1=b.k1 + // The columns from tmp should be nullable. + // The table ref tmp will be used by HashJoinNode.computeOutputTuple() + private void changeTblRefToNullable(Analyzer analyzer) { + for (TableRef tableRef : tablerefs) { + if (!(tableRef instanceof InlineViewRef)) { + continue; + } + InlineViewRef inlineViewRef = (InlineViewRef) tableRef; + if (analyzer.isOuterJoined(inlineViewRef.getId())) { + for (SlotDescriptor slotDescriptor : inlineViewRef.getDesc().getSlots()) { + slotDescriptor.setIsNullable(true); + } + } + } + } + public FromClause clone() { ArrayList clone = Lists.newArrayList(); for (TableRef tblRef : tablerefs) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java index f0f9c421b8ccb7..0fe2b691afea70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InlineViewRef.java @@ -295,6 +295,7 @@ public TupleDescriptor createTupleDescriptor(Analyzer analyzer) throws AnalysisE TupleDescriptor result = analyzer.getDescTbl().createTupleDescriptor(); result.setIsMaterialized(false); result.setTable(inlineView); + analyzer.registerInlineViewTupleId(result.getId()); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 05ffa486c59f08..de5bcd1a18166d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -40,7 +40,6 @@ import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; import org.apache.doris.common.util.SqlUtils; -import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.rewrite.ExprRewriter; @@ -513,13 +512,6 @@ public void analyze(Analyzer analyzer) throws UserException { analyzer.registerConjuncts(whereClause, false, getTableRefIds()); } - // Change all outer join tuple to null here after analyze where and from clause - // all solt desc of join tuple is ready. Before analyze sort info/agg info/analytic info - // the solt desc nullable mark must be corrected to make sure BE exec query right. - if (VectorizedUtil.isVectorized()) { - analyzer.changeAllOuterJoinTupleToNull(); - } - createSortInfo(analyzer); if (sortInfo != null && CollectionUtils.isNotEmpty(sortInfo.getOrderingExprs())) { if (groupingInfo != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index bd1f62e1c48044..069eddd71355a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -489,20 +489,15 @@ public void analyzeJoin(Analyzer analyzer) throws AnalysisException { if (joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerOuterJoinedTids(getId().asList(), this); - analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds()); } if (joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerOuterJoinedTids(leftTblRef.getAllTableRefIds(), this); - analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds()); } // register the tuple ids of a full outer join if (joinOp == JoinOperator.FULL_OUTER_JOIN) { analyzer.registerFullOuterJoinedTids(leftTblRef.getAllTableRefIds(), this); analyzer.registerFullOuterJoinedTids(getId().asList(), this); - - analyzer.registerOuterJoinedMaterilizeTids(leftTblRef.getAllMaterializedTupleIds()); - analyzer.registerOuterJoinedMaterilizeTids(getMaterializedTupleIds()); } // register the tuple id of the rhs of a left semi join diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java index 8132e2d73093ba..1add0bbcccc6e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TupleIsNullPredicate.java @@ -30,7 +30,9 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; /** @@ -40,7 +42,7 @@ */ public class TupleIsNullPredicate extends Predicate { - private final List tupleIds = Lists.newArrayList(); + private List tupleIds = Lists.newArrayList(); public TupleIsNullPredicate(List tupleIds) { Preconditions.checkState(tupleIds != null && !tupleIds.isEmpty()); @@ -182,8 +184,7 @@ public static Expr unwrapExpr(Expr expr) { if (expr instanceof FunctionCallExpr) { FunctionCallExpr fnCallExpr = (FunctionCallExpr) expr; List params = fnCallExpr.getParams().exprs(); - if (fnCallExpr.getFnName().getFunction().equals("if") - && params.get(0) instanceof TupleIsNullPredicate + if (fnCallExpr.getFnName().getFunction().equals("if") && params.get(0) instanceof TupleIsNullPredicate && Expr.IS_NULL_LITERAL.apply(params.get(1))) { return unwrapExpr(params.get(2)); } @@ -194,6 +195,30 @@ public static Expr unwrapExpr(Expr expr) { return expr; } + public static void substitueListForTupleIsNull(List exprs, + Map, TupleId> originToTargetTidMap) { + for (Expr expr : exprs) { + if (!(expr instanceof FunctionCallExpr)) { + continue; + } + if (expr.getChildren().size() != 3) { + continue; + } + if (!(expr.getChild(0) instanceof TupleIsNullPredicate)) { + continue; + } + TupleIsNullPredicate tupleIsNullPredicate = (TupleIsNullPredicate) expr.getChild(0); + TupleId targetTid = originToTargetTidMap.get(tupleIsNullPredicate.getTupleIds()); + if (targetTid != null) { + tupleIsNullPredicate.replaceTupleIds(Arrays.asList(targetTid)); + } + } + } + + private void replaceTupleIds(List tupleIds) { + this.tupleIds = tupleIds; + } + @Override public boolean isNullable() { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java b/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java deleted file mode 100644 index 2c5d12e7d849af..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/VecNotImplException.java +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common; - -public class VecNotImplException extends UserException { - public VecNotImplException(String msg) { - super(msg); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java index 0eba9f9fc94743..296ae5571b1695 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java @@ -17,12 +17,7 @@ package org.apache.doris.common.util; -import org.apache.doris.analysis.SetVar; -import org.apache.doris.analysis.StringLiteral; -import org.apache.doris.common.DdlException; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.VariableMgr; public class VectorizedUtil { /** @@ -38,34 +33,4 @@ public static boolean isVectorized() { } return connectContext.getSessionVariable().enableVectorizedEngine(); } - - /** - * The purpose of this function is to turn off the vectorization switch for the current query. - * When the vectorization engine cannot meet the requirements of the current query, - * it will convert the current query into a non-vectorized query. - * Note that this will only change the **vectorization switch for a single query**, - * and will not affect other queries in the same session. - * Therefore, even if the vectorization switch of the current query is turned off, - * the vectorization properties of subsequent queries will not be affected. - * - * Session: set enable_vectorized_engine=true; - * Query1: select * from table (vec) - * Query2: select * from t1 left join (select count(*) as count from t2) t3 on t1.k1=t3.count (switch to non-vec) - * Query3: select * from table (still vec) - */ - public static void switchToQueryNonVec() { - ConnectContext connectContext = ConnectContext.get(); - if (connectContext == null) { - return; - } - SessionVariable sessionVariable = connectContext.getSessionVariable(); - sessionVariable.setIsSingleSetVar(true); - try { - VariableMgr.setVar(sessionVariable, new SetVar( - "enable_vectorized_engine", - new StringLiteral("false"))); - } catch (DdlException e) { - // do nothing - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index a83aedaa499838..c8561b54dc662e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotId; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.VectorizedUtil; @@ -311,7 +312,7 @@ public int getNumInstances() { } @Override - public Set computeInputSlotIds() throws NotImplementedException { + public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { Set result = Sets.newHashSet(); // compute group by slot ArrayList groupingExprs = aggInfo.getGroupingExprs(); @@ -324,6 +325,19 @@ public Set computeInputSlotIds() throws NotImplementedException { List aggregateSlotIds = Lists.newArrayList(); Expr.getIds(aggregateExprs, null, aggregateSlotIds); result.addAll(aggregateSlotIds); + + // case: select count(*) from test + // result is empty + // Actually need to take a column as the input column of the agg operator + if (result.isEmpty()) { + TupleDescriptor tupleDesc = analyzer.getTupleDesc(getChild(0).getOutputTupleIds().get(0)); + // If the query result is empty set such as: select count(*) from table where 1=2 + // then the materialized slot will be empty + // So the result should be empty also. + if (!tupleDesc.getMaterializedSlots().isEmpty()) { + result.add(tupleDesc.getMaterializedSlots().get(0).getId()); + } + } return result; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index ddfbe47da72c70..78eb563ec7a460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -29,10 +29,13 @@ import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TableRef; +import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.analysis.TupleIsNullPredicate; import org.apache.doris.catalog.ColumnStats; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CheckedMath; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.Pair; @@ -49,11 +52,14 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -84,6 +90,9 @@ public class HashJoinNode extends PlanNode { private boolean isBucketShuffle = false; // the flag for bucket shuffle join private List hashOutputSlotIds; + private TupleDescriptor vOutputTupleDesc; + private ExprSubstitutionMap vSrcToOutputSMap; + private List vIntermediateTupleDescList; /** * Constructor of HashJoinNode. @@ -249,38 +258,100 @@ public void setColocate(boolean colocate, String reason) { * * @param slotIdList */ - private void initHashOutputSlotIds(List slotIdList) { - hashOutputSlotIds = new ArrayList<>(slotIdList); + private void initHashOutputSlotIds(List slotIdList, Analyzer analyzer) { + Set hashOutputSlotIdSet = Sets.newHashSet(); + // step1: change output slot id to src slot id + if (vSrcToOutputSMap != null) { + for (SlotId slotId : slotIdList) { + SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); + Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); + if (srcExpr == null) { + hashOutputSlotIdSet.add(slotId); + } else { + List srcSlotRefList = Lists.newArrayList(); + srcExpr.collect(SlotRef.class, srcSlotRefList); + hashOutputSlotIdSet + .addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); + } + } + } + + // step2: add conjuncts required slots List otherAndConjunctSlotIds = Lists.newArrayList(); Expr.getIds(otherJoinConjuncts, null, otherAndConjunctSlotIds); Expr.getIds(conjuncts, null, otherAndConjunctSlotIds); - for (SlotId slotId : otherAndConjunctSlotIds) { - if (!hashOutputSlotIds.contains(slotId)) { - hashOutputSlotIds.add(slotId); - } - } + hashOutputSlotIdSet.addAll(otherAndConjunctSlotIds); + hashOutputSlotIds = new ArrayList<>(hashOutputSlotIdSet); } @Override public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) { outputSlotIds = Lists.newArrayList(); - for (TupleId tupleId : tupleIds) { - for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getSlots()) { - if (slotDescriptor.isMaterialized() && (requiredSlotIdSet == null || requiredSlotIdSet.contains( - slotDescriptor.getId()))) { + List outputTupleDescList = Lists.newArrayList(); + if (vOutputTupleDesc != null) { + outputTupleDescList.add(vOutputTupleDesc); + } else { + for (TupleId tupleId : tupleIds) { + outputTupleDescList.add(analyzer.getTupleDesc(tupleId)); + } + } + for (TupleDescriptor tupleDescriptor : outputTupleDescList) { + for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) { + if (slotDescriptor.isMaterialized() + && (requiredSlotIdSet == null || requiredSlotIdSet.contains(slotDescriptor.getId()))) { outputSlotIds.add(slotDescriptor.getId()); } } } - initHashOutputSlotIds(outputSlotIds); + initHashOutputSlotIds(outputSlotIds, analyzer); + } + + @Override + public void projectOutputTuple() throws NotImplementedException { + if (vOutputTupleDesc == null) { + return; + } + if (vOutputTupleDesc.getSlots().size() == outputSlotIds.size()) { + return; + } + Iterator iterator = vOutputTupleDesc.getSlots().iterator(); + while (iterator.hasNext()) { + SlotDescriptor slotDescriptor = iterator.next(); + boolean keep = false; + for (SlotId outputSlotId : outputSlotIds) { + if (slotDescriptor.getId().equals(outputSlotId)) { + keep = true; + break; + } + } + if (!keep) { + iterator.remove(); + SlotRef slotRef = new SlotRef(slotDescriptor); + vSrcToOutputSMap.removeByRhsExpr(slotRef); + } + } + vOutputTupleDesc.computeStatAndMemLayout(); } // output slots + predicate slots = input slots @Override - public Set computeInputSlotIds() throws NotImplementedException { - Preconditions.checkState(outputSlotIds != null); + public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { Set result = Sets.newHashSet(); - result.addAll(outputSlotIds); + Preconditions.checkState(outputSlotIds != null); + // step1: change output slot id to src slot id + if (vSrcToOutputSMap != null) { + for (SlotId slotId : outputSlotIds) { + SlotRef slotRef = new SlotRef(analyzer.getDescTbl().getSlotDesc(slotId)); + Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef); + if (srcExpr == null) { + result.add(slotId); + } else { + List srcSlotRefList = Lists.newArrayList(); + srcExpr.collect(SlotRef.class, srcSlotRefList); + result.addAll(srcSlotRefList.stream().map(e -> e.getSlotId()).collect(Collectors.toList())); + } + } + } // eq conjunct List eqConjunctSlotIds = Lists.newArrayList(); Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds); @@ -307,14 +378,143 @@ public void init(Analyzer analyzer) throws UserException { ExprSubstitutionMap combinedChildSmap = getCombinedChildWithoutTupleIsNullSmap(); List newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts, combinedChildSmap, analyzer, false); - eqJoinConjuncts = newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity) - .collect(Collectors.toList()); + eqJoinConjuncts = + newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate) entity).collect(Collectors.toList()); assignedConjuncts = analyzer.getAssignedConjuncts(); otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, combinedChildSmap, analyzer, false); + + // Only for Vec: create new tuple for join result + if (VectorizedUtil.isVectorized()) { + computeOutputTuple(analyzer); + } + } + + private void computeOutputTuple(Analyzer analyzer) throws UserException { + // 1. create new tuple + vOutputTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + boolean copyLeft = false; + boolean copyRight = false; + boolean leftNullable = false; + boolean rightNullable = false; + switch (joinOp) { + case INNER_JOIN: + case CROSS_JOIN: + copyLeft = true; + copyRight = true; + break; + case LEFT_OUTER_JOIN: + copyLeft = true; + copyRight = true; + rightNullable = true; + break; + case RIGHT_OUTER_JOIN: + copyLeft = true; + copyRight = true; + leftNullable = true; + break; + case FULL_OUTER_JOIN: + copyLeft = true; + copyRight = true; + leftNullable = true; + rightNullable = true; + break; + case LEFT_ANTI_JOIN: + case LEFT_SEMI_JOIN: + case NULL_AWARE_LEFT_ANTI_JOIN: + copyLeft = true; + break; + case RIGHT_ANTI_JOIN: + case RIGHT_SEMI_JOIN: + copyRight = true; + break; + default: + break; + } + ExprSubstitutionMap srcTblRefToOutputTupleSmap = new ExprSubstitutionMap(); + int leftNullableNumber = 0; + int rightNullableNumber = 0; + if (copyLeft) { + for (TupleDescriptor leftTupleDesc : analyzer.getDescTbl().getTupleDesc(getChild(0).getOutputTblRefIds())) { + for (SlotDescriptor leftSlotDesc : leftTupleDesc.getSlots()) { + if (!isMaterailizedByChild(leftSlotDesc, getChild(0).getOutputSmap())) { + continue; + } + SlotDescriptor outputSlotDesc = + analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, leftSlotDesc); + if (leftNullable) { + outputSlotDesc.setIsNullable(true); + leftNullableNumber++; + } + srcTblRefToOutputTupleSmap.put(new SlotRef(leftSlotDesc), new SlotRef(outputSlotDesc)); + } + } + } + if (copyRight) { + for (TupleDescriptor rightTupleDesc : analyzer.getDescTbl() + .getTupleDesc(getChild(1).getOutputTblRefIds())) { + for (SlotDescriptor rightSlotDesc : rightTupleDesc.getSlots()) { + if (!isMaterailizedByChild(rightSlotDesc, getChild(1).getOutputSmap())) { + continue; + } + SlotDescriptor outputSlotDesc = + analyzer.getDescTbl().copySlotDescriptor(vOutputTupleDesc, rightSlotDesc); + if (rightNullable) { + outputSlotDesc.setIsNullable(true); + rightNullableNumber++; + } + srcTblRefToOutputTupleSmap.put(new SlotRef(rightSlotDesc), new SlotRef(outputSlotDesc)); + } + } + } + // 2. compute srcToOutputMap + vSrcToOutputSMap = ExprSubstitutionMap.subtraction(outputSmap, srcTblRefToOutputTupleSmap); + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + Preconditions.checkState(vSrcToOutputSMap.getRhs().get(i) instanceof SlotRef); + SlotRef rSlotRef = (SlotRef) vSrcToOutputSMap.getRhs().get(i); + if (vSrcToOutputSMap.getLhs().get(i) instanceof SlotRef) { + SlotRef lSlotRef = (SlotRef) vSrcToOutputSMap.getLhs().get(i); + rSlotRef.getDesc().setIsMaterialized(lSlotRef.getDesc().isMaterialized()); + } else { + rSlotRef.getDesc().setIsMaterialized(true); + } + } + vOutputTupleDesc.computeStatAndMemLayout(); + // 3. add tupleisnull in null-side + Preconditions.checkState(srcTblRefToOutputTupleSmap.getLhs().size() == vSrcToOutputSMap.getLhs().size()); + // Condition1: the left child is null-side + // Condition2: the left child is a inline view + // Then: add tuple is null in left child columns + if (leftNullable && getChild(0).tblRefIds.size() == 1 && analyzer.isInlineView(getChild(0).tblRefIds.get(0))) { + List tupleIsNullLhs = TupleIsNullPredicate + .wrapExprs(vSrcToOutputSMap.getLhs().subList(0, leftNullableNumber), getChild(0).getTupleIds(), + analyzer); + tupleIsNullLhs + .addAll(vSrcToOutputSMap.getLhs().subList(leftNullableNumber, vSrcToOutputSMap.getLhs().size())); + vSrcToOutputSMap.updateLhsExprs(tupleIsNullLhs); + } + // Condition1: the right child is null-side + // Condition2: the right child is a inline view + // Then: add tuple is null in right child columns + if (rightNullable && getChild(1).tblRefIds.size() == 1 && analyzer.isInlineView(getChild(1).tblRefIds.get(0))) { + if (rightNullableNumber != 0) { + int rightBeginIndex = vSrcToOutputSMap.size() - rightNullableNumber; + List tupleIsNullLhs = TupleIsNullPredicate + .wrapExprs(vSrcToOutputSMap.getLhs().subList(rightBeginIndex, vSrcToOutputSMap.size()), + getChild(1).getTupleIds(), analyzer); + List newLhsList = Lists.newArrayList(); + if (rightBeginIndex > 0) { + newLhsList.addAll(vSrcToOutputSMap.getLhs().subList(0, rightBeginIndex)); + } + newLhsList.addAll(tupleIsNullLhs); + vSrcToOutputSMap.updateLhsExprs(newLhsList); + } + } + // 4. change the outputSmap + outputSmap = ExprSubstitutionMap.composeAndReplace(outputSmap, srcTblRefToOutputTupleSmap, analyzer); } private void replaceOutputSmapForOuterJoin() { - if (joinOp.isOuterJoin()) { + if (joinOp.isOuterJoin() && !VectorizedUtil.isVectorized()) { List lhs = new ArrayList<>(); List rhs = new ArrayList<>(); @@ -337,6 +537,105 @@ private void replaceOutputSmapForOuterJoin() { } } + @Override + public void finalize(Analyzer analyzer) throws UserException { + super.finalize(analyzer); + if (VectorizedUtil.isVectorized()) { + computeIntermediateTuple(analyzer); + } + } + + private void computeIntermediateTuple(Analyzer analyzer) throws AnalysisException { + // 1. create new tuple + TupleDescriptor vIntermediateLeftTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + TupleDescriptor vIntermediateRightTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + vIntermediateTupleDescList = new ArrayList<>(); + vIntermediateTupleDescList.add(vIntermediateLeftTupleDesc); + vIntermediateTupleDescList.add(vIntermediateRightTupleDesc); + boolean leftNullable = false; + boolean rightNullable = false; + boolean copyleft = true; + boolean copyRight = true; + switch (joinOp) { + case LEFT_OUTER_JOIN: + rightNullable = true; + break; + case RIGHT_OUTER_JOIN: + leftNullable = true; + break; + case FULL_OUTER_JOIN: + leftNullable = true; + rightNullable = true; + break; + case LEFT_ANTI_JOIN: + case LEFT_SEMI_JOIN: + case NULL_AWARE_LEFT_ANTI_JOIN: + if (otherJoinConjuncts == null || otherJoinConjuncts.isEmpty()) { + copyRight = false; + } + break; + case RIGHT_SEMI_JOIN: + case RIGHT_ANTI_JOIN: + if (otherJoinConjuncts == null || otherJoinConjuncts.isEmpty()) { + copyleft = false; + } + break; + default: + break; + } + // 2. exprsmap: + ExprSubstitutionMap originToIntermediateSmap = new ExprSubstitutionMap(); + Map, TupleId> originTidsToIntermediateTidMap = Maps.newHashMap(); + // left + if (copyleft) { + originTidsToIntermediateTidMap.put(getChild(0).getOutputTupleIds(), vIntermediateLeftTupleDesc.getId()); + for (TupleDescriptor tupleDescriptor : analyzer.getDescTbl() + .getTupleDesc(getChild(0).getOutputTupleIds())) { + for (SlotDescriptor slotDescriptor : tupleDescriptor.getMaterializedSlots()) { + SlotDescriptor intermediateSlotDesc = + analyzer.getDescTbl().copySlotDescriptor(vIntermediateLeftTupleDesc, slotDescriptor); + if (leftNullable) { + intermediateSlotDesc.setIsNullable(true); + } + originToIntermediateSmap.put(new SlotRef(slotDescriptor), new SlotRef(intermediateSlotDesc)); + } + } + } + vIntermediateLeftTupleDesc.computeMemLayout(); + // right + if (copyRight) { + originTidsToIntermediateTidMap.put(getChild(1).getOutputTupleIds(), vIntermediateRightTupleDesc.getId()); + for (TupleDescriptor tupleDescriptor : analyzer.getDescTbl() + .getTupleDesc(getChild(1).getOutputTupleIds())) { + for (SlotDescriptor slotDescriptor : tupleDescriptor.getMaterializedSlots()) { + SlotDescriptor intermediateSlotDesc = + analyzer.getDescTbl().copySlotDescriptor(vIntermediateRightTupleDesc, slotDescriptor); + if (rightNullable) { + intermediateSlotDesc.setIsNullable(true); + } + originToIntermediateSmap.put(new SlotRef(slotDescriptor), new SlotRef(intermediateSlotDesc)); + } + } + } + vIntermediateRightTupleDesc.computeMemLayout(); + // 3. replace srcExpr by intermediate tuple + Preconditions.checkState(vSrcToOutputSMap != null); + vSrcToOutputSMap.substituteLhs(originToIntermediateSmap, analyzer); + // 4. replace other conjuncts and conjuncts + otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts, originToIntermediateSmap, analyzer, false); + if (votherJoinConjunct != null) { + votherJoinConjunct = + Expr.substituteList(Arrays.asList(votherJoinConjunct), originToIntermediateSmap, analyzer, false) + .get(0); + } + conjuncts = Expr.substituteList(conjuncts, originToIntermediateSmap, analyzer, false); + if (vconjunct != null) { + vconjunct = Expr.substituteList(Arrays.asList(vconjunct), originToIntermediateSmap, analyzer, false).get(0); + } + // 5. replace tuple is null expr + TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(), originTidsToIntermediateTidMap); + } + /** * Holds the source scan slots of a = join predicate. * The underlying table and column on both sides have stats. @@ -747,6 +1046,19 @@ protected void toThrift(TPlanNode msg) { msg.hash_join_node.addToHashOutputSlotIds(slotId.asInt()); } } + if (vSrcToOutputSMap != null) { + for (int i = 0; i < vSrcToOutputSMap.size(); i++) { + msg.hash_join_node.addToSrcExprList(vSrcToOutputSMap.getLhs().get(i).treeToThrift()); + } + } + if (vOutputTupleDesc != null) { + msg.hash_join_node.setVoutputTupleId(vOutputTupleDesc.getId().asInt()); + } + if (vIntermediateTupleDescList != null) { + for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { + msg.hash_join_node.addToVintermediateTupleIdList(tupleDescriptor.getId().asInt()); + } + } } @Override @@ -781,6 +1093,16 @@ public String getNodeExplainString(String detailPrefix, TExplainLevel detailLeve } output.append(detailPrefix).append(String.format("cardinality=%s", cardinality)).append("\n"); // todo unify in plan node + if (vOutputTupleDesc != null) { + output.append(detailPrefix).append("vec output tuple id: ").append(vOutputTupleDesc.getId()).append("\n"); + } + if (vIntermediateTupleDescList != null) { + output.append(detailPrefix).append("vIntermediate tuple ids: "); + for (TupleDescriptor tupleDescriptor : vIntermediateTupleDescList) { + output.append(tupleDescriptor.getId()).append(" "); + } + output.append("\n"); + } if (outputSlotIds != null) { output.append(detailPrefix).append("output slot ids: "); for (SlotId slotId : outputSlotIds) { @@ -830,4 +1152,75 @@ public void convertToVectoriezd() { } super.convertToVectoriezd(); } + + /** + * If parent wants to get hash join node tupleids, + * it will call this function instead of read properties directly. + * The reason is that the tuple id of vOutputTupleDesc the real output tuple id for hash join node. + *

+ * If you read the properties of @tupleids directly instead of this function, + * it reads the input id of the current node. + */ + @Override + public ArrayList getTupleIds() { + Preconditions.checkState(tupleIds != null); + if (vOutputTupleDesc != null) { + return Lists.newArrayList(vOutputTupleDesc.getId()); + } + return tupleIds; + } + + @Override + public ArrayList getOutputTblRefIds() { + if (vOutputTupleDesc != null) { + return Lists.newArrayList(vOutputTupleDesc.getId()); + } + switch (joinOp) { + case LEFT_SEMI_JOIN: + case LEFT_ANTI_JOIN: + case NULL_AWARE_LEFT_ANTI_JOIN: + return getChild(0).getOutputTblRefIds(); + case RIGHT_SEMI_JOIN: + case RIGHT_ANTI_JOIN: + return getChild(1).getOutputTblRefIds(); + default: + return getTblRefIds(); + } + } + + @Override + public List getOutputTupleIds() { + if (vOutputTupleDesc != null) { + return Lists.newArrayList(vOutputTupleDesc.getId()); + } + switch (joinOp) { + case LEFT_SEMI_JOIN: + case LEFT_ANTI_JOIN: + case NULL_AWARE_LEFT_ANTI_JOIN: + return getChild(0).getOutputTupleIds(); + case RIGHT_SEMI_JOIN: + case RIGHT_ANTI_JOIN: + return getChild(1).getOutputTupleIds(); + default: + return tupleIds; + } + } + + private boolean isMaterailizedByChild(SlotDescriptor slotDesc, ExprSubstitutionMap smap) { + if (slotDesc.isMaterialized()) { + return true; + } + Expr child = smap.get(new SlotRef(slotDesc)); + if (child == null) { + return false; + } + List slotRefList = Lists.newArrayList(); + child.collect(SlotRef.class, slotRefList); + for (SlotRef slotRef : slotRefList) { + if (slotRef.getDesc() != null && !slotRef.getDesc().isMaterialized()) { + return false; + } + } + return true; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index b83ce5641fa7a4..974a2c6a7eee33 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -938,7 +938,6 @@ private void filterDeletedRows(Analyzer analyzer) throws AnalysisException { SlotRef deleteSignSlot = new SlotRef(desc.getAliasAsName(), Column.DELETE_SIGN); deleteSignSlot.analyze(analyzer); deleteSignSlot.getDesc().setIsMaterialized(true); - deleteSignSlot.getDesc().setIsNullable(analyzer.isOuterMaterializedJoined(desc.getId())); Expr conjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, deleteSignSlot, new IntLiteral(0)); conjunct.analyze(analyzer); conjuncts.add(conjunct); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 8397acc5947378..39e5824fd41730 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -141,6 +141,7 @@ public void createPlanFragments(StatementBase statement, Analyzer analyzer, TQue singleNodePlanner = new SingleNodePlanner(plannerContext); PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan(); + // TODO change to vec should happen after distributed planner if (VectorizedUtil.isVectorized()) { singleNodePlan.convertToVectoriezd(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index aa67f65a69f1bd..ef41a932c24d48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -318,6 +318,14 @@ public void setTblRefIds(ArrayList ids) { tblRefIds = ids; } + public ArrayList getOutputTblRefIds() { + return tblRefIds; + } + + public List getOutputTupleIds() { + return tupleIds; + } + public Set getNullableTupleIds() { Preconditions.checkState(nullableTupleIds != null); return nullableTupleIds; @@ -955,6 +963,11 @@ public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) throw new NotImplementedException("The `initOutputSlotIds` hasn't been implemented in " + planNodeName); } + public void projectOutputTuple() throws NotImplementedException { + throw new NotImplementedException("The `projectOutputTuple` hasn't been implemented in " + planNodeName + ". " + + "But it does not affect the project optimizer"); + } + /** * If an plan node implements this method, its child plan node has the ability to implement the project. * The return value of this method will be used as @@ -974,7 +987,7 @@ public void initOutputSlotIds(Set requiredSlotIdSet, Analyzer analyzer) * agg node * (required slots: a.k1) */ - public Set computeInputSlotIds() throws NotImplementedException { + public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { throw new NotImplementedException("The `computeInputSlotIds` hasn't been implemented in " + planNodeName); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java index 649c6d5270e551..643d9ae8633041 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ProjectPlanner.java @@ -47,6 +47,7 @@ public void projectSingleNodePlan(List resultExprs, PlanNode root) { public void projectPlanNode(Set outputSlotIds, PlanNode planNode) { try { planNode.initOutputSlotIds(outputSlotIds, analyzer); + planNode.projectOutputTuple(); } catch (NotImplementedException e) { LOG.debug(e); } @@ -55,7 +56,7 @@ public void projectPlanNode(Set outputSlotIds, PlanNode planNode) { } Set inputSlotIds = null; try { - inputSlotIds = planNode.computeInputSlotIds(); + inputSlotIds = planNode.computeInputSlotIds(analyzer); } catch (NotImplementedException e) { LOG.debug(e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index 95a13061e19859..6e56f6ffd20ae8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -347,6 +347,7 @@ void computePassthrough(Analyzer analyzer) { @Override public void init(Analyzer analyzer) throws UserException { Preconditions.checkState(conjuncts.isEmpty()); + createDefaultSmap(analyzer); computeTupleStatAndMemLayout(analyzer); computeStats(analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 748d33a0b175f1..dc9cfac50dfd1b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -63,6 +63,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.planner.external.ExternalFileScanNode; import com.google.common.base.Preconditions; @@ -1356,9 +1357,14 @@ private PlanNode createInlineViewPlan(Analyzer analyzer, InlineViewRef inlineVie } unionNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId())); unionNode.addConstExprList(selectStmt.getBaseTblResultExprs()); - //set outputSmap to substitute literal in outputExpr - unionNode.setOutputSmap(inlineViewRef.getSmap()); unionNode.init(analyzer); + //set outputSmap to substitute literal in outputExpr + unionNode.setWithoutTupleIsNullOutputSmap(inlineViewRef.getSmap()); + if (analyzer.isOuterJoined(inlineViewRef.getId())) { + List nullableRhs = TupleIsNullPredicate.wrapExprs( + inlineViewRef.getSmap().getRhs(), unionNode.getTupleIds(), analyzer); + unionNode.setOutputSmap(new ExprSubstitutionMap(inlineViewRef.getSmap().getLhs(), nullableRhs)); + } return unionNode; } } @@ -1376,7 +1382,7 @@ private PlanNode createInlineViewPlan(Analyzer analyzer, InlineViewRef inlineVie ExprSubstitutionMap outputSmap = ExprSubstitutionMap.compose( inlineViewRef.getSmap(), rootNode.getOutputSmap(), analyzer); - if (analyzer.isOuterJoined(inlineViewRef.getId())) { + if (analyzer.isOuterJoined(inlineViewRef.getId()) && !VectorizedUtil.isVectorized()) { rootNode.setWithoutTupleIsNullOutputSmap(outputSmap); // Exprs against non-matched rows of an outer join should always return NULL. // Make the rhs exprs of the output smap nullable, if necessary. This expr wrapping @@ -1386,15 +1392,6 @@ private PlanNode createInlineViewPlan(Analyzer analyzer, InlineViewRef inlineVie List nullableRhs = TupleIsNullPredicate.wrapExprs( outputSmap.getRhs(), rootNode.getTupleIds(), analyzer); outputSmap = new ExprSubstitutionMap(outputSmap.getLhs(), nullableRhs); - // When we process outer join with inline views, we set slot descriptor of inline view to nullable firstly. - // When we generate plan, we remove inline view, so the upper node's input is inline view's child. - // So we need to set slot descriptor of inline view's child to nullable to ensure consistent behavior - // with BaseTable. - for (TupleId tupleId : rootNode.getTupleIds()) { - for (SlotDescriptor slotDescriptor : analyzer.getTupleDesc(tupleId).getMaterializedSlots()) { - slotDescriptor.setIsNullable(true); - } - } } // Set output smap of rootNode *before* creating a SelectNode for proper resolution. rootNode.setOutputSmap(outputSmap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index 041cd272669e07..34dfe03305d9ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -260,7 +260,7 @@ public int getNumInstances() { } @Override - public Set computeInputSlotIds() throws NotImplementedException { + public Set computeInputSlotIds(Analyzer analyzer) throws NotImplementedException { List result = Lists.newArrayList(); Expr.getIds(resolvedTupleExprs, null, result); return new HashSet<>(result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 57d133fa60a930..08b6a73b2f9558 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -71,7 +71,6 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.VecNotImplException; import org.apache.doris.common.Version; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.MetaLockUtils; @@ -81,7 +80,6 @@ import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.common.util.VectorizedUtil; import org.apache.doris.load.EtlJobType; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -652,13 +650,6 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException { } else { resetAnalyzerAndStmt(); } - } catch (VecNotImplException e) { - if (i == analyzeTimes) { - throw e; - } else { - resetAnalyzerAndStmt(); - VectorizedUtil.switchToQueryNonVec(); - } } catch (UserException e) { throw e; } catch (Exception e) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java index e33efbfba8eb94..c52d7ffddfd331 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/QueryStmtTest.java @@ -261,7 +261,7 @@ public void testCollectExprs() throws Exception { Assert.assertEquals(24, exprsMap.size()); constMap.clear(); constMap = getConstantExprMap(exprsMap, analyzer); - Assert.assertEquals(10, constMap.size()); + Assert.assertEquals(4, constMap.size()); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java index 0159edba6c677c..375afd5fef7504 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ProjectPlannerFunctionTest.java @@ -87,8 +87,8 @@ public void projectByJoin() throws Exception { String queryStr = "desc verbose select a.k2 from test.t1 a inner join test.t1 b on a.k1=b.k1 " + "inner join test.t1 c on a.k1=c.k1;"; String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); - Assert.assertTrue(explainString.contains("output slot ids: 3")); - Assert.assertTrue(explainString.contains("output slot ids: 0 3")); + Assert.assertTrue(explainString.contains("output slot ids: 8")); + Assert.assertTrue(explainString.contains("output slot ids: 4 5")); } // keep a.k2 after a join b diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 8a93f22fa5b15e..44c9dfc3ccb5af 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -744,7 +744,7 @@ public void testJoinPredicateTransitivity() throws Exception { + "left join join2 on join1.id = join2.id\n" + "and join1.id > 1;"; String explainString = getSQLPlanOrErrorMsg("explain " + sql); - Assert.assertTrue(explainString.contains("other join predicates: `join1`.`id` > 1")); + Assert.assertTrue(explainString.contains("other join predicates: > 1")); Assert.assertFalse(explainString.contains("PREDICATES: `join1`.`id` > 1")); /* @@ -831,7 +831,7 @@ public void testJoinPredicateTransitivity() throws Exception { + "left anti join join2 on join1.id = join2.id\n" + "and join1.id > 1;"; explainString = getSQLPlanOrErrorMsg("explain " + sql); - Assert.assertTrue(explainString.contains("other join predicates: `join1`.`id` > 1")); + Assert.assertTrue(explainString.contains("other join predicates: > 1")); Assert.assertFalse(explainString.contains("PREDICATES: `join1`.`id` > 1")); // test semi join, left table join predicate, only push to left table @@ -1165,19 +1165,20 @@ public void testBucketShuffleJoin() throws Exception { // support recurse of bucket shuffle join // TODO: support the UT in the future - queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and" - + " t1.k1 = t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 and t2.k2 = t3.k2"; + queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2" + + " on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3" + + " on t2.k1 = t3.k1 and t2.k2 = t3.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + // Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`")); // support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and" + " t1.k1 = t2.k2 join test.colocate1 t3 on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and" + " t4.k1 = t2.k2"; explainString = getSQLPlanOrErrorMsg(queryStr); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); - Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`")); + //Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`")); // some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 =" @@ -1210,6 +1211,9 @@ public void testJoinWithMysqlTable() throws Exception { } } + // disable bucket shuffle join + Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); + String queryStr = "explain select * from mysql_table t2, jointest t1 where t1.k1 = t2.k1"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)")); @@ -1257,6 +1261,8 @@ public void testJoinWithOdbcTable() throws Exception { } } + // disable bucket shuffle join + Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); String queryStr = "explain select * from odbc_mysql t2, jointest t1 where t1.k1 = t2.k1"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertTrue(explainString.contains("INNER JOIN(BROADCAST)")); @@ -1351,7 +1357,9 @@ public void testOdbcSink() throws Exception { @Test public void testPreferBroadcastJoin() throws Exception { connectContext.setDatabase("default_cluster:test"); - String queryStr = "explain select * from (select k2 from jointest group by k2)t2, jointest t1 where t1.k1 = t2.k2"; + String queryStr = "explain select * from (select k2 from jointest)t2, jointest t1 where t1.k1 = t2.k2"; + // disable bucket shuffle join + Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false); // default set PreferBroadcastJoin true String explainString = getSQLPlanOrErrorMsg(queryStr); @@ -1620,32 +1628,31 @@ public void testOutJoinSmapReplace() throws Exception { //valid date String sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a right outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; String explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n `a`.`aid`\n 4")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n" + " \n" + " ")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a left outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n 3\n `b`.`bid`")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n" + " \n" + " ")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a full outer JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n `a`.`aid`\n `b`.`bid`")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n" + " \n" + " ")); sql = "SELECT a.aid, b.bid FROM (SELECT 3 AS aid) a JOIN (SELECT 4 AS bid) b ON (a.aid=b.bid)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n 3\n 4")); + Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n" + " \n" + " ")); sql = "SELECT a.k1, b.k2 FROM (SELECT k1 from baseall) a LEFT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)")); + Assert.assertTrue(explainString.contains("\n" + " ")); sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a RIGHT OUTER JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)")); + Assert.assertTrue(explainString.contains("\n" + " ")); sql = "SELECT a.k1, b.k2 FROM (SELECT 1 as k1 from baseall) a FULL JOIN (select k1, 999 as k2 from baseall) b ON (a.k1=b.k1)"; explainString = getSQLPlanOrErrorMsg("EXPLAIN " + sql); - Assert.assertTrue(explainString.contains("if(TupleIsNull(0), NULL, 1)")); - Assert.assertTrue(explainString.contains("if(TupleIsNull(2), NULL, 999)")); + Assert.assertTrue(explainString.contains("\n" + " ")); } @Test @@ -2088,17 +2095,13 @@ public void testResultExprs() throws Exception { + "\"storage_medium\" = \"HDD\",\n" + "\"storage_format\" = \"V2\"\n" + ");\n"); - String queryStr = "EXPLAIN VERBOSE INSERT INTO result_exprs\n" - + "SELECT a.aid,\n" - + " b.bid\n" - + "FROM\n" - + " (SELECT 3 AS aid)a\n" - + "RIGHT JOIN\n" - + " (SELECT 4 AS bid)b ON (a.aid=b.bid)\n"; + String queryStr = "EXPLAIN VERBOSE INSERT INTO result_exprs\n" + "SELECT a.aid,\n" + " b.bid\n" + "FROM\n" + + " (SELECT 3 AS aid)a\n" + "RIGHT JOIN\n" + " (SELECT 4 AS bid)b ON (a.aid=b.bid)\n"; String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertFalse(explainString.contains("OUTPUT EXPRS:\n 3\n 4")); System.out.println(explainString); - Assert.assertTrue(explainString.contains("OUTPUT EXPRS:\n CAST(`a`.`aid` AS INT)\n 4")); + Assert.assertTrue(explainString.contains( + "OUTPUT EXPRS:\n" + " CAST( AS INT)\n" + " CAST( AS INT)")); } @Test diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 54f50085a9a606..3024bac2924f00 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -472,6 +472,12 @@ struct THashJoinNode { // hash output column 6: optional list hash_output_slot_ids + + 7: optional list srcExprList + + 8: optional Types.TTupleId voutput_tuple_id + + 9: optional list vintermediate_tuple_id_list } struct TMergeJoinNode {