Skip to content

Commit

Permalink
[refine](pipelineX) refine some code in pipelineX (apache#27472)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Nov 27, 2023
1 parent 82d1566 commit 3838b6f
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 61 deletions.
5 changes: 0 additions & 5 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,6 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Bl
return _writer->sink(block, source_state == SourceState::FINISHED);
}

// Reports which dependency (if any) is blocking `task` from writing.
// Pure delegation: the call is forwarded unchanged to `_writer` and its
// result is returned as-is; this sink adds no logic of its own.
template <typename Writer, typename Parent>
Dependency* AsyncWriterSink<Writer, Parent>::write_blocked_by(PipelineXTask* task) {
return _writer->write_blocked_by(task);
}

template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
if (_closed) {
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,6 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {

Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state);

Dependency* write_blocked_by(PipelineXTask* task);
// Returns the pointer obtained from `_async_writer_dependency.get()`.
// NOTE(review): presumably a non-owning view of a smart-pointer member — confirm its declaration.
Dependency* dependency() override { return _async_writer_dependency.get(); }
Status close(RuntimeState* state, Status exec_status) override;

Expand Down
56 changes: 30 additions & 26 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
_sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_operator_id(),
_sink.reset(new ExchangeSinkOperatorX(state, row_desc, next_sink_operator_id(),
thrift_sink.stream_sink, params.destinations,
send_query_statistics_with_every_batch));
break;
Expand All @@ -272,18 +272,18 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}

// TODO: figure out good buffer size based on size of output row
_sink.reset(new ResultSinkOperatorX(next_operator_id(), row_desc, output_exprs,
_sink.reset(new ResultSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs,
thrift_sink.result_sink));
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
_sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), row_desc,
_sink.reset(new OlapTableSinkV2OperatorX(pool, next_sink_operator_id(), row_desc,
output_exprs, false));
} else {
_sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), row_desc, output_exprs,
false));
_sink.reset(new OlapTableSinkOperatorX(pool, next_sink_operator_id(), row_desc,
output_exprs, false));
}
break;
}
Expand All @@ -292,7 +292,8 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
return Status::InternalError("Missing data jdbc sink.");
}
if (config::enable_java_support) {
_sink.reset(new JdbcTableSinkOperatorX(row_desc, next_operator_id(), output_exprs));
_sink.reset(
new JdbcTableSinkOperatorX(row_desc, next_sink_operator_id(), output_exprs));
} else {
return Status::InternalError(
"Jdbc table sink is not enabled, you can change be config "
Expand All @@ -313,18 +314,20 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
_sink.reset(new ResultFileSinkOperatorX(
next_operator_id(), row_desc, thrift_sink.result_file_sink, params.destinations,
send_query_statistics_with_every_batch, output_exprs, desc_tbl));
next_sink_operator_id(), row_desc, thrift_sink.result_file_sink,
params.destinations, send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
} else {
_sink.reset(new ResultFileSinkOperatorX(next_operator_id(), row_desc, output_exprs));
_sink.reset(
new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, output_exprs));
}
break;
}
case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
// TODO: figure out good buffer size based on size of output row
auto sink_id = next_operator_id();
auto sink_id = next_sink_operator_id();
auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
// one sink has multiple sources.
std::vector<int> sources;
Expand Down Expand Up @@ -359,7 +362,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData

DataSinkOperatorXPtr sink_op;
sink_op.reset(new ExchangeSinkOperatorX(
state, *_row_desc, next_operator_id(),
state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i], false));

Expand Down Expand Up @@ -421,7 +424,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_desc_tbl(_desc_tbl);
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
_runtime_states[i]->resize_op_id_to_local_state(max_operator_id(), max_sink_operator_id());
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
Expand Down Expand Up @@ -675,8 +678,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(
new DistinctStreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new DistinctStreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode,
descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -692,7 +695,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new StreamingAggSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new StreamingAggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -708,7 +711,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new AggSinkOperatorX<>(pool, next_operator_id(), tnode, descs));
sink.reset(new AggSinkOperatorX<>(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -733,7 +736,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new HashJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new HashJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -753,7 +756,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(
new NestedLoopJoinBuildSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -774,7 +778,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
PipelinePtr build_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool, tnode, descs));
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -795,7 +799,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new SortSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -813,7 +817,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new PartitionSortSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new PartitionSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand All @@ -831,7 +835,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_operator_id(), tnode, descs));
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand Down Expand Up @@ -912,11 +916,11 @@ Status PipelineXFragmentContext::_build_operators_for_set_operation_node(

DataSinkOperatorXPtr sink;
if (child_id == 0) {
sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool, tnode,
descs));
sink.reset(new SetSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(), pool,
tnode, descs));
} else {
sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_operator_id(), pool,
tnode, descs));
sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id, next_sink_operator_id(),
pool, tnode, descs));
}
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
Expand Down
12 changes: 7 additions & 5 deletions be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
std::lock_guard<std::mutex> l(_state_map_lock);
if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
if (_instance_id_to_runtime_state.contains(fragment_instance_id)) {
return _instance_id_to_runtime_state[fragment_instance_id];
} else {
return _runtime_state.get();
Expand All @@ -115,6 +115,10 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

// Returns the current value of the `_operator_id` counter without changing it.
// Passed as the operator-table size to RuntimeState::resize_op_id_to_local_state.
[[nodiscard]] int max_operator_id() const { return _operator_id; }

// Hands out the next sink operator id: returns the current value of
// `_sink_operator_id` and then increments it (post-increment), so successive
// calls yield consecutive ids starting from the counter's initial value.
[[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; }

// Returns the current value of the `_sink_operator_id` counter without changing it.
// Because next_sink_operator_id() post-increments, this is one past the last id
// handed out; it is passed as the sink-table size to
// RuntimeState::resize_op_id_to_local_state.
[[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id; }

std::string debug_string() override;

private:
Expand Down Expand Up @@ -203,11 +207,9 @@ class PipelineXFragmentContext : public PipelineFragmentContext {

std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;
// We can guarantee that a plan node ID can correspond to an operator ID,
// but some operators do not have a corresponding plan node ID.
// We set these IDs as negative numbers, which are not visible to the user.
int _operator_id = 0;

int _operator_id = 0;
int _sink_operator_id = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>> _op_id_to_le_state;
};

Expand Down
20 changes: 8 additions & 12 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,19 +83,15 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);

auto* parent_profile = _parent_profile;
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& deps = get_upstream_dependency(_operators[op_idx]->operator_id());
LocalStateInfo info {
op_idx == _operators.size() - 1
? _parent_profile
: state->get_local_state(_operators[op_idx + 1]->operator_id())->profile(),
scan_ranges,
deps,
_local_exchange_state,
_task_idx,
_source_dependency[_operators[op_idx]->operator_id()]};
RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info));
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile, scan_ranges,
deps, _local_exchange_state,
_task_idx, _source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(state, info));
parent_profile = state->get_local_state(op->operator_id())->profile();
}

