Skip to content

Commit

Permalink
[refine](pipelinex) get sink local state does not require an id. apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange authored Feb 22, 2024
1 parent 83ad7a6 commit 87ea524
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class HashJoinBuildSinkOperatorX final
SourceState source_state) override;

bool should_dry_run(RuntimeState* state) override {
return _is_broadcast_join && !state->get_sink_local_state(operator_id())
return _is_broadcast_join && !state->get_sink_local_state()
->cast<HashJoinBuildSinkLocalState>()
._should_build_hash_table;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/pipeline_x/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
}

std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const {
return state->get_sink_local_state(operator_id())->debug_string(indentation_level);
return state->get_sink_local_state()->debug_string(indentation_level);
}

Status DataSinkOperatorXBase::init(const TDataSink& tsink) {
Expand Down Expand Up @@ -498,8 +498,7 @@ Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vector
template <typename LocalStateType>
Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = state->get_local_state(OperatorX<LocalStateType>::operator_id())
->template cast<LocalStateType>();
auto& local_state = get_local_state(state);
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data();
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
Expand Down
10 changes: 6 additions & 4 deletions be/src/pipeline/pipeline_x/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ class DataSinkOperatorXBase : public OperatorBase {

[[nodiscard]] bool is_source() const override { return false; }

Status close(RuntimeState* state, Status exec_status) {
auto result = state->get_sink_local_state_result(operator_id());
static Status close(RuntimeState* state, Status exec_status) {
auto result = state->get_sink_local_state_result();
if (!result) {
return result.error();
}
Expand Down Expand Up @@ -600,7 +600,7 @@ class DataSinkOperatorX : public DataSinkOperatorXBase {

using LocalState = LocalStateType;
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
return state->get_sink_local_state(operator_id())->template cast<LocalState>();
return state->get_sink_local_state()->template cast<LocalState>();
}
};

Expand Down Expand Up @@ -663,8 +663,10 @@ class StatefulOperatorX : public OperatorX<LocalStateType> {
: OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {}
virtual ~StatefulOperatorX() = default;

using OperatorX<LocalStateType>::get_local_state;

[[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
SourceState& source_state) final;

[[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) const = 0;
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);
auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
auto* parent_profile = _state->get_sink_local_state()->profile();
query_ctx->register_query_statistics(
_state->get_sink_local_state(_sink->operator_id())->get_query_statistics_ptr());
_state->get_sink_local_state()->get_query_statistics_ptr());

for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
Expand Down Expand Up @@ -135,7 +135,7 @@ Status PipelineXTask::_extract_dependencies() {
}
}
{
auto* local_state = _state->get_sink_local_state(_sink->operator_id());
auto* local_state = _state->get_sink_local_state();
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_write_dependencies = dep;
Expand Down Expand Up @@ -206,7 +206,7 @@ Status PipelineXTask::_open() {
RETURN_IF_ERROR(st);
}
}
RETURN_IF_ERROR(_state->get_sink_local_state(_sink->operator_id())->open(_state));
RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
_opened = true;
return Status::OK();
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,13 +486,13 @@ void RuntimeState::emplace_sink_local_state(
_sink_local_state = std::move(state);
}

doris::pipeline::PipelineXSinkLocalStateBase* RuntimeState::get_sink_local_state(int) {
doris::pipeline::PipelineXSinkLocalStateBase* RuntimeState::get_sink_local_state() {
return _sink_local_state.get();
}

Result<RuntimeState::SinkLocalState*> RuntimeState::get_sink_local_state_result(int id) {
Result<RuntimeState::SinkLocalState*> RuntimeState::get_sink_local_state_result() {
if (!_sink_local_state) {
return ResultError(Status::InternalError("_op_id_to_sink_local_state id:{} is null", id));
return ResultError(Status::InternalError("_op_id_to_sink_local_state not exist"));
}
return _sink_local_state.get();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,9 @@ class RuntimeState {

void emplace_sink_local_state(int id, std::unique_ptr<SinkLocalState> state);

SinkLocalState* get_sink_local_state(int id);
SinkLocalState* get_sink_local_state();

Result<SinkLocalState*> get_sink_local_state_result(int id);
Result<SinkLocalState*> get_sink_local_state_result();

void resize_op_id_to_local_state(int operator_size);

Expand Down

0 comments on commit 87ea524

Please sign in to comment.