Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pipeline] Fix PipScannerContext::can_finish return wrong status #15259

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)

bool ScanOperator::can_read() {
if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->can_finish()) {
if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
// _scanner_ctx->can_finish(): should be scheduled
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
}
}

bool ScanOperator::is_pending_finish() const {
return _node->_scanner_ctx && !_node->_scanner_ctx->can_finish();
return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
}

bool ScanOperator::runtime_filters_are_ready_or_timeout() {
Expand Down
44 changes: 36 additions & 8 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,17 @@ void PipelineTask::_init_profile() {
auto* task_profile = new RuntimeProfile(ss.str());
_parent_profile->add_child(task_profile, true, nullptr);
_task_profile.reset(task_profile);
_sink_timer = ADD_TIMER(_task_profile, "SinkTime");
_get_block_timer = ADD_TIMER(_task_profile, "GetBlockTime");
_task_cpu_timer = ADD_TIMER(_task_profile, "TaskCpuTime");

static const char* exec_time = "ExecuteTime";
_exec_timer = ADD_TIMER(_task_profile, exec_time);
_prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
_open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
_get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime", exec_time);
_sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
_finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime", exec_time);
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);

_wait_source_timer = ADD_TIMER(_task_profile, "WaitSourceTime");
_wait_sink_timer = ADD_TIMER(_task_profile, "WaitSinkTime");
_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");
Expand All @@ -39,12 +48,16 @@ void PipelineTask::_init_profile() {
_block_by_sink_counts = ADD_COUNTER(_task_profile, "NumBlockedBySinkTimes", TUnit::UNIT);
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
}

Status PipelineTask::prepare(RuntimeState* state) {
DCHECK(_sink);
DCHECK(_cur_state == NOT_READY);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
RETURN_IF_ERROR(_sink->prepare(state));
for (auto& o : _operators) {
RETURN_IF_ERROR(o->prepare(state));
Expand Down Expand Up @@ -94,6 +107,9 @@ bool PipelineTask::has_dependency() {
}

Status PipelineTask::open() {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_open_timer);
for (auto& o : _operators) {
RETURN_IF_ERROR(o->open(_state));
}
Expand All @@ -105,8 +121,10 @@ Status PipelineTask::open() {
}

Status PipelineTask::execute(bool* eos) {
SCOPED_ATTACH_TASK(runtime_state());
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(runtime_state());
int64_t time_spent = 0;
// The status must be runnable
*eos = false;
Expand Down Expand Up @@ -170,15 +188,23 @@ Status PipelineTask::execute(bool* eos) {
}

Status PipelineTask::finalize() {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_finalize_timer);
return _sink->finalize(_state);
}

Status PipelineTask::close() {
auto s = _sink->close(_state);
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
s = tem;
int64_t close_ns = 0;
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state);
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
s = tem;
}
}
}
if (_opened) {
Expand All @@ -187,6 +213,8 @@ Status PipelineTask::close() {
COUNTER_UPDATE(_wait_sink_timer, _wait_sink_watcher.elapsed_time());
COUNTER_UPDATE(_wait_worker_timer, _wait_worker_watcher.elapsed_time());
COUNTER_UPDATE(_wait_schedule_timer, _wait_schedule_watcher.elapsed_time());
COUNTER_UPDATE(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}
return s;
}
Expand Down
19 changes: 17 additions & 2 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,15 @@ class PipelineTask {

int get_previous_core_id() const { return _previous_schedule_id; }

void set_previous_core_id(int id) { _previous_schedule_id = id; }
void set_previous_core_id(int id) {
if (id == _previous_schedule_id) {
return;
}
if (_previous_schedule_id != -1) {
COUNTER_UPDATE(_core_change_times, 1);
}
_previous_schedule_id = id;
}

bool has_dependency();

Expand Down Expand Up @@ -190,8 +198,14 @@ class PipelineTask {

RuntimeProfile* _parent_profile;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _sink_timer;
RuntimeProfile::Counter* _task_cpu_timer;
RuntimeProfile::Counter* _prepare_timer;
RuntimeProfile::Counter* _open_timer;
RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _get_block_timer;
RuntimeProfile::Counter* _sink_timer;
RuntimeProfile::Counter* _finalize_timer;
RuntimeProfile::Counter* _close_timer;
RuntimeProfile::Counter* _block_counts;
RuntimeProfile::Counter* _block_by_source_counts;
RuntimeProfile::Counter* _block_by_sink_counts;
Expand All @@ -206,5 +220,6 @@ class PipelineTask {
MonotonicStopWatch _wait_schedule_watcher;
RuntimeProfile::Counter* _wait_schedule_timer;
RuntimeProfile::Counter* _yield_counts;
RuntimeProfile::Counter* _core_change_times;
};
} // namespace doris::pipeline
6 changes: 5 additions & 1 deletion be/src/vec/exec/scan/pip_scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ class PipScannerContext : public vectorized::ScannerContext {

void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); }

Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = false) override {
return vectorized::ScannerContext::get_block_from_queue(block, eos, false);
}

