Skip to content

Commit

Permalink
[refactor](nereids)unify outputTupleDesc and projection be part (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Mar 22, 2024
1 parent 6b8e329 commit 9911303
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 31 deletions.
26 changes: 19 additions & 7 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,24 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_projection_timer);
// Transfers the projected column `from` into the output column `to`.
//
// Two independent decisions are made up front:
//   * wrap_in_nullable: `to` is nullable while `from` is not, so `from` has to be
//     placed behind a nullable wrapper.
//   * append_rows: `_keep_origin` is set, or `from` reports that it is not exclusive;
//     in that case the first `rows` rows of `from` are appended into `to` with
//     insert_range_from. Otherwise `to` is simply re-pointed at `from`
//     (via assume_mutable) and no rows are appended.
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
    const bool wrap_in_nullable = to->is_nullable() && !from->is_nullable();
    const bool append_rows = _keep_origin || !from->is_exclusive();

    if (!append_rows) {
        // Hand-over path: replace the destination column instead of filling it.
        if (wrap_in_nullable) {
            to = make_nullable(from, false)->assume_mutable();
        } else {
            to = from->assume_mutable();
        }
        return;
    }

    if (!wrap_in_nullable) {
        to->insert_range_from(*from, 0, rows);
        return;
    }

    // Append into the nested column and size the null map to `rows` entries of 0.
    // NOTE(review): presumably 0 means "not null" and `to` is empty on entry, so the
    // null map and nested column end up with the same length -- TODO confirm.
    auto& nullable_dst = reinterpret_cast<vectorized::ColumnNullable&>(*to);
    nullable_dst.get_nested_column().insert_range_from(*from, 0, rows);
    nullable_dst.get_null_map_column().get_data().resize_fill(rows, 0);
};

using namespace vectorized;
MutableBlock mutable_block =
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
Expand All @@ -535,13 +553,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ class ExecNode {

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;

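// _keep_origin controls whether do_projections may skip copying during projection:
// when it is true, projected columns are always copied into the output block instead of
// being taken over from the origin block. Currently set to true only in the nestloop join.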
bool _keep_origin = false;

private:
static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
const std::vector<TPlanNode>& tnodes,
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ void HashJoinProbeLocalState::init_for_probe(RuntimeState* state) {

void HashJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) {
DCHECK(_parent->cast<HashJoinProbeOperatorX>()._is_outer_join);
if (!_parent->cast<HashJoinProbeOperatorX>()._use_specific_projections) {
return;
}
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
auto& left_null_map = reinterpret_cast<vectorized::ColumnUInt8&>(*p0);
Expand Down
31 changes: 26 additions & 5 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ template <typename SharedStateArg, typename Derived>
Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
vectorized::Block* origin_block, vectorized::Block* output_block, bool keep_origin) {
auto& p = Base::_parent->template cast<typename Derived::Parent>();
if (!Base::_projections.empty()) {
// In previous versions, the join node had a separate set of project structures,
// and you could see a 'todo' in the Thrift definition.
// Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code.
*output_block = *origin_block;
return Status::OK();
}
SCOPED_TIMER(_build_output_block_timer);
auto is_mem_reuse = output_block->mem_reuse();
vectorized::MutableBlock mutable_block =
Expand Down Expand Up @@ -192,19 +199,33 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
: tnode.hash_join_node.__isset.is_mark ? tnode.hash_join_node.is_mark
: false),
_short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join) {
!_is_mark_join),
_use_specific_projections(
tnode.__isset.hash_join_node
? (tnode.hash_join_node.__isset.use_specific_projections
? tnode.hash_join_node.use_specific_projections
: true)
: (tnode.nested_loop_join_node.__isset.use_specific_projections
? tnode.nested_loop_join_node.use_specific_projections
: true)

) {
if (tnode.__isset.hash_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
}
} else if (tnode.__isset.nested_loop_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!Base::_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
} else {
// Iff BE has been upgraded and FE has not yet, we should keep origin logics for CROSS JOIN.
DCHECK_EQ(_join_op, TJoinOp::CROSS_JOIN);
Expand Down
12 changes: 11 additions & 1 deletion be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status open(doris::RuntimeState* state) override;
[[nodiscard]] const RowDescriptor& row_desc() const override { return *_output_row_desc; }
// Returns the row descriptor describing this operator's output.
// Base::_output_row_descriptor takes precedence whenever it is set; the join's own
// _output_row_desc is only used as the fallback.
[[nodiscard]] const RowDescriptor& row_desc() const override {
    return Base::_output_row_descriptor ? *Base::_output_row_descriptor : *_output_row_desc;
}

[[nodiscard]] const RowDescriptor& intermediate_row_desc() const override {
return *_intermediate_row_desc;
Expand Down Expand Up @@ -114,6 +119,11 @@ class JoinProbeOperatorX : public StatefulOperatorX<LocalStateType> {
vectorized::VExprContextSPtrs _output_expr_ctxs;
OperatorXPtr _build_side_child = nullptr;
const bool _short_circuit_for_null_in_build_side;
// The old planner plans two extra "tuple is null" columns,
// but the Nereids planner has no such logic.
// Therefore, these two columns must not be inserted for plans produced by the Nereids optimizer.
// _use_specific_projections is true if the output expressions are given by srcExprList;
// otherwise they are given by PlanNode.projections.
const bool _use_specific_projections;
};

} // namespace pipeline
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ void NestedLoopJoinProbeLocalState::_reset_with_next_probe_row() {

void NestedLoopJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) {
auto& p = _parent->cast<NestedLoopJoinProbeOperatorX>();
if (!p._use_specific_projections) {
return;
}
if (p._is_outer_join) {
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
Expand Down Expand Up @@ -436,7 +439,9 @@ NestedLoopJoinProbeOperatorX::NestedLoopJoinProbeOperatorX(ObjectPool* pool, con
: JoinProbeOperatorX<NestedLoopJoinProbeLocalState>(pool, tnode, operator_id, descs),
_is_output_left_side_only(tnode.nested_loop_join_node.__isset.is_output_left_side_only &&
tnode.nested_loop_join_node.is_output_left_side_only),
_old_version_flag(!tnode.__isset.nested_loop_join_node) {}
_old_version_flag(!tnode.__isset.nested_loop_join_node) {
_keep_origin = _is_output_left_side_only;
}

Status NestedLoopJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::init(tnode, state));
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class NestedLoopJoinProbeOperatorX final
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

bool need_more_input_data(RuntimeState* state) const override;
Expand Down
30 changes: 21 additions & 9 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,28 @@ void PipelineXLocalStateBase::clear_origin_block() {

Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
vectorized::Block* output_block) const {
auto local_state = state->get_local_state(operator_id());
auto* local_state = state->get_local_state(operator_id());
SCOPED_TIMER(local_state->exec_time_counter());
SCOPED_TIMER(local_state->_projection_timer);

// Transfers the projected column `from` into the output column `to`.
//
// Two independent decisions are made up front:
//   * wrap_in_nullable: `to` is nullable while `from` is not, so `from` has to be
//     placed behind a nullable wrapper.
//   * append_rows: `_keep_origin` is set, or `from` reports that it is not exclusive;
//     in that case the first `rows` rows of `from` are appended into `to` with
//     insert_range_from. Otherwise `to` is simply re-pointed at `from`
//     (via assume_mutable) and no rows are appended.
auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) {
    const bool wrap_in_nullable = to->is_nullable() && !from->is_nullable();
    const bool append_rows = _keep_origin || !from->is_exclusive();

    if (!append_rows) {
        // Hand-over path: replace the destination column instead of filling it.
        if (wrap_in_nullable) {
            to = make_nullable(from, false)->assume_mutable();
        } else {
            to = from->assume_mutable();
        }
        return;
    }

    if (!wrap_in_nullable) {
        to->insert_range_from(*from, 0, rows);
        return;
    }

    // Append into the nested column and size the null map to `rows` entries of 0.
    // NOTE(review): presumably 0 means "not null" and `to` is empty on entry, so the
    // null map and nested column end up with the same length -- TODO confirm.
    auto& nullable_dst = reinterpret_cast<vectorized::ColumnNullable&>(*to);
    nullable_dst.get_nested_column().insert_range_from(*from, 0, rows);
    nullable_dst.get_null_map_column().get_data().resize_fill(rows, 0);
};

using namespace vectorized;
vectorized::MutableBlock mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
Expand All @@ -189,14 +208,7 @@ Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* ori
RETURN_IF_ERROR(local_state->_projections[i]->execute(origin_block, &result_column_id));
auto column_ptr = origin_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
//TODO: this is a quick fix, we need a new function like "change_to_nullable" to do it
if (mutable_columns[i]->is_nullable() xor column_ptr->is_nullable()) {
DCHECK(mutable_columns[i]->is_nullable() && !column_ptr->is_nullable());
reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
->insert_range_from_not_nullable(*column_ptr, 0, rows);
} else {
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
insert_column_datas(mutable_columns[i], column_ptr, rows);
}
DCHECK(mutable_block.rows() == rows);
output_block->set_columns(std::move(mutable_columns));
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ class OperatorXBase : public OperatorBase {
std::string _op_name;
bool _ignore_data_distribution = false;
int _parallel_tasks = 0;

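// _keep_origin controls whether do_projections may skip copying during projection:
// when it is true, projected columns are always copied into the output block instead of
// being taken over from the origin block. Currently set to true only in the nestloop join.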
bool _keep_origin = false;
};

template <typename LocalStateType>
Expand Down
3 changes: 3 additions & 0 deletions be/src/vec/exec/join/vhash_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo

void HashJoinNode::_add_tuple_is_null_column(Block* block) {
DCHECK(_is_outer_join);
if (!_use_specific_projections) {
return;
}
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
Expand Down
31 changes: 26 additions & 5 deletions be/src/vec/exec/join/vjoin_node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
tnode.hash_join_node.is_mark),
_short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join),
_runtime_filter_descs(tnode.runtime_filters) {
_runtime_filter_descs(tnode.runtime_filters),
_use_specific_projections(
tnode.__isset.hash_join_node
? (tnode.hash_join_node.__isset.use_specific_projections
? tnode.hash_join_node.use_specific_projections
: true)
: (tnode.nested_loop_join_node.__isset.use_specific_projections
? tnode.nested_loop_join_node.use_specific_projections
: true)

) {
_runtime_filters.resize(_runtime_filter_descs.size());
_init_join_op();
if (_is_mark_join) {
Expand All @@ -95,14 +105,18 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
}

if (tnode.__isset.hash_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.hash_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.hash_join_node.vintermediate_tuple_id_list.size())));
} else if (tnode.__isset.nested_loop_join_node) {
_output_row_desc.reset(
new RowDescriptor(descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
if (!_output_row_descriptor) {
_output_row_desc.reset(new RowDescriptor(
descs, {tnode.nested_loop_join_node.voutput_tuple_id}, {false}));
}
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.nested_loop_join_node.vintermediate_tuple_id_list,
std::vector<bool>(tnode.nested_loop_join_node.vintermediate_tuple_id_list.size())));
Expand Down Expand Up @@ -166,6 +180,13 @@ void VJoinNodeBase::_construct_mutable_join_block() {
Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block,
bool keep_origin) {
SCOPED_TIMER(_build_output_block_timer);
if (!_projections.empty()) {
// In previous versions, the join node had a separate set of project structures,
// and you could see a 'todo' in the Thrift definition.
// Here, we have refactored it, but considering upgrade compatibility, we still need to retain the old code.
*output_block = *origin_block;
return Status::OK();
}
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
is_mem_reuse
Expand Down
12 changes: 11 additions & 1 deletion be/src/vec/exec/join/vjoin_node_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ class VJoinNodeBase : public ExecNode {

Status open(RuntimeState* state) override;

const RowDescriptor& row_desc() const override { return *_output_row_desc; }
// Returns the row descriptor describing this node's output.
// _output_row_descriptor takes precedence whenever it is set; the join's own
// _output_row_desc is only used as the fallback.
const RowDescriptor& row_desc() const override {
    return _output_row_descriptor ? *_output_row_descriptor : *_output_row_desc;
}

// Returns the row descriptor that _intermediate_row_desc points to.
const RowDescriptor& intermediate_row_desc() const override { return *_intermediate_row_desc; }

Expand Down Expand Up @@ -152,6 +157,11 @@ class VJoinNodeBase : public ExecNode {

std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<IRuntimeFilter*> _runtime_filters;
// The old planner plans two extra "tuple is null" columns,
// but the Nereids planner has no such logic.
// Therefore, these two columns must not be inserted for plans produced by the Nereids optimizer.
// _use_specific_projections is true if the output expressions are given by srcExprList;
// otherwise they are given by PlanNode.projections.
const bool _use_specific_projections = false;
};

} // namespace doris::vectorized
4 changes: 4 additions & 0 deletions be/src/vec/exec/join/vnested_loop_join_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {

if (tnode.nested_loop_join_node.__isset.is_output_left_side_only) {
_is_output_left_side_only = tnode.nested_loop_join_node.is_output_left_side_only;
_keep_origin = _is_output_left_side_only;
}

if (tnode.nested_loop_join_node.__isset.join_conjuncts &&
Expand Down Expand Up @@ -382,6 +383,9 @@ void VNestedLoopJoinNode::_resize_fill_tuple_is_null_column(size_t new_size, int
}

void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) {
if (!_use_specific_projections) {
return;
}
if (_is_outer_join) {
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/join/vnested_loop_join_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class VNestedLoopJoinNode final : public VJoinNodeBase {
const RowDescriptor& row_desc() const override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
: *_output_row_desc;
: (_output_row_descriptor ? *_output_row_descriptor : *_output_row_desc);
}

// Returns a copy of the shared pointer held in _left_block.
std::shared_ptr<Block> get_left_block() { return _left_block; }
Expand Down
4 changes: 4 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,8 @@ struct THashJoinNode {
11: optional bool is_mark
12: optional TJoinDistributionType dist_type
13: optional list<Exprs.TExpr> mark_join_conjuncts
// use_specific_projections is true if the output expressions are given by srcExprList; otherwise they are given by PlanNode.projections
14: optional bool use_specific_projections
}

struct TNestedLoopJoinNode {
Expand All @@ -815,6 +817,8 @@ struct TNestedLoopJoinNode {
8: optional list<Exprs.TExpr> join_conjuncts

9: optional list<Exprs.TExpr> mark_join_conjuncts
// use_specific_projections is true if the output expressions are given by srcExprList; otherwise they are given by PlanNode.projections
10: optional bool use_specific_projections
}

struct TMergeJoinNode {
Expand Down

0 comments on commit 9911303

Please sign in to comment.