Skip to content

Commit

Permalink
[Pipeline](exec) Support pipeline exec engine (apache#14736)
Browse files Browse the repository at this point in the history
Co-authored-by: Lijia Liu <liutang123@yeah.net>
Co-authored-by: HappenLee <happenlee@hotmail.com>
Co-authored-by: Jerry Hu <mrhhsg@gmail.com>
Co-authored-by: Pxl <952130278@qq.com>
Co-authored-by: shee <13843187+qzsee@users.noreply.github.com>
Co-authored-by: Gabriel <gabrielleebuaa@gmail.com>

## Problem Summary:

### 1. Design

DSIP: https://cwiki.apache.org/confluence/display/DORIS/DSIP-027%3A+Support+Pipeline+Exec+Engine

### 2. How to use:

Set the environment variable `set enable_pipeline_engine = true; `
  • Loading branch information
HappenLee authored Dec 2, 2022
1 parent 505019e commit 12304bc
Show file tree
Hide file tree
Showing 113 changed files with 5,755 additions and 401 deletions.
2 changes: 2 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ set(DORIS_LINK_LIBS
Webserver
Geo
Vec
Pipeline
${WL_END_GROUP}
)

Expand Down Expand Up @@ -910,6 +911,7 @@ endif()

add_subdirectory(${SRC_DIR}/util)
add_subdirectory(${SRC_DIR}/vec)
add_subdirectory(${SRC_DIR}/pipeline)

if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(${TEST_DIR})
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,8 @@ CONF_Bool(enable_java_support, "true");
// Set config randomly to check more issues in github workflow
CONF_Bool(enable_fuzzy_mode, "false");

CONF_Int32(pipeline_executor_size, "0");

#ifdef BE_TEST
// test s3
CONF_String(test_s3_resource, "resource");
Expand Down
24 changes: 12 additions & 12 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace doris {
Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc, bool is_vec,
const RowDescriptor& row_desc, RuntimeState* state,
std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) {
DataSink* tmp_sink = nullptr;

Expand All @@ -61,10 +61,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
if (is_vec) {
tmp_sink = new doris::vectorized::VDataStreamSender(
pool, params.sender_id, row_desc, thrift_sink.stream_sink, params.destinations,
16 * 1024, send_query_statistics_with_every_batch);
if (state->enable_vectorized_exec()) {
tmp_sink = new vectorized::VDataStreamSender(
state, pool, params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, 16 * 1024, send_query_statistics_with_every_batch);
} else {
tmp_sink = new DataStreamSender(pool, params.sender_id, row_desc,
thrift_sink.stream_sink, params.destinations, 16 * 1024,
Expand All @@ -80,7 +80,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
if (is_vec) {
if (state->enable_vectorized_exec()) {
tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
thrift_sink.result_sink, 4096);
} else {
Expand All @@ -95,7 +95,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
if (is_vec) {
if (state->enable_vectorized_exec()) {
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
Expand Down Expand Up @@ -139,7 +139,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.mysql_table_sink) {
return Status::InternalError("Missing data buffer sink.");
}
if (is_vec) {
if (state->enable_vectorized_exec()) {
doris::vectorized::VMysqlTableSink* vmysql_tbl_sink =
new doris::vectorized::VMysqlTableSink(pool, row_desc, output_exprs);
sink->reset(vmysql_tbl_sink);
Expand All @@ -158,7 +158,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.odbc_table_sink) {
return Status::InternalError("Missing data odbc sink.");
}
if (is_vec) {
if (state->enable_vectorized_exec()) {
sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs));
} else {
sink->reset(new OdbcTableSink(pool, row_desc, output_exprs));
Expand All @@ -170,7 +170,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.jdbc_table_sink) {
return Status::InternalError("Missing data jdbc sink.");
}
if (is_vec) {
if (state->enable_vectorized_exec()) {
if (config::enable_java_support) {
sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs));
} else {
Expand All @@ -196,7 +196,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
case TDataSinkType::OLAP_TABLE_SINK: {
Status status;
DCHECK(thrift_sink.__isset.olap_table_sink);
if (is_vec) {
if (state->enable_vectorized_exec()) {
sink->reset(new stream_load::VOlapTableSink(pool, row_desc, output_exprs, &status));
} else {
sink->reset(new stream_load::OlapTableSink(pool, row_desc, output_exprs, &status));
Expand All @@ -220,7 +220,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}
}

if (sink->get() != nullptr) {
if (*sink != nullptr) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DataSink {
virtual Status send(RuntimeState* state, RowBatch* batch) = 0;

// Send a Block into this sink.
virtual Status send(RuntimeState* state, vectorized::Block* block) {
virtual Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) {
return Status::NotSupported("Not support send block");
};
// Releases all resources that were allocated in prepare()/send().
Expand All @@ -80,7 +80,7 @@ class DataSink {
static Status create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
const RowDescriptor& row_desc, bool is_vec,
const RowDescriptor& row_desc, RuntimeState* state,
std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl);

// Returns the runtime profile for the sink.
Expand Down
69 changes: 44 additions & 25 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ namespace doris {

const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";

int ExecNode::get_node_id_from_profile(RuntimeProfile* p) {
return p->metadata();
}

ExecNode::RowBatchQueue::RowBatchQueue(int max_batches) : BlockingQueue<RowBatch*>(max_batches) {}

ExecNode::RowBatchQueue::~RowBatchQueue() {
Expand Down Expand Up @@ -150,7 +146,8 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
_get_next_span(),
_is_closed(false) {
_is_closed(false),
_ref(0) {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
Expand Down Expand Up @@ -251,7 +248,7 @@ Status ExecNode::prepare(RuntimeState* state) {
return Status::OK();
}

Status ExecNode::open(RuntimeState* state) {
Status ExecNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
if (_vconjunct_ctx_ptr) {
RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->open(state));
Expand All @@ -264,6 +261,10 @@ Status ExecNode::open(RuntimeState* state) {
}
}

Status ExecNode::open(RuntimeState* state) {
return alloc_resource(state);
}

Status ExecNode::reset(RuntimeState* state) {
_num_rows_returned = 0;
for (int i = 0; i < _children.size(); ++i) {
Expand All @@ -280,38 +281,43 @@ Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
return Status::OK();
}

void ExecNode::release_resource(doris::RuntimeState* state) {
if (!_is_resource_released) {
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}

if (_vconjunct_ctx_ptr) {
(*_vconjunct_ctx_ptr)->close(state);
}
if (typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
Expr::close(_conjunct_ctxs, state);
}
vectorized::VExpr::close(_projections, state);

if (_buffer_pool_client.is_registered()) {
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
}

runtime_profile()->add_to_span();
_is_resource_released = true;
}
}

Status ExecNode::close(RuntimeState* state) {
if (_is_closed) {
return Status::OK();
}
_is_closed = true;

if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}

Status result;
for (int i = 0; i < _children.size(); ++i) {
auto st = _children[i]->close(state);
if (result.ok() && !st.ok()) {
result = st;
}
}

if (_vconjunct_ctx_ptr) {
(*_vconjunct_ctx_ptr)->close(state);
}
if (typeid(*this) != typeid(doris::vectorized::NewOlapScanNode)) {
Expr::close(_conjunct_ctxs, state);
}
vectorized::VExpr::close(_projections, state);

if (_buffer_pool_client.is_registered()) {
state->exec_env()->buffer_pool()->DeregisterClient(&_buffer_pool_client);
}

runtime_profile()->add_to_span();

release_resource(state);
return result;
}

Expand Down Expand Up @@ -855,4 +861,17 @@ Status ExecNode::get_next_after_projects(RuntimeState* state, vectorized::Block*
return get_next(state, block, eos);
}

Status ExecNode::execute(RuntimeState* state, vectorized::Block* input_block,
vectorized::Block* output_block, bool* eos) {
return Status::NotSupported("{} not implements execute", get_name());
}

Status ExecNode::pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) {
return Status::NotSupported("{} not implements pull", get_name());
}

Status ExecNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
return Status::NotSupported("{} not implements sink", get_name());
}

} // namespace doris
52 changes: 46 additions & 6 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class Block;
class VExpr;
} // namespace vectorized

namespace pipeline {
class PipelineFragmentContext;
class Pipeline;
class Operator;
} // namespace pipeline

using std::string;
using std::stringstream;
using std::vector;
Expand Down Expand Up @@ -89,6 +95,11 @@ class ExecNode {
// Caller must not be holding any io buffers. This will cause deadlock.
virtual Status open(RuntimeState* state);

// Alloc and open resource for the node
// Only pipeline operator use exec node need to impl the virtual function
// so only vectorized exec node need to impl
virtual Status alloc_resource(RuntimeState* state);

// Retrieves rows and returns them via row_batch. Sets eos to true
// if subsequent calls will not retrieve any more rows.
// Data referenced by any tuples returned in row_batch must not be overwritten
Expand All @@ -104,10 +115,29 @@ class ExecNode {
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos);

// new interface to compatible new optimizers in FE
Status get_next_after_projects(RuntimeState* state, vectorized::Block* block, bool* eos);

// Process data
// Eg: Projection, Union All, HashProbe
virtual Status execute(RuntimeState* state, vectorized::Block* input_block,
vectorized::Block* output_block, bool* eos);

// Emit data, both need impl with method: sink
// Eg: Aggregation, Sort
virtual Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos);

virtual Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) {
return Status::OK();
}

bool can_read() const { return _can_read; }

// Sink Data to ExecNode to do some stock work, both need impl with method: get_result
// `eos` means source is exhausted, exec node should do some finalize work
// Eg: Aggregation, Sort
virtual Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos);

// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
// Clears all internal state, returning this node to the state it was in after calling
// Prepare() and before calling Open(). This function must not clear memory
Expand Down Expand Up @@ -140,6 +170,14 @@ class ExecNode {
// each implementation should start out by calling the default implementation.
virtual Status close(RuntimeState* state);

void increase_ref() { ++_ref; }
int decrease_ref() { return --_ref; }

// Release and close resource for the node
// Only pipeline operator use exec node need to impl the virtual function
// so only vectorized exec node need to impl
virtual void release_resource(RuntimeState* state);

// Creates exec node tree from list of nodes contained in plan via depth-first
// traversal. All nodes are placed in pool.
// Returns error if 'plan' is corrupted, otherwise success.
Expand Down Expand Up @@ -202,14 +240,14 @@ class ExecNode {

virtual std::string get_name();

// Extract node id from p->name().
static int get_node_id_from_profile(RuntimeProfile* p);

// Names of counters shared by all exec nodes
static const std::string ROW_THROUGHPUT_COUNTER;

ExecNode* child(int i) { return _children[i]; }

protected:
friend class DataSink;
friend class doris::pipeline::Operator;

/// Initialize 'buffer_pool_client_' and claim the initial reservation for this
/// ExecNode. Only needs to be called by ExecNodes that will use the client.
Expand Down Expand Up @@ -335,8 +373,6 @@ class ExecNode {
// Set to true if this is a vectorized exec node.
bool _is_vec = false;

ExecNode* child(int i) { return _children[i]; }

bool is_closed() const { return _is_closed; }

// TODO(zc)
Expand Down Expand Up @@ -372,9 +408,13 @@ class ExecNode {
/// allocations. ExecNodes overriding this function should return
/// ExecNode::QueryMaintenance().
virtual Status QueryMaintenance(RuntimeState* state, const std::string& msg) WARN_UNUSED_RESULT;
std::atomic<bool> _can_read = false;

private:
friend class pipeline::Operator;
bool _is_closed;
bool _is_resource_released = false;
std::atomic_int _ref; // used by pipeline operator to release resource.
};

} // namespace doris
Loading

0 comments on commit 12304bc

Please sign in to comment.