// We should make those method lock free.
bool done() override { return _is_finished || _should_stop || _status_error; }
bool can_finish() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
bool empty_in_queue() override { return _blocks_queue_empty; }

private:
Expand Down
14 changes: 9 additions & 5 deletions be/src/vec/exec/scan/scanner_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Status ScannerContext::get_block_from_queue(vectorized::Block** block, bool* eos
_state->exec_env()->scanner_scheduler()->submit(this);
}
// Wait for block from queue
{
if (wait) {
SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
_blocks_queue_added_cv.wait(l, [this]() {
return !_blocks_queue.empty() || _is_finished || !_process_status.ok() ||
Expand Down Expand Up @@ -240,7 +240,7 @@ void ScannerContext::clear_and_join() {
return;
}

bool ScannerContext::can_finish() {
bool ScannerContext::no_schedule() {
std::unique_lock<std::mutex> l(_transfer_lock);
return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
}
Expand All @@ -263,9 +263,11 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
}

std::lock_guard<std::mutex> l(_transfer_lock);
_num_running_scanners--;
_num_scheduling_ctx++;
_state->exec_env()->scanner_scheduler()->submit(this);
auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
if (!submit_st.ok()) {
_num_scheduling_ctx--;
}

// Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
// to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
Expand All @@ -274,14 +276,16 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
// same scanner.
if (scanner->need_to_close() && scanner->set_counted_down() &&
(--_num_unfinished_scanners) == 0) {
_is_finished = true;
// ATTN: this 2 counters will be set at close() again, which is the final values.
// But we set them here because the counter set at close() can not send to FE's profile.
// So we set them here, and the counter value may be little less than final values.
COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
_is_finished = true;
_blocks_queue_added_cv.notify_one();
}
// In pipeline engine, doris will close scanners when `no_schedule`.
_num_running_scanners--;
_ctx_finish_cv.notify_one();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ScannerContext {
// Get next block from blocks queue. Called by ScanNode
// Set eos to true if there is no more data to read.
// And if eos is true, the block returned must be nullptr.
Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);
virtual Status get_block_from_queue(vectorized::Block** block, bool* eos, bool wait = true);

// When a scanner complete a scan, this method will be called
// to return the scanner to the list for next scheduling.
Expand Down Expand Up @@ -118,7 +118,7 @@ class ScannerContext {

void clear_and_join();

virtual bool can_finish();
virtual bool no_schedule();

std::string debug_string();

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/vbroker_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class VBrokerScanNode final : public ScanNode {
std::condition_variable _queue_reader_cond;
std::condition_variable _queue_writer_cond;

int _num_running_scanners;
std::atomic<int> _num_running_scanners;

std::atomic<bool> _scan_finished;

Expand Down