Skip to content

Commit

Permalink
[pipelineX](fix) fix two phase execution and add test cases (apache#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Aug 25, 2023
1 parent f80b067 commit 49a32c2
Show file tree
Hide file tree
Showing 24 changed files with 82 additions and 42 deletions.
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,7 @@ Status AggSourceOperatorX::setup_local_state(RuntimeState* state, LocalStateInfo
}

bool AggSourceOperatorX::can_read(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
return local_state._dependency->done();
return state->get_local_state(id())->cast<AggLocalState>()._dependency->done();
}

} // namespace pipeline
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
return Status::OK();
}
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
auto& parent_ref = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = _state->exec_env()->vstream_mgr()->create_recvr(
_state, parent_ref._input_row_desc, _state->fragment_instance_id(), parent_ref._id,
parent_ref._num_senders, profile(), parent_ref._is_merging,
parent_ref._sub_plan_query_statistics_recvr);
stream_recvr = info.recvr;
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
state, vsort_exec_exprs));
_init = true;
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ class ExchangeSourceOperatorX final : public OperatorXBase {

Status close(RuntimeState* state) override;
bool is_source() const override { return true; }
bool need_to_create_exch_recv() const override { return true; }

RowDescriptor input_row_desc() const { return _input_row_desc; }

int num_senders() const { return _num_senders; }
bool is_merging() const { return _is_merging; }

std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr() {
return _sub_plan_query_statistics_recvr;
}

private:
friend class ExchangeLocalState;
Expand Down
7 changes: 7 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/runtime/vdata_stream_recvr.h"
#include "vec/sink/vresult_sink.h"

namespace doris {
class DataSink;
Expand Down Expand Up @@ -484,12 +486,14 @@ class StatefulOperator : public StreamingOperator<OperatorBuilderType> {
struct LocalStateInfo {
const std::vector<TScanRangeParams> scan_ranges;
Dependency* dependency;
std::shared_ptr<vectorized::VDataStreamRecvr> recvr;
};

// This struct is used only for initializing local sink state.
struct LocalSinkStateInfo {
const int sender_id;
Dependency* dependency;
std::shared_ptr<BufferControlBlock> sender;
};

class PipelineXLocalState {
Expand Down Expand Up @@ -674,6 +678,7 @@ class OperatorXBase : public OperatorBase {
}

virtual bool is_source() const override { return false; }
[[nodiscard]] virtual bool need_to_create_exch_recv() const { return false; }

Status get_next_after_projects(RuntimeState* state, vectorized::Block* block,
SourceState& source_state);
Expand Down Expand Up @@ -768,6 +773,8 @@ class DataSinkOperatorX : public OperatorBase {

virtual Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) = 0;

[[nodiscard]] virtual bool need_to_create_result_sender() const { return false; }

template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this));
Expand Down
16 changes: 5 additions & 11 deletions be/src/pipeline/exec/result_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title));
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
p._buf_size, &_sender, true,
state->execution_timeout()));
_sender = info.sender;
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
Expand All @@ -81,11 +79,8 @@ Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)

ResultSinkOperatorX::ResultSinkOperatorX(const RowDescriptor& row_desc,
const std::vector<TExpr>& t_output_expr,
const TResultSink& sink, int buffer_size)
: DataSinkOperatorX(0),
_row_desc(row_desc),
_t_output_expr(t_output_expr),
_buf_size(buffer_size) {
const TResultSink& sink)
: DataSinkOperatorX(0), _row_desc(row_desc), _t_output_expr(t_output_expr) {
if (!sink.__isset.type || sink.type == TResultSinkType::MYSQL_PROTOCAL) {
_sink_type = TResultSinkType::MYSQL_PROTOCAL;
} else {
Expand Down Expand Up @@ -185,7 +180,6 @@ Status ResultSinkLocalState::close(RuntimeState* state) {
}

bool ResultSinkOperatorX::can_write(RuntimeState* state) {
auto& local_state = state->get_sink_local_state(id())->cast<ResultSinkLocalState>();
return local_state._sender->can_sink();
return state->get_sink_local_state(id())->cast<ResultSinkLocalState>()._sender->can_sink();
}
} // namespace doris::pipeline
} // namespace doris::pipeline
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/result_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ResultSinkLocalState final : public PipelineXSinkLocalState {
class ResultSinkOperatorX final : public DataSinkOperatorX {
public:
ResultSinkOperatorX(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
const TResultSink& sink, int buffer_size);
const TResultSink& sink);
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
Expand All @@ -75,6 +75,8 @@ class ResultSinkOperatorX final : public DataSinkOperatorX {

bool can_write(RuntimeState* state) override;

[[nodiscard]] bool need_to_create_result_sender() const override { return true; }

private:
friend class ResultSinkLocalState;

Expand All @@ -89,7 +91,6 @@ class ResultSinkOperatorX final : public DataSinkOperatorX {
// Owned by the RuntimeState.
const std::vector<TExpr>& _t_output_expr;
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
int _buf_size; // Allocated from _pool

// for fetch data by rowids
TFetchOption _fetch_option;
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ class PipelineTask {
}
}

TUniqueId instance_id() const { return _state->fragment_instance_id(); }

protected:
void _finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
Expand Down
26 changes: 23 additions & 3 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/tracer.h>
#include <pthread.h>
#include <runtime/result_buffer_mgr.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
Expand Down Expand Up @@ -252,8 +253,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}

// TODO: figure out good buffer size based on size of output row
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink,
vectorized::RESULT_SINK_BUFFER_SIZE));
_sink.reset(new ResultSinkOperatorX(row_desc, output_exprs, thrift_sink.result_sink));
break;
}
default:
Expand Down Expand Up @@ -302,10 +302,30 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_pipelines[pip_idx]->operator_xs().front()->id(),
no_scan_ranges);
std::shared_ptr<BufferControlBlock> sender = nullptr;
if (_pipelines[pip_idx]->sink_x()->need_to_create_result_sender()) {
// create sender
RETURN_IF_ERROR(_runtime_states[i]->exec_env()->result_mgr()->create_sender(
_runtime_states[i]->fragment_instance_id(),
vectorized::RESULT_SINK_BUFFER_SIZE, &sender, true,
_runtime_states[i]->execution_timeout()));
}