_block = doris::vectorized::Block::create_unique();
Expand Down
11 changes: 8 additions & 3 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>

#include <string>

Expand Down Expand Up @@ -424,13 +425,15 @@ int64_t RuntimeState::get_load_mem_limit() {
}
}

void RuntimeState::resize_op_id_to_local_state(int size) {
_op_id_to_local_state.resize(size);
_op_id_to_sink_local_state.resize(size);
// Sizes the two id-indexed local-state tables independently.
//   operator_size: new size of `_op_id_to_local_state`
//   sink_size:     new size of `_op_id_to_sink_local_state`
// The emplace_* functions index these tables by id, so each size must exceed
// the largest id of its kind that will later be emplaced.
void RuntimeState::resize_op_id_to_local_state(int operator_size, int sink_size) {
_op_id_to_local_state.resize(operator_size);
_op_id_to_sink_local_state.resize(sink_size);
}

// Stores `state` in slot `id` of `_op_id_to_local_state`, moving ownership of
// the unique_ptr into the table.
//   id:    index into the table
//   state: local state to install; moved-from on return
void RuntimeState::emplace_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> state) {
// Debug-only guard: the slot must exist in the table.
// NOTE(review): `id` is int while size() is presumably unsigned, so a negative id
// would convert to a large value and also trip this check — confirm the container type.
DCHECK(id < _op_id_to_local_state.size());
// Debug-only guard: the slot must still be empty, i.e. no state was installed for this id before.
DCHECK(!_op_id_to_local_state[id]);
_op_id_to_local_state[id] = std::move(state);
}

Expand All @@ -451,6 +454,8 @@ Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) {

// Stores `state` in slot `id` of `_op_id_to_sink_local_state`, moving ownership
// of the unique_ptr into the table.
//   id:    index into the sink table
//   state: sink local state to install; moved-from on return
void RuntimeState::emplace_sink_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state) {
// Debug-only guard: the slot must exist in the sink table.
// NOTE(review): `id` is int while size() is presumably unsigned, so a negative id
// would convert to a large value and also trip this check — confirm the container type.
DCHECK(id < _op_id_to_sink_local_state.size());
// Debug-only guard: the slot must still be empty, i.e. no sink state was installed for this id before.
DCHECK(!_op_id_to_sink_local_state[id]);
_op_id_to_sink_local_state[id] = std::move(state);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ class RuntimeState {

Result<SinkLocalState*> get_sink_local_state_result(int id);

void resize_op_id_to_local_state(int size);
void resize_op_id_to_local_state(int operator_size, int sink_size);

private:
Status create_error_log_file();
Expand Down
6 changes: 0 additions & 6 deletions be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,5 @@ std::unique_ptr<Block> AsyncResultWriter::_get_free_block(doris::vectorized::Blo
return b;
}

// Asks `_dependency` whether it is blocking `task` and returns its answer.
// The query runs with `_m` held for the whole call.
pipeline::Dependency* AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) {
// Hold the writer mutex until the function returns.
std::lock_guard l(_m);
// Debug-only guard: the dependency must have been set before this is called.
DCHECK(_dependency != nullptr);
return _dependency->is_blocked_by(task);
}

} // namespace vectorized
} // namespace doris
2 changes: 0 additions & 2 deletions be/src/vec/sink/writer/async_result_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ class AsyncResultWriter : public ResultWriter {
return _data_queue_is_available() || _is_finished();
}

pipeline::Dependency* write_blocked_by(pipeline::PipelineXTask* task);

// True while `_writer_thread_closed` is false.
// NOTE(review): presumably the flag is set once the background writer thread has exited — confirm where it is written.
[[nodiscard]] bool is_pending_finish() const { return !_writer_thread_closed; }

void process_block(RuntimeState* state, RuntimeProfile* profile);
Expand Down

0 comments on commit 3838b6f

Please sign in to comment.