Skip to content
Closed

test #57313

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,20 +514,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
for (auto& channel : local_state.channels) {
COUNTER_UPDATE(local_state.memory_used_counter(), -channel->mem_usage());
Status st = channel->close(state);
/**
* Consider this case below:
*
* +--- Channel0 (Running)
* |
* ExchangeSink ---+--- Channel1 (EOF)
* |
* +--- Channel2 (Running)
*
* Channel1 is EOF now and return `END_OF_FILE` here. However, Channel0 and Channel2
* still need new data. If ExchangeSink returns EOF, downstream tasks will no longer receive
* blocks including EOS signal. So we must ensure to return EOF iff all channels are EOF.
*/
if (!st.ok() && !st.is<ErrorCode::END_OF_FILE>() && final_st.ok()) {
if (!st.ok() && final_st.ok()) {
final_st = st;
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
return Status::OK();
}

void Pipeline::make_all_runnable(PipelineId wake_by) {
void Pipeline::make_all_runnable() {
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
Expand All @@ -124,7 +124,7 @@ void Pipeline::make_all_runnable(PipelineId wake_by) {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_early(wake_by);
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
_tasks[i] = task;
}

void make_all_runnable(PipelineId wake_by);
void make_all_runnable();

void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1795,7 +1795,7 @@ void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
if (_dag.contains(pipeline_id)) {
for (auto dep : _dag[pipeline_id]) {
_pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id);
_pip_id_to_pipeline[dep]->make_all_runnable();
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -756,11 +756,10 @@ std::string PipelineTask::debug_string() {

fmt::format_to(debug_string_buffer,
"PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry run = "
"{}, _wake_up_early = {}, _wake_up_by = {}, time elapsed since last state "
"changing = {}s, spilling = {}, is running = {}]",
"{}, _wake_up_early = {}, time elapsed since last state changing = {}s, spilling"
" = {}, is running = {}]",
_index, _opened, _eos, _to_string(_exec_state), _dry_run, _wake_up_early.load(),
_wake_by, _state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling,
is_running());
_state_change_watcher.elapsed_time() / NANOS_PER_SEC, _spilling, is_running());
std::unique_lock<std::mutex> lc(_dependency_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,7 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
int task_id() const { return _index; };
virtual bool is_finalized() const { return _exec_state == State::FINALIZED; }

void set_wake_up_early(PipelineId wake_by = -1) {
_wake_up_early = true;
_wake_by = wake_by;
}
void set_wake_up_early() { _wake_up_early = true; }

// Execution phase should be terminated. This is called if this task is canceled or waken up early.
void terminate();
Expand Down Expand Up @@ -318,7 +315,6 @@ class PipelineTask : public std::enable_shared_from_this<PipelineTask> {
MonotonicStopWatch _state_change_watcher;
std::atomic<bool> _spilling = false;
const std::string _pipeline_name;
int _wake_by = -1;
};

using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
Expand Down
Loading