std::shared_ptr<vectorized::VDataStreamRecvr> recvr = nullptr;
if (_pipelines[pip_idx]->operator_xs().front()->need_to_create_exch_recv()) {
auto* src =
(ExchangeSourceOperatorX*)_pipelines[pip_idx]->operator_xs().front().get();
recvr = _runtime_states[i]->exec_env()->vstream_mgr()->create_recvr(
_runtime_states[i].get(), src->input_row_desc(),
_runtime_states[i]->fragment_instance_id(), src->id(), src->num_senders(),
_runtime_profile.get(), src->is_merging(),
src->sub_plan_query_statistics_recvr());
}

auto task = std::make_unique<PipelineXTask>(
_pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this,
_pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id);
_pipelines[pip_idx]->pipeline_profile(), scan_ranges, local_params.sender_id,
sender, recvr);
pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
RETURN_IF_ERROR(task->prepare(_runtime_states[i].get()));
_runtime_profile->add_child(_pipelines[pip_idx]->pipeline_profile(), true, nullptr);
Expand Down
12 changes: 8 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ namespace doris::pipeline {
PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context,
RuntimeProfile* parent_profile,
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id)
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id,
std::shared_ptr<BufferControlBlock>& sender,
std::shared_ptr<vectorized::VDataStreamRecvr>& recvr)
: PipelineTask(pipeline, index, state, fragment_context, parent_profile),
_scan_ranges(scan_ranges),
_operators(pipeline->operator_xs()),
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
_sender_id(sender_id) {
_sender_id(sender_id),
_sender(sender),
_recvr(recvr) {
_pipeline_task_watcher.start();
_sink->get_dependency(_downstream_dependency);
}
Expand Down Expand Up @@ -99,13 +103,13 @@ Status PipelineXTask::_open() {
Dependency* dep = _upstream_dependency.find(o->id()) == _upstream_dependency.end()
? (Dependency*)nullptr
: _upstream_dependency.find(o->id())->second.get();
LocalStateInfo info {_scan_ranges, dep};
LocalStateInfo info {_scan_ranges, dep, _recvr};
Status cur_st = o->setup_local_state(_state, info);
if (!cur_st.ok()) {
st = cur_st;
}
}
LocalSinkStateInfo info {_sender_id, _downstream_dependency.get()};
LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), _sender};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
RETURN_IF_ERROR(st);
_opened = true;
Expand Down
8 changes: 7 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
#include "vec/sink/vresult_sink.h"

namespace doris {
class QueryContext;
Expand All @@ -50,7 +51,9 @@ class PipelineXTask : public PipelineTask {
public:
PipelineXTask(PipelinePtr& pipeline, uint32_t index, RuntimeState* state,
PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile,
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id);
const std::vector<TScanRangeParams>& scan_ranges, const int sender_id,
std::shared_ptr<BufferControlBlock>& sender,
std::shared_ptr<vectorized::VDataStreamRecvr>& recvr);

Status prepare(RuntimeState* state) override;

Expand Down Expand Up @@ -127,5 +130,8 @@ class PipelineXTask : public PipelineTask {

DependencyMap _upstream_dependency;
DependencySPtr _downstream_dependency;

std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
};
} // namespace doris::pipeline
3 changes: 2 additions & 1 deletion be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ void BlockedTaskScheduler::_schedule() {
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
<< ", instance_id="
<< print_id(task->fragment_context()->get_fragment_instance_id());
<< print_id(task->fragment_context()->get_fragment_instance_id())
<< ", task info: " << task->debug_string();

task->fragment_context()->cancel(PPlanFragmentCancelReason::TIMEOUT);
_make_task_run(local_blocked_tasks, iter, ready_tasks);
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.

SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19930101
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.2
SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
LO_ORDERDATE >= 19940101
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q1.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q1.3
SELECT SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ SUM(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorder_flat
WHERE
weekofyear(LO_ORDERDATE) = 6
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.1
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.2
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q2.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q2.3
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(LO_REVENUE), (LO_ORDERDATE DIV 10000) AS YEAR,
P_BRAND
FROM lineorder_flat
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.1
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_NATION,
S_NATION, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.2
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.3
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q3.4.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q3.4
SELECT
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
C_CITY,
S_CITY, (LO_ORDERDATE DIV 10000) AS YEAR,
SUM(LO_REVENUE) AS revenue
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.1
SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
C_NATION,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
FROM lineorder_flat
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.2
SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
S_NATION,
P_CATEGORY,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/ssb_sf0.1_p1/sql/flat_q4.3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
--Q4.3
SELECT (LO_ORDERDATE DIV 10000) AS YEAR,
SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ (LO_ORDERDATE DIV 10000) AS YEAR,
S_CITY,
P_BRAND,
SUM(LO_REVENUE - LO_SUPPLYCOST) AS profit
Expand Down

0 comments on commit 49a32c2

Please sign in to comment.