Skip to content

Commit

Permalink
Decompose UnionNode into pipeline (StarRocks#853)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZiheLiu authored Oct 26, 2021
1 parent c706efe commit 6858900
Show file tree
Hide file tree
Showing 11 changed files with 442 additions and 8 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ set(EXEC_FILES
pipeline/analysis/analytic_sink_operator.cpp
pipeline/analysis/analytic_source_operator.cpp
pipeline/assert_num_rows_operator.cpp
pipeline/set/union_passthrough_operator.cpp
pipeline/set/union_const_source_operator.cpp
)

set(EXEC_FILES
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class OperatorFactory {
: _id(id), _name(name), _plan_node_id(plan_node_id) {}
virtual ~OperatorFactory() = default;
// Create the operator for the specific sequence driver
// For some operators, when share some status, need to know the the degree_of_parallelism
// For some operators, when share some status, need to know the degree_of_parallelism
virtual OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) = 0;
virtual bool is_source() const { return false; }
int32_t plan_node_id() const { return _plan_node_id; }
Expand Down
19 changes: 19 additions & 0 deletions be/src/exec/pipeline/pipeline_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ OpFactories PipelineBuilderContext::maybe_interpolate_local_exchange(OpFactories
}
}

// Funnels the output of every pipeline in `pred_operators_list` into one new pipeline.
// Each predecessor pipeline gets a local exchange sink appended and is then registered via add_pipeline();
// all sinks share one passthrough exchanger, which is bound to a single local exchange source factory.
// Returns the operator list of the new pipeline, containing only that source factory.
OpFactories PipelineBuilderContext::gather_pipelines_to_one(std::vector<OpFactories>& pred_operators_list) {
    auto memory_manager = std::make_shared<LocalExchangeMemoryManager>(config::vector_chunk_size);
    auto source_factory = std::make_shared<LocalExchangeSourceOperatorFactory>(next_operator_id(), memory_manager);
    // The exchanger only borrows the source factory (raw pointer); ownership moves into the returned list below.
    auto shared_exchanger = std::make_shared<PassthroughExchanger>(memory_manager, source_factory.get());

    // Terminate each predecessor pipeline with its own sink, then register the finished pipeline.
    for (OpFactories& upstream_operators : pred_operators_list) {
        upstream_operators.emplace_back(
                std::make_shared<LocalExchangeSinkOperatorFactory>(next_operator_id(), shared_exchanger));
        add_pipeline(upstream_operators);
    }

    // The gathering pipeline starts from the source, which is configured with a degree of parallelism of 1.
    source_factory->set_degree_of_parallelism(1);
    OpFactories gathered_operators;
    gathered_operators.emplace_back(std::move(source_factory));
    return gathered_operators;
}

Pipelines PipelineBuilder::build(const FragmentContext& fragment, ExecNode* exec_node) {
pipeline::OpFactories operators = exec_node->decompose_to_pipeline(&_context);
_context.add_pipeline(operators);
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/pipeline/pipeline_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ class PipelineBuilderContext {

OpFactories maybe_interpolate_local_exchange(OpFactories& pred_operators);

// Uses local exchange to gather the output chunks of multiple predecessor pipelines
// into a new pipeline, which the successor operator belongs to.
// Append a LocalExchangeSinkOperator to the tail of each pipeline.
// Create a new pipeline with a LocalExchangeSourceOperator.
// These local exchange sink operators and the source operator share a passthrough exchanger.
OpFactories gather_pipelines_to_one(std::vector<OpFactories>& pred_operators_list);

uint32_t next_pipe_id() { return _next_pipeline_id++; }

uint32_t next_operator_id() { return _next_operator_id++; }
Expand Down
70 changes: 70 additions & 0 deletions be/src/exec/pipeline/set/union_const_source_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#include "exec/pipeline/set/union_const_source_operator.h"

#include "column/column_helper.h"

namespace starrocks::pipeline {

// Builds the next output chunk from the remaining const expr lists.
// At most config::vector_chunk_size rows are produced per call, one row per expr list.
// Advances _next_processed_row_index by the number of rows produced.
StatusOr<vectorized::ChunkPtr> UnionConstSourceOperator::pull_chunk(starrocks::RuntimeState* state) {
    // The index is unsigned, so only the upper bound can be violated.
    DCHECK(_next_processed_row_index < _rows_total);

    const size_t remaining_rows = _rows_total - _next_processed_row_index;
    const size_t batch_rows = std::min(static_cast<size_t>(config::vector_chunk_size), remaining_rows);
    const size_t num_columns = _dst_slots.size();
    // First expr list belonging to this batch.
    const std::vector<ExprContext*>* const batch_begin = _const_expr_lists + _next_processed_row_index;

    auto result = std::make_shared<vectorized::Chunk>();

    // The output is filled column by column; every row contributes one value to each column.
    for (size_t col = 0; col < num_columns; ++col) {
        const auto* slot = _dst_slots[col];

        vectorized::ColumnPtr column = vectorized::ColumnHelper::create_column(slot->type(), slot->is_nullable());
        column->reserve(batch_rows);

        for (size_t row = 0; row < batch_rows; ++row) {
            const std::vector<ExprContext*>& row_exprs = batch_begin[row];
            // Each expr list must supply exactly one expression per dest column.
            DCHECK_EQ(row_exprs.size(), num_columns);

            ColumnPtr value = row_exprs[col]->evaluate(nullptr);
            if (!value->is_nullable()) {
                // Non-nullable results are handled as const columns: copy their single data value.
                auto* const_value = vectorized::ColumnHelper::as_raw_column<vectorized::ConstColumn>(value);
                column->append(*const_value->data_column(), 0, 1);
                continue;
            }

            // A nullable result is appended as one NULL, which requires a nullable dest column.
            DCHECK(column->is_nullable());
            column->append_nulls(1);
        }

        result->append_column(std::move(column), slot->id());
    }

    _next_processed_row_index += batch_rows;

    DCHECK_CHUNK(result);
    return std::move(result);
}

// Prepares the base factory, then prepares and opens every const expr list.
// All lists are prepared before any of them is opened; the first error is returned immediately.
Status UnionConstSourceOperatorFactory::prepare(RuntimeState* state, MemTracker* mem_tracker) {
    RETURN_IF_ERROR(OperatorFactory::prepare(state, mem_tracker));

    // The expressions are prepared against a default-constructed row descriptor.
    RowDescriptor row_desc;
    const size_t num_lists = _const_expr_lists.size();

    // Pass 1: prepare.
    for (size_t i = 0; i < num_lists; ++i) {
        RETURN_IF_ERROR(Expr::prepare(_const_expr_lists[i], state, row_desc, mem_tracker));
    }

    // Pass 2: open.
    for (size_t i = 0; i < num_lists; ++i) {
        RETURN_IF_ERROR(Expr::open(_const_expr_lists[i], state));
    }

    return Status::OK();
}

// Closes the base factory first, then closes every const expr list in order.
void UnionConstSourceOperatorFactory::close(RuntimeState* state) {
    OperatorFactory::close(state);

    const size_t num_lists = _const_expr_lists.size();
    for (size_t i = 0; i < num_lists; ++i) {
        Expr::close(_const_expr_lists[i], state);
    }
}

} // namespace starrocks::pipeline
86 changes: 86 additions & 0 deletions be/src/exec/pipeline/set/union_const_source_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.
#pragma once

#include "exec/exec_node.h"
#include "exec/pipeline/source_operator.h"

namespace starrocks {
namespace pipeline {
// UNION ALL operator has three kinds of sub-node as follows:
// 1. Passthrough.
// The src column from sub-node is projected to the dest column without expressions.
// A src column may be projected to multiple dest columns.
// *UnionPassthroughOperator* is used for this case.
// 2. Materialize.
// The src column is projected to the dest column with expressions.
// *ProjectOperator* is used for this case.
// 3. Const.
// Use the evaluation result of const expressions WITHOUT sub-node as the dest column.
// Each list of const expressions is projected to ONE dest row.
// *UnionConstSourceOperator* is used for this case.

// UnionConstSourceOperator is for the Const kind of sub-node.
// It is a source operator that emits the rows described by a contiguous run of const expr lists;
// each expr list holds one expression per dest column and is evaluated into ONE dest row.
class UnionConstSourceOperator final : public SourceOperator {
public:
    // @param id               operator id, forwarded to SourceOperator.
    // @param plan_node_id     id of the plan node, forwarded to SourceOperator.
    // @param dst_slots        dest slot descriptors; stored by reference, not copied.
    // @param const_expr_lists pointer to the first of *rows_total* consecutive expr lists; not owned.
    // @param rows_total       number of expr lists (i.e. rows) this operator emits.
    UnionConstSourceOperator(int32_t id, int32_t plan_node_id, const std::vector<SlotDescriptor*>& dst_slots,
                             const std::vector<ExprContext*>* const const_expr_lists, const size_t rows_total)
            : SourceOperator(id, "union_const_source", plan_node_id),
              _dst_slots(dst_slots),
              _const_expr_lists(const_expr_lists),
              _rows_total(rows_total) {
        // `_const_expr_lists` points at the FIRST expr list, so `_const_expr_lists->size()` is the
        // column count of that row and must not be compared with the row count. The number of
        // rows behind a raw pointer cannot be verified here; only the pointer itself can be,
        // and it is only dereferenced when there is at least one row.
        DCHECK(_rows_total == 0 || _const_expr_lists != nullptr);
    }

    // There is output as long as some expr list has not been turned into a row yet.
    bool has_output() const override { return _next_processed_row_index < _rows_total; }

    // Finished exactly when every row has been emitted.
    bool is_finished() const override { return !has_output(); }

    // finish is noop.
    void finish(RuntimeState* state) override {}

    // Emits the next batch of rows; see the definition in the .cpp file.
    StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
    const std::vector<SlotDescriptor*>& _dst_slots;

    // The evaluation of each const expr_list is projected to ONE dest row.
    // It references to the part of the UnionConstSourceOperatorFactory::_const_expr_lists.
    const std::vector<ExprContext*>* const _const_expr_lists;
    // Number of expr lists reachable through _const_expr_lists.
    const size_t _rows_total;
    // Index (relative to _const_expr_lists) of the next expr list to evaluate.
    size_t _next_processed_row_index = 0;
};

// Factory of UnionConstSourceOperator.
// It references the full list of const expr lists and hands each driver a contiguous slice of it.
class UnionConstSourceOperatorFactory final : public SourceOperatorFactory {
public:
    // @param dst_slots        dest slot descriptors; stored by reference, not copied.
    // @param const_expr_lists one expr list per dest row; stored by reference, not copied.
    UnionConstSourceOperatorFactory(int32_t id, int32_t plan_node_id, const std::vector<SlotDescriptor*>& dst_slots,
                                    const std::vector<std::vector<ExprContext*>>& const_expr_lists)
            : SourceOperatorFactory(id, "union_const_source", plan_node_id),
              _dst_slots(dst_slots),
              _const_expr_lists(const_expr_lists) {}

    // Creates the operator of the driver *driver_sequence*.
    // _const_expr_lists is divided into *degree_of_parallelism* parts of *rows_num_per_driver*
    // continuous rows each; the last non-empty part may be shorter, and any driver whose part
    // would start at or past the end receives an empty part.
    OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
        DCHECK(degree_of_parallelism > 0);
        DCHECK(driver_sequence >= 0);

        const size_t rows_total = _const_expr_lists.size();
        const size_t parallelism = static_cast<size_t>(degree_of_parallelism);
        const size_t rows_num_per_driver = (rows_total + parallelism - 1) / parallelism;

        // Rounding the part size up can exhaust the rows before the last driver is reached
        // (e.g. 5 rows over 4 drivers gives parts of 2, 2, 1 and 0 rows). Clamp the offset so that
        // `rows_total - rows_offset` cannot underflow and the slice pointer stays within
        // [data(), data() + size()].
        const size_t rows_offset =
                std::min(rows_num_per_driver * static_cast<size_t>(driver_sequence), rows_total);
        const size_t rows_count = std::min(rows_num_per_driver, rows_total - rows_offset);

        return std::make_shared<UnionConstSourceOperator>(_id, _plan_node_id, _dst_slots,
                                                          _const_expr_lists.data() + rows_offset, rows_count);
    }

    Status prepare(RuntimeState* state, MemTracker* mem_tracker) override;
    void close(RuntimeState* state) override;

private:
    const std::vector<SlotDescriptor*>& _dst_slots;

    // The evaluation of each const expr_list is projected to ONE dest row.
    const std::vector<std::vector<ExprContext*>>& _const_expr_lists;
};

} // namespace pipeline
} // namespace starrocks
73 changes: 73 additions & 0 deletions be/src/exec/pipeline/set/union_passthrough_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#include "exec/pipeline/set/union_passthrough_operator.h"

#include "column/column_helper.h"
#include "column/nullable_column.h"

namespace starrocks {
namespace pipeline {
// Projects the columns of `src_chunk` onto the dest slots and stores the result in _dst_chunk,
// where pull_chunk() picks it up. Columns mapped to exactly one dest slot are moved out of
// `src_chunk`; columns mapped to several dest slots are cloned once per dest slot.
// Returns an internal error (leaving _dst_chunk untouched) if a dest slot has no entry in the slot map.
Status UnionPassthroughOperator::push_chunk(RuntimeState* state, const ChunkPtr& src_chunk) {
    DCHECK_EQ(_dst_chunk, nullptr);

    // Build into a local chunk so that an early error return never publishes a half-filled chunk.
    ChunkPtr dst_chunk = std::make_shared<vectorized::Chunk>();

    // Read the row count once, before any column is moved out of src_chunk, so that it never
    // depends on the state of a chunk whose columns have already been moved from.
    const size_t num_rows = src_chunk->num_rows();

    if (_dst2src_slot_map != nullptr) {
        for (auto* dst_slot : _dst_slots) {
            // The map is shared with the other operators created by the same factory, so it is
            // only read here: find() never inserts, whereas operator[] would add a default
            // entry for a missing key and is not a const access.
            auto mapping = _dst2src_slot_map->find(dst_slot->id());
            if (mapping == _dst2src_slot_map->end()) {
                return Status::InternalError("UnionPassthroughOperator: dest slot has no mapped src slot");
            }
            const auto& src_slot_item = mapping->second;

            ColumnPtr& src_column = src_chunk->get_column_by_slot_id(src_slot_item.slot_id);
            // If there are multiple dest slots mapping to the same src slot id,
            // we should clone the src column instead of directly moving the src column.
            if (src_slot_item.ref_count > 1) {
                _clone_column(dst_chunk, src_column, dst_slot, num_rows);
            } else {
                _move_column(dst_chunk, src_column, dst_slot, num_rows);
            }
        }
    } else {
        // For backward compatibility, the indexes of the src and dst slots are one-to-one correspondence.
        // TODO: when StarRocks 2.0 release, we could remove this branch.
        // Every src slot needs a dest slot at the same index.
        DCHECK(_src_slots.size() <= _dst_slots.size());
        size_t i = 0;
        // When passthrough, the child tuple size must be 1;
        for (auto* src_slot : _src_slots) {
            auto* dst_slot = _dst_slots[i++];
            ColumnPtr& src_column = src_chunk->get_column_by_slot_id(src_slot->id());
            _move_column(dst_chunk, src_column, dst_slot, num_rows);
        }
    }

    DCHECK_CHUNK(dst_chunk);

    _dst_chunk = std::move(dst_chunk);
    return Status::OK();
}

// Hands the buffered chunk over to the caller and leaves _dst_chunk empty,
// which makes the operator ready to accept the next input chunk.
StatusOr<vectorized::ChunkPtr> UnionPassthroughOperator::pull_chunk(RuntimeState* state) {
    vectorized::ChunkPtr ready_chunk;
    ready_chunk.swap(_dst_chunk);
    return std::move(ready_chunk);
}

// Appends a clone of `src_column` to `dst_chunk` under the id of `dst_slot`.
// `row_count` is only used to size the null mask when the clone has to be wrapped.
void UnionPassthroughOperator::_clone_column(ChunkPtr& dst_chunk, const ColumnPtr& src_column,
                                             const SlotDescriptor* dst_slot, size_t row_count) {
    // No wrapping is needed when the src is already nullable or the dest is not nullable.
    const bool usable_as_is = src_column->is_nullable() || !dst_slot->is_nullable();
    if (usable_as_is) {
        dst_chunk->append_column(src_column->clone_shared(), dst_slot->id());
        return;
    }

    // Nullable dest over a non-nullable src: pair the clone with a null mask
    // of `row_count` entries, all created with value 0.
    ColumnPtr wrapped_clone = vectorized::NullableColumn::create(src_column->clone_shared(),
                                                                 vectorized::NullColumn::create(row_count, 0));
    dst_chunk->append_column(std::move(wrapped_clone), dst_slot->id());
}

// Transfers `src_column` into `dst_chunk` under the id of `dst_slot`.
// `row_count` is only used to size the null mask when the column has to be wrapped.
void UnionPassthroughOperator::_move_column(ChunkPtr& dst_chunk, ColumnPtr& src_column, const SlotDescriptor* dst_slot,
                                            size_t row_count) {
    // No wrapping is needed when the src is already nullable or the dest is not nullable:
    // the column pointer itself is moved into the dest chunk.
    const bool usable_as_is = src_column->is_nullable() || !dst_slot->is_nullable();
    if (usable_as_is) {
        dst_chunk->append_column(std::move(src_column), dst_slot->id());
        return;
    }

    // Nullable dest over a non-nullable src: pair the column with a null mask
    // of `row_count` entries, all created with value 0.
    ColumnPtr wrapped_column =
            vectorized::NullableColumn::create(src_column, vectorized::NullColumn::create(row_count, 0));
    dst_chunk->append_column(std::move(wrapped_column), dst_slot->id());
}

} // namespace pipeline
} // namespace starrocks
96 changes: 96 additions & 0 deletions be/src/exec/pipeline/set/union_passthrough_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021 StarRocks Limited.

#pragma once

#include "exec/exec_node.h"
#include "exec/pipeline/operator.h"

namespace starrocks {
namespace pipeline {
// UNION ALL operator has three kinds of sub-node as follows:
// 1. Passthrough.
// The src column from sub-node is projected to the dest column without expressions.
// A src column may be projected to multiple dest columns.
// *UnionPassthroughOperator* is used for this case.
// 2. Materialize.
// The src column is projected to the dest column with expressions.
// *ProjectOperator* is used for this case.
// 3. Const.
// Use the evaluation result of const expressions WITHOUT sub-node as the dest column.
// Each list of const expressions is projected to ONE dest row.
// *UnionConstSourceOperator* is used for this case.

// UnionPassthroughOperator handles the Passthrough kind of sub-node:
// src columns are re-labelled with dest slot ids without evaluating any expression.
// It buffers at most one converted chunk between push_chunk() and pull_chunk().
class UnionPassthroughOperator final : public Operator {
public:
    // Value type of the slot map: the src slot a dest slot reads from,
    // plus a reference count for that src slot.
    struct SlotItem {
        SlotId slot_id;
        size_t ref_count;
    };

    // Keyed by dest slot id.
    using SlotMap = std::unordered_map<SlotId, SlotItem>;

    // `dst2src_slot_map` may be nullptr; the slot vectors are stored by reference, not copied.
    UnionPassthroughOperator(int32_t id, int32_t plan_node_id, SlotMap* dst2src_slot_map,
                             const std::vector<SlotDescriptor*>& slots, const std::vector<SlotDescriptor*>& src_slots)
            : Operator(id, "union_passthrough", plan_node_id),
              _dst2src_slot_map(dst2src_slot_map),
              _dst_slots(slots),
              _src_slots(src_slots) {}

    // Input is accepted only while not finished and the one-chunk buffer is empty.
    bool need_input() const override { return !_is_finished && !_dst_chunk; }

    // Output is available whenever a converted chunk is buffered.
    bool has_output() const override { return static_cast<bool>(_dst_chunk); }

    // Done once finish() has been called and the buffered chunk has been pulled.
    bool is_finished() const override { return _is_finished && !_dst_chunk; }

    void finish(RuntimeState* state) override { _is_finished = true; }

    // Converts `src_chunk` and buffers the result.
    Status push_chunk(RuntimeState* state, const ChunkPtr& src_chunk) override;

    // Returns the buffered chunk and empties the buffer.
    StatusOr<vectorized::ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
    // Appends a clone of the src column to the dst chunk; used when the src column feeds several dest columns.
    void _clone_column(ChunkPtr& dst_chunk, const ColumnPtr& src_column, const SlotDescriptor* dst_slot,
                       size_t row_count);
    // Moves the src column into the dst chunk; used when the src column feeds exactly one dest column.
    void _move_column(ChunkPtr& dst_chunk, ColumnPtr& src_column, const SlotDescriptor* dst_slot, size_t row_count);

    // Dest slot id -> src slot it is read from.
    // Several dest slot ids may map to the same src slot id, in which case
    // the src column has to be cloned rather than moved.
    SlotMap* _dst2src_slot_map;

    const std::vector<SlotDescriptor*>& _dst_slots;
    const std::vector<SlotDescriptor*>& _src_slots;

    bool _is_finished{false};
    // The one-chunk buffer; empty (null) when nothing is pending.
    vectorized::ChunkPtr _dst_chunk;
};

// Factory of UnionPassthroughOperator.
// Every operator it creates receives the same slot map pointer and the same slot vectors.
class UnionPassthroughOperatorFactory final : public OperatorFactory {
public:
    // `dst2src_slot_map` may be nullptr; the slot vectors are stored by reference, not copied.
    UnionPassthroughOperatorFactory(int32_t id, int32_t plan_node_id,
                                    UnionPassthroughOperator::SlotMap* dst2src_slot_map,
                                    const std::vector<SlotDescriptor*>& dst_slots,
                                    const std::vector<SlotDescriptor*>& src_slots)
            : OperatorFactory(id, "union_passthrough", plan_node_id),
              _dst2src_slot_map(dst2src_slot_map),
              _dst_slots(dst_slots),
              _src_slots(src_slots) {}

    // Neither argument influences the created operator: all drivers get identically configured operators.
    OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
        return std::make_shared<UnionPassthroughOperator>(_id, _plan_node_id, _dst2src_slot_map, _dst_slots,
                                                          _src_slots);
    }

private:
    // nullptr when _pass_through_slot_maps of UnionNode is empty.
    UnionPassthroughOperator::SlotMap* _dst2src_slot_map;

    const std::vector<SlotDescriptor*>& _dst_slots;
    const std::vector<SlotDescriptor*>& _src_slots;
};

} // namespace pipeline
} // namespace starrocks
Loading

0 comments on commit 6858900

Please sign in to comment.