From 4066de375efe6ff8e156a61df4f9316b3d9eaa4e Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Sat, 23 Dec 2023 11:09:46 +0800 Subject: [PATCH] [bugfix](scannercore) scanner will core in deconstructor during collect profile (#28727) --- be/src/exec/exec_node.h | 4 + be/src/pipeline/exec/scan_operator.cpp | 64 ++--- be/src/pipeline/exec/scan_operator.h | 16 +- be/src/vec/exec/scan/pip_scanner_context.h | 20 +- be/src/vec/exec/scan/scanner_context.cpp | 272 ++++++++------------- be/src/vec/exec/scan/scanner_context.h | 47 ++-- be/src/vec/exec/scan/scanner_scheduler.cpp | 54 ++-- be/src/vec/exec/scan/scanner_scheduler.h | 4 +- be/src/vec/exec/scan/vscan_node.cpp | 32 +-- be/src/vec/exec/scan/vscan_node.h | 23 +- be/src/vec/exec/scan/vscanner.h | 13 - 11 files changed, 213 insertions(+), 336 deletions(-) diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h index eeed37907f9ebe..123097cfd53d16 100644 --- a/be/src/exec/exec_node.h +++ b/be/src/exec/exec_node.h @@ -239,6 +239,10 @@ class ExecNode { size_t children_count() const { return _children.size(); } + // when the fragment is normal finished, call this method to do some finish work + // such as send the last buffer to remote. + virtual Status try_close(RuntimeState* state) { return Status::OK(); } + protected: friend class DataSink; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 05d9c7292f758c..1e0f68131e87bb 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -64,14 +64,6 @@ bool ScanOperator::can_read() { } } -bool ScanOperator::is_pending_finish() const { - return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule(); -} - -Status ScanOperator::try_close(RuntimeState* state) { - return _node->try_close(state); -} - bool ScanOperator::runtime_filters_are_ready_or_timeout() { return _node->runtime_filters_are_ready_or_timeout(); } @@ -81,9 +73,8 @@ std::string ScanOperator::debug_string() const { fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ", SourceOperator::debug_string(), _node->_scanner_ctx == nullptr); if (_node->_scanner_ctx) { - fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ", - _node->_scanner_ctx->get_num_running_scanners(), - _node->_scanner_ctx->get_num_scheduling_ctx()); + fmt::format_to(debug_string_buffer, ", num_running_scanners = {}", + _node->_scanner_ctx->get_num_running_scanners()); } return fmt::to_string(debug_string_buffer); } @@ -101,9 +92,6 @@ std::string ScanOperator::debug_string() const { template ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) : ScanLocalStateBase(state, parent) { - _finish_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx()); _filter_dependency = std::make_shared( parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", state->get_query_ctx()); @@ -177,7 +165,6 @@ Status ScanLocalState::open(RuntimeState* state) { auto status = _eos ? Status::OK() : _prepare_scanners(); if (_scanner_ctx) { - _finish_dependency->block(); DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); @@ -570,15 +557,14 @@ std::string ScanLocalState::debug_string(int indentation_level) const { PipelineXLocalState<>::debug_string(indentation_level), _eos.load()); if (_scanner_ctx) { fmt::format_to(debug_string_buffer, ""); - fmt::format_to( - debug_string_buffer, - ", Scanner Context: (_is_finished = {}, _should_stop = {}, " - "_num_running_scanners={}, " - "_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, status = {}, error = {})", - _scanner_ctx->is_finished(), _scanner_ctx->should_stop(), - _scanner_ctx->get_num_running_scanners(), _scanner_ctx->get_num_scheduling_ctx(), - _scanner_ctx->get_num_unfinished_scanners(), _scanner_ctx->status().to_string(), - _scanner_ctx->status_error()); + fmt::format_to(debug_string_buffer, + ", Scanner Context: (_is_finished = {}, _should_stop = {}, " + "_num_running_scanners={}, " + " _num_unfinished_scanners = {}, status = {}, error = {})", + _scanner_ctx->is_finished(), _scanner_ctx->should_stop(), + _scanner_ctx->get_num_running_scanners(), + _scanner_ctx->get_num_unfinished_scanners(), + _scanner_ctx->status().to_string(), _scanner_ctx->status_error()); } return fmt::to_string(debug_string_buffer); @@ -1226,24 +1212,27 @@ template Status ScanLocalState::_prepare_scanners() { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); + // Init scanner wrapper + for (auto it = scanners.begin(); it != scanners.end(); ++it) { + _scanners.emplace_back(std::make_shared(*it)); + } if (scanners.empty()) { _eos = true; _scan_dependency->set_ready(); } else { COUNTER_SET(_num_scanners, static_cast(scanners.size())); - RETURN_IF_ERROR(_start_scanners(scanners)); + RETURN_IF_ERROR(_start_scanners(_scanners)); } return Status::OK(); } template Status ScanLocalState::_start_scanners( - const std::list& scanners) { + const std::list>& scanners) { auto& p = _parent->cast(); _scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners, p.limit(), state()->scan_queue_mem_limit(), - p._col_distribute_ids, 1, _scan_dependency, - _finish_dependency); + p._col_distribute_ids, 1, _scan_dependency); return Status::OK(); } @@ -1319,9 +1308,6 @@ Status ScanLocalState::_init_profile() { _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); - _wait_for_finish_dependency_timer = - ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); - return Status::OK(); } @@ -1429,17 +1415,6 @@ Status ScanOperatorX::open(RuntimeState* state) { return Status::OK(); } -template -Status ScanOperatorX::try_close(RuntimeState* state) { - auto& local_state = get_local_state(state); - if (local_state._scanner_ctx) { - // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore - // TODO: there is a lock in `set_should_stop` may cause some slight impact - local_state._scanner_ctx->set_should_stop(); - } - return Status::OK(); -} - template Status ScanLocalState::close(RuntimeState* state) { if (_closed) { @@ -1451,10 +1426,9 @@ Status ScanLocalState::close(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); if (_scanner_ctx) { - _scanner_ctx->clear_and_join(reinterpret_cast(this), state); + _scanner_ctx->stop_scanners(state); } COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); - COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time()); return PipelineXLocalState<>::close(state); @@ -1511,7 +1485,7 @@ Status ScanOperatorX::get_block(RuntimeState* state, vectorized: if (eos) { source_state = SourceState::FINISHED; // reach limit, stop the scanners. - local_state._scanner_ctx->set_should_stop(); + local_state._scanner_ctx->stop_scanners(state); } return Status::OK(); diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 3690e9eb39ca8e..bf083d82d5d89c 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -31,6 +31,9 @@ namespace doris { class ExecNode; } // namespace doris +namespace doris::vectorized { +class ScannerDelegate; +} namespace doris::pipeline { class PipScannerContext; @@ -48,13 +51,9 @@ class ScanOperator : public SourceOperator { bool can_read() override; // for source - bool is_pending_finish() const override; - bool runtime_filters_are_ready_or_timeout() override; std::string debug_string() const override; - - Status try_close(RuntimeState* state) override; }; class ScanDependency final : public Dependency { @@ -171,7 +170,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr; // time of prefilter input block from scanner RuntimeProfile::Counter* _wait_for_eos_timer = nullptr; - RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _wait_for_rf_timer = nullptr; }; @@ -214,7 +212,6 @@ class ScanLocalState : public ScanLocalStateBase { Dependency* dependency() override { return _scan_dependency.get(); } RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; - Dependency* finishdependency() override { return _finish_dependency.get(); } protected: template @@ -350,7 +347,7 @@ class ScanLocalState : public ScanLocalStateBase { Status _prepare_scanners(); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list& scanners); + Status _start_scanners(const std::list>& scanners); // For some conjunct there is chance to elimate cast operator // Eg. Variant's sub column could eliminate cast in storage layer if @@ -413,14 +410,13 @@ class ScanLocalState : public ScanLocalStateBase { std::shared_ptr _filter_dependency; - std::shared_ptr _finish_dependency; + // ScanLocalState owns the ownership of scanner, scanner context only has its weakptr + std::list> _scanners; }; template class ScanOperatorX : public OperatorX { public: - Status try_close(RuntimeState* state) override; - Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); } Status open(RuntimeState* state) override; diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h index 309aed96a8cf7f..56ceb20bf1502e 100644 --- a/be/src/vec/exec/scan/pip_scanner_context.h +++ b/be/src/vec/exec/scan/pip_scanner_context.h @@ -31,9 +31,9 @@ class PipScannerContext : public vectorized::ScannerContext { public: PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent, const TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids, - const int num_parallel_instances) + const std::list>& scanners, + int64_t limit_, int64_t max_bytes_in_blocks_queue, + const std::vector& col_distribute_ids, const int num_parallel_instances) : vectorized::ScannerContext(state, parent, output_tuple_desc, scanners, limit_, max_bytes_in_blocks_queue, num_parallel_instances), _col_distribute_ids(col_distribute_ids), @@ -41,14 +41,13 @@ class PipScannerContext : public vectorized::ScannerContext { PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state, const TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const std::vector& col_distribute_ids, - const int num_parallel_instances, - std::shared_ptr dependency, - std::shared_ptr finish_dependency) + const std::list>& scanners, + int64_t limit_, int64_t max_bytes_in_blocks_queue, + const std::vector& col_distribute_ids, const int num_parallel_instances, + std::shared_ptr dependency) : vectorized::ScannerContext(state, output_tuple_desc, scanners, limit_, max_bytes_in_blocks_queue, num_parallel_instances, - local_state, dependency, finish_dependency), + local_state, dependency), _need_colocate_distribute(false) {} Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos, @@ -111,9 +110,6 @@ class PipScannerContext : public vectorized::ScannerContext { return Status::OK(); } - // We should make those method lock free. - bool done() override { return _is_finished || _should_stop; } - void append_blocks_to_queue(std::vector& blocks) override { const int queue_size = _blocks_queues.size(); const int block_size = blocks.size(); diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 5ad2dbec5b69fa..908b2a663b7eb0 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -46,11 +46,11 @@ namespace doris::vectorized { using namespace std::chrono_literals; ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, + const std::list>& scanners, + int64_t limit_, int64_t max_bytes_in_blocks_queue, + const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, - std::shared_ptr dependency, - std::shared_ptr finish_dependency) + std::shared_ptr dependency) : _state(state), _parent(nullptr), _local_state(local_state), @@ -61,11 +61,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners), - _scanners_ref(scanners.begin(), scanners.end()), + _scanners(scanners.begin(), scanners.end()), + _all_scanners(scanners.begin(), scanners.end()), _num_parallel_instances(num_parallel_instances), - _dependency(dependency), - _finish_dependency(finish_dependency) { + _dependency(dependency) { // Use the task exec context as a lock between scanner threads and fragment exection threads _task_exec_ctx = _state->get_task_execution_context(); _query_id = _state->get_query_ctx()->query_id(); @@ -92,8 +91,9 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit_, - int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, + const std::list>& scanners, + int64_t limit_, int64_t max_bytes_in_blocks_queue, + const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state) : _state(state), _parent(parent), @@ -105,8 +105,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VS _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), _scanner_scheduler(state->exec_env()->scanner_scheduler()), - _scanners(scanners), - _scanners_ref(scanners.begin(), scanners.end()), + _scanners(scanners.begin(), scanners.end()), + _all_scanners(scanners.begin(), scanners.end()), _num_parallel_instances(num_parallel_instances) { // Use the task exec context as a lock between scanner threads and fragment exection threads _task_exec_ctx = _state->get_task_execution_context(); @@ -182,10 +182,6 @@ Status ScannerContext::init() { } #endif - // 4. This ctx will be submitted to the scanner scheduler right after init. - // So set _num_scheduling_ctx to 1 here. - _num_scheduling_ctx = 1; - _num_unfinished_scanners = _scanners.size(); if (_parent) { @@ -275,11 +271,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo bool is_scheduled = false; if (!done() && to_be_schedule && _num_running_scanners == 0) { is_scheduled = true; - auto state = _scanner_scheduler->submit(shared_from_this()); - if (state.ok()) { - _num_scheduling_ctx++; - } else { - set_status_on_error(state, false); + auto submit_status = _scanner_scheduler->submit(shared_from_this()); + if (!submit_status.ok()) { + set_status_on_error(submit_status, false); } } @@ -370,41 +364,17 @@ Status ScannerContext::validate_block_schema(Block* block) { return Status::OK(); } -void ScannerContext::set_should_stop() { - std::lock_guard l(_transfer_lock); - _should_stop = true; - _set_scanner_done(); - for (const VScannerWPtr& scanner : _scanners_ref) { - if (VScannerSPtr sc = scanner.lock()) { - sc->try_stop(); - } - } - _blocks_queue_added_cv.notify_one(); - set_ready_to_finish(); -} - void ScannerContext::inc_num_running_scanners(int32_t inc) { std::lock_guard l(_transfer_lock); _num_running_scanners += inc; } -void ScannerContext::dec_num_scheduling_ctx() { +void ScannerContext::dec_num_running_scanners(int32_t scanner_dec) { std::lock_guard l(_transfer_lock); - _num_scheduling_ctx--; - set_ready_to_finish(); - if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { - _ctx_finish_cv.notify_one(); - } -} - -void ScannerContext::set_ready_to_finish() { - // `_should_stop == true` means this task has already ended and wait for pending finish now. - if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) { - _finish_dependency->set_ready(); - } + _num_running_scanners -= scanner_dec; } -bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { +void ScannerContext::set_status_on_error(const Status& status, bool need_lock) { std::unique_lock l(_transfer_lock, std::defer_lock); if (need_lock) { l.lock(); @@ -415,14 +385,20 @@ bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) { _blocks_queue_added_cv.notify_one(); _should_stop = true; _set_scanner_done(); - return true; } - return false; } -template -Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* state) { - std::unique_lock l(_scanners_lock); +void ScannerContext::stop_scanners(RuntimeState* state) { + std::unique_lock l(_transfer_lock); + _should_stop = true; + _set_scanner_done(); + for (const std::weak_ptr& scanner : _all_scanners) { + if (std::shared_ptr sc = scanner.lock()) { + sc->_scanner->try_stop(); + } + } + _blocks_queue.clear(); + // TODO yiguolei, call mark close to scanners if (state->enable_profile()) { std::stringstream scanner_statistics; std::stringstream scanner_rows_read; @@ -430,76 +406,38 @@ Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* s scanner_statistics << "["; scanner_rows_read << "["; scanner_wait_worker_time << "["; - for (auto finished_scanner_time : _finished_scanner_runtime) { - scanner_statistics << PrettyPrinter::print(finished_scanner_time, TUnit::TIME_NS) - << ", "; - } - for (auto finished_scanner_rows : _finished_scanner_rows_read) { - scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", "; - } - for (auto finished_scanner_wait_time : _finished_scanner_wait_worker_time) { - scanner_wait_worker_time - << PrettyPrinter::print(finished_scanner_wait_time, TUnit::TIME_NS) << ", "; - } - // Only unfinished scanners here - for (auto& scanner : _scanners) { - // Scanners are in ObjPool in ScanNode, - // so no need to delete them here. + // Scanners can in 3 state + // state 1: in scanner context, not scheduled + // state 2: in scanner worker pool's queue, scheduled but not running + // state 3: scanner is running. + for (auto& scanner_ref : _all_scanners) { + auto scanner = scanner_ref.lock(); + if (scanner == nullptr) { + continue; + } // Add per scanner running time before close them - scanner_statistics << PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS) + scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(), + TUnit::TIME_NS) << ", "; - scanner_rows_read << PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT) + scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(), + TUnit::UNIT) << ", "; scanner_wait_worker_time - << PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(), + << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(), TUnit::TIME_NS) << ", "; + // since there are all scanners, some scanners is running, so that could not call scanner + // close here. } scanner_statistics << "]"; scanner_rows_read << "]"; scanner_wait_worker_time << "]"; - parent->scanner_profile()->add_info_string("PerScannerRunningTime", - scanner_statistics.str()); - parent->scanner_profile()->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); - parent->scanner_profile()->add_info_string("PerScannerWaitTime", - scanner_wait_worker_time.str()); - } - // Only unfinished scanners here - for (auto& scanner : _scanners) { - static_cast(scanner->close(state)); - // Scanners are in ObjPool in ScanNode, - // so no need to delete them here. + _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); + _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); + _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str()); } - _scanners.clear(); - return Status::OK(); -} - -template -void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) { - std::unique_lock l(_transfer_lock); - do { - if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) { - break; - } else { - DCHECK(!state->enable_pipeline_exec()) - << " _num_running_scanners: " << _num_running_scanners - << " _num_scheduling_ctx: " << _num_scheduling_ctx; - while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) { - _ctx_finish_cv.wait(l); - } - break; - } - } while (false); - // Must wait all running scanners stop running. - // So that we can make sure to close all scanners. - static_cast(_close_and_clear_scanners(parent, state)); - - _blocks_queue.clear(); -} -bool ScannerContext::no_schedule() { - std::unique_lock l(_transfer_lock); - return _num_running_scanners == 0 && _num_scheduling_ctx == 0; + _blocks_queue_added_cv.notify_one(); } void ScannerContext::_set_scanner_done() { @@ -512,12 +450,11 @@ std::string ScannerContext::debug_string() { return fmt::format( "id: {}, sacnners: {}, blocks in queue: {}," " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {}," - " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {}," + " limit: {}, _num_running_scanners: {}, _max_thread_num: {}," " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}", ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop, - _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, - _num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue, - _max_bytes_in_queue); + _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _max_thread_num, + _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue); } void ScannerContext::reschedule_scanner_ctx() { @@ -525,84 +462,67 @@ void ScannerContext::reschedule_scanner_ctx() { if (done()) { return; } - auto state = _scanner_scheduler->submit(shared_from_this()); + auto submit_status = _scanner_scheduler->submit(shared_from_this()); //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? - if (state.ok()) { - _num_scheduling_ctx++; - } else { - set_status_on_error(state, false); + if (!submit_status.ok()) { + set_status_on_error(submit_status, false); } } -void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { - { - std::unique_lock l(_scanners_lock); - _scanners.push_front(scanner); - } +void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr scanner) { std::lock_guard l(_transfer_lock); - - // In pipeline engine, doris will close scanners when `no_schedule`. - // We have to decrease _num_running_scanners before schedule, otherwise - // schedule does not woring due to _num_running_scanners. - _num_running_scanners--; - set_ready_to_finish(); - - if (!done() && should_be_scheduled()) { - auto state = _scanner_scheduler->submit(shared_from_this()); - if (state.ok()) { - _num_scheduling_ctx++; - } else { - set_status_on_error(state, false); + // Use a transfer lock to avoid the scanner be scheduled concurrently. For example, that after + // calling "_scanners.push_front(scanner)", there may be other ctx in scheduler + // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed + // before we call the following if() block. + if (scanner->_scanner->need_to_close()) { + --_num_unfinished_scanners; + if (_num_unfinished_scanners == 0) { + _dispose_coloate_blocks_not_in_queue(); + _is_finished = true; + _set_scanner_done(); + _blocks_queue_added_cv.notify_one(); + return; } } - // Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler - // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed - // before we call the following if() block. - // So we need "scanner->set_counted_down()" to avoid "_num_unfinished_scanners" being decreased twice by - // same scanner. - if (scanner->need_to_close() && scanner->set_counted_down() && - (--_num_unfinished_scanners) == 0) { - _dispose_coloate_blocks_not_in_queue(); - _is_finished = true; - _set_scanner_done(); - _blocks_queue_added_cv.notify_one(); + _scanners.push_front(scanner); + + if (should_be_scheduled()) { + auto submit_status = _scanner_scheduler->submit(shared_from_this()); + if (!submit_status.ok()) { + set_status_on_error(submit_status, false); + } } - _ctx_finish_cv.notify_one(); } -void ScannerContext::get_next_batch_of_scanners(std::list* current_run) { +// This method is called in scanner scheduler, and task context is hold +void ScannerContext::get_next_batch_of_scanners( + std::list>* current_run) { + std::lock_guard l(_transfer_lock); + // Update the sched counter for profile + Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); }}; // 1. Calculate how many scanners should be scheduled at this run. - int thread_slot_num = 0; - { - // If there are enough space in blocks queue, - // the scanner number depends on the _free_blocks numbers - thread_slot_num = get_available_thread_slot_num(); - } + // If there are enough space in blocks queue, + // the scanner number depends on the _free_blocks numbers + int thread_slot_num = get_available_thread_slot_num(); // 2. get #thread_slot_num scanners from ctx->scanners // and put them into "this_run". - { - std::unique_lock l(_scanners_lock); - for (int i = 0; i < thread_slot_num && !_scanners.empty();) { - VScannerSPtr scanner = _scanners.front(); - _scanners.pop_front(); - if (scanner->need_to_close()) { - _finished_scanner_runtime.push_back(scanner->get_time_cost_ns()); - _finished_scanner_rows_read.push_back(scanner->get_rows_read()); - _finished_scanner_wait_worker_time.push_back( - scanner->get_scanner_wait_worker_timer()); - static_cast(scanner->close(_state)); - } else { - current_run->push_back(scanner); - i++; - } + for (int i = 0; i < thread_slot_num && !_scanners.empty();) { + std::weak_ptr scanner_ref = _scanners.front(); + std::shared_ptr scanner = scanner_ref.lock(); + _scanners.pop_front(); + if (scanner == nullptr) { + continue; + } + if (scanner->_scanner->need_to_close()) { + static_cast(scanner->_scanner->close(_state)); + } else { + current_run->push_back(scanner_ref); + i++; } } } -template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* parent, - RuntimeState* state); -template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* state); - } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index ba9c1fdee10a5b..e320eb55b2e19c 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -53,6 +53,7 @@ class TaskGroup; namespace vectorized { class VScanner; +class ScannerDelegate; class VScanNode; class ScannerScheduler; class SimplifiedScanScheduler; @@ -70,7 +71,7 @@ class ScannerContext : public std::enable_shared_from_this { public: ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc, - const std::list& scanners, int64_t limit_, + const std::list>& scanners, int64_t limit_, int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1, pipeline::ScanLocalStateBase* local_state = nullptr); @@ -92,9 +93,9 @@ class ScannerContext : public std::enable_shared_from_this { // When a scanner complete a scan, this method will be called // to return the scanner to the list for next scheduling. - void push_back_scanner_and_reschedule(VScannerSPtr scanner); + void push_back_scanner_and_reschedule(std::shared_ptr scanner); - bool set_status_on_error(const Status& status, bool need_lock = true); + void set_status_on_error(const Status& status, bool need_lock = true); Status status() { if (_process_status.is()) { @@ -103,34 +104,21 @@ class ScannerContext : public std::enable_shared_from_this { return _process_status; } - // Called by ScanNode. - // Used to notify the scheduler that this ScannerContext can stop working. - void set_should_stop(); - // Return true if this ScannerContext need no more process - virtual bool done() { return _is_finished || _should_stop; } + bool done() const { return _is_finished || _should_stop; } bool is_finished() { return _is_finished.load(); } bool should_stop() { return _should_stop.load(); } bool status_error() { return _status_error.load(); } void inc_num_running_scanners(int32_t scanner_inc); - void set_ready_to_finish(); + void dec_num_running_scanners(int32_t scanner_dec); int get_num_running_scanners() const { return _num_running_scanners; } int get_num_unfinished_scanners() const { return _num_unfinished_scanners; } - void dec_num_scheduling_ctx(); - - int get_num_scheduling_ctx() const { return _num_scheduling_ctx; } - - void get_next_batch_of_scanners(std::list* current_run); - - template - void clear_and_join(Parent* parent, RuntimeState* state); - - bool no_schedule(); + void get_next_batch_of_scanners(std::list>* current_run); virtual std::string debug_string(); @@ -138,7 +126,6 @@ class ScannerContext : public std::enable_shared_from_this { void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); } void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); } - void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); } std::string parent_name(); @@ -146,7 +133,7 @@ class ScannerContext : public std::enable_shared_from_this { // todo(wb) rethinking how to calculate ```_max_bytes_in_queue``` when executing shared scan inline bool should_be_scheduled() const { - return (_cur_bytes_in_queue < _max_bytes_in_queue / 2) && + return !done() && (_cur_bytes_in_queue < _max_bytes_in_queue / 2) && (_serving_blocks_num < allowed_blocks_num()); } @@ -169,6 +156,8 @@ class ScannerContext : public std::enable_shared_from_this { SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } + void stop_scanners(RuntimeState* state); + void reschedule_scanner_ctx(); // the unique id of this context @@ -181,17 +170,12 @@ class ScannerContext : public std::enable_shared_from_this { std::weak_ptr get_task_execution_context() { return _task_exec_ctx; } -private: - template - Status _close_and_clear_scanners(Parent* parent, RuntimeState* state); - protected: ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, - const std::list& scanners_, int64_t limit_, + const std::list>& scanners_, int64_t limit_, int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, - std::shared_ptr dependency, - std::shared_ptr finish_dependency); + std::shared_ptr dependency); virtual void _dispose_coloate_blocks_not_in_queue() {} void _set_scanner_done(); @@ -275,9 +259,11 @@ class ScannerContext : public std::enable_shared_from_this { // and then if the scanner is not finished, will be pushed back to this list. // Not need to protect by lock, because only one scheduler thread will access to it. std::mutex _scanners_lock; - std::list _scanners; + // Scanner's ownership belong to vscannode or scanoperator, scanner context does not own it. + // ScannerContext has to check if scanner is deconstructed before use it. + std::list> _scanners; // weak pointer for _scanners, used in stop function - std::vector _scanners_ref; + std::vector> _all_scanners; std::vector _finished_scanner_runtime; std::vector _finished_scanner_rows_read; std::vector _finished_scanner_wait_worker_time; @@ -294,7 +280,6 @@ class ScannerContext : public std::enable_shared_from_this { RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr; std::shared_ptr _dependency = nullptr; - std::shared_ptr _finish_dependency = nullptr; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index e8d7f8a7139a6d..a67b9d7f27ab58 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -180,20 +180,14 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { watch.reset(); watch.start(); ctx->incr_num_ctx_scheduling(1); - size_t size = 0; - Defer defer {[&]() { - ctx->incr_num_scanner_scheduling(size); - ctx->dec_num_scheduling_ctx(); - }}; if (ctx->done()) { return; } - std::list this_run; + std::list> this_run; ctx->get_next_batch_of_scanners(&this_run); - size = this_run.size(); - if (!size) { + if (this_run.empty()) { // There will be 2 cases when this_run is empty: // 1. The blocks queue reaches limit. // The consumer will continue scheduling the ctx. @@ -212,9 +206,14 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { if (ctx->thread_token != nullptr) { // TODO llj tg how to treat this? while (iter != this_run.end()) { - (*iter)->start_wait_worker_timer(); - auto s = ctx->thread_token->submit_func( - [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }); + std::shared_ptr scanner_delegate = (*iter).lock(); + if (scanner_delegate == nullptr) { + continue; + } + scanner_delegate->_scanner->start_wait_worker_timer(); + auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() { + this->_scanner_scan(this, ctx, scanner_ref); + }); if (s.ok()) { this_run.erase(iter++); } else { @@ -224,28 +223,32 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { } } else { while (iter != this_run.end()) { - (*iter)->start_wait_worker_timer(); - TabletStorageType type = (*iter)->get_storage_type(); + std::shared_ptr scanner_delegate = (*iter).lock(); + if (scanner_delegate == nullptr) { + continue; + } + scanner_delegate->_scanner->start_wait_worker_timer(); + TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); bool ret = false; if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { - auto work_func = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); + auto work_func = [this, scanner_ref = *iter, ctx]() { + this->_scanner_scan(this, ctx, scanner_ref); }; SimplifiedScanTask simple_scan_task = {work_func, ctx}; ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); } else { PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); + task.work_function = [this, scanner_ref = *iter, ctx]() { + this->_scanner_scan(this, ctx, scanner_ref); }; task.priority = nice; ret = _local_scan_thread_pool->offer(task); } } else { PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); + task.work_function = [this, scanner_ref = *iter, ctx]() { + this->_scanner_scan(this, ctx, scanner_ref); }; task.priority = nice; ret = _remote_scan_thread_pool->offer(task); @@ -263,13 +266,22 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { } void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, - std::shared_ptr ctx, VScannerSPtr scanner) { + std::shared_ptr ctx, + std::weak_ptr scanner_ref) { + Defer defer {[&]() { ctx->dec_num_running_scanners(1); }}; auto task_lock = ctx->get_task_execution_context().lock(); if (task_lock == nullptr) { // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) // << " maybe finished"; return; } + // will release scanner if it is the last one, task lock is hold here, to ensure + // that scanner could call scannode's method during deconstructor + std::shared_ptr scanner_delegate = scanner_ref.lock(); + auto& scanner = scanner_delegate->_scanner; + if (scanner_delegate == nullptr) { + return; + } SCOPED_ATTACH_TASK(scanner->runtime_state()); // for cpu hard limit, thread name should not be reset if (ctx->_should_reset_thread_name) { @@ -400,7 +412,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, if (eos || should_stop) { scanner->mark_to_need_to_close(); } - ctx->push_back_scanner_and_reschedule(scanner); + ctx->push_back_scanner_and_reschedule(scanner_delegate); } void ScannerScheduler::_register_metrics() { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index eb4d1380e3947c..9fedd27dbd8bb3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -36,7 +36,7 @@ class BlockingQueue; } // namespace doris namespace doris::vectorized { - +class ScannerDelegate; class ScannerContext; // Responsible for the scheduling and execution of all Scanners of a BE node. @@ -79,7 +79,7 @@ class ScannerScheduler { void _schedule_scanners(std::shared_ptr ctx); // execution thread function void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr ctx, - VScannerSPtr scanner); + std::weak_ptr scanner); void _register_metrics(); diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index 5176d7900b3c7e..b780fc1a8a97f0 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -273,7 +273,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* reached_limit(block, eos); if (*eos) { // reach limit, stop the scanners. - _scanner_ctx->set_should_stop(); + _scanner_ctx->stop_scanners(state); } return Status::OK(); @@ -318,8 +318,8 @@ Status VScanNode::_init_profile() { return Status::OK(); } -Status VScanNode::_start_scanners(const std::list& scanners, - const int query_parallel_instance_num) { +void VScanNode::_start_scanners(const std::list>& scanners, + const int query_parallel_instance_num) { if (_is_pipeline_scan) { int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; _scanner_ctx = pipeline::PipScannerContext::create_shared( @@ -329,41 +329,29 @@ Status VScanNode::_start_scanners(const std::list& scanners, _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, scanners, limit(), _state->scan_queue_mem_limit()); } - return Status::OK(); } Status VScanNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } + RETURN_IF_ERROR(ExecNode::close(state)); return Status::OK(); } void VScanNode::release_resource(RuntimeState* state) { if (_scanner_ctx) { - if (!state->enable_pipeline_exec()) { + if (!state->enable_pipeline_exec() || _should_create_scanner) { // stop and wait the scanner scheduler to be done // _scanner_ctx may not be created for some short circuit case. - _scanner_ctx->set_should_stop(); - _scanner_ctx->clear_and_join(this, state); - } else if (_should_create_scanner) { - _scanner_ctx->clear_and_join(this, state); + _scanner_ctx->stop_scanners(state); } } - + _scanners.clear(); ExecNode::release_resource(state); } -Status VScanNode::try_close(RuntimeState* state) { - if (_scanner_ctx) { - // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore - // TODO: there is a lock in `set_should_stop` may cause some slight impact - _scanner_ctx->set_should_stop(); - } - return Status::OK(); -} - Status VScanNode::_normalize_conjuncts() { // The conjuncts is always on output tuple, so use _output_tuple_desc; std::vector slots = _output_tuple_desc->slots(); @@ -1329,11 +1317,15 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) { std::list scanners; RETURN_IF_ERROR(_init_scanners(&scanners)); + // Init scanner wrapper + for (auto it = scanners.begin(); it != scanners.end(); ++it) { + _scanners.emplace_back(std::make_shared(*it)); + } if (scanners.empty()) { _eos = true; } else { COUNTER_SET(_num_scanners, static_cast(scanners.size())); - RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num)); + _start_scanners(_scanners, query_parallel_instance_num); } return Status::OK(); } diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index 5917d0ff46b5eb..d4a054cacd5a82 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -87,6 +87,16 @@ struct FilterPredicates { std::vector>> in_filters; }; +// We want to close scanner automatically, so using a delegate class +// and call close method in the delegate class's dctor. +class ScannerDelegate { +public: + VScannerSPtr _scanner; + ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {} + ~ScannerDelegate() { static_cast(_scanner->close(_scanner->runtime_state())); } + ScannerDelegate(ScannerDelegate&&) = delete; +}; + class VScanNode : public ExecNode, public RuntimeFilterConsumer { public: VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) @@ -156,8 +166,6 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { Status alloc_resource(RuntimeState* state) override; void release_resource(RuntimeState* state) override; - Status try_close(RuntimeState* state); - bool should_run_serial() const { return _should_run_serial || _state->enable_scan_node_run_serial(); } @@ -262,8 +270,11 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { int _max_scan_key_num; int _max_pushdown_conditions_per_column; - // Each scan node will generates a ScannerContext to manage all Scanners. - // See comments of ScannerContext for more details + // ScanNode owns the ownership of scanner, scanner context only has its weakptr + std::list> _scanners; + + // Each scan node will generates a ScannerContext to do schedule work + // ScannerContext will be added to scanner scheduler std::shared_ptr _scanner_ctx = nullptr; // indicate this scan node has no more data to return @@ -437,8 +448,8 @@ class VScanNode : public ExecNode, public RuntimeFilterConsumer { const std::string& fn_name, int slot_ref_child = -1); // Submit the scanner to the thread pool and start execution - Status _start_scanners(const std::list& scanners, - const int query_parallel_instance_num); + void _start_scanners(const std::list>& scanners, + const int query_parallel_instance_num); }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 29daf9a68c557f..6046d87ac91ef5 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -145,16 +145,6 @@ class VScanner { void set_status_on_failure(const Status& st) { _status = st; } - // return false if _is_counted_down is already true, - // otherwise, set _is_counted_down to true and return true. - bool set_counted_down() { - if (_is_counted_down) { - return false; - } - _is_counted_down = true; - return true; - } - protected: void _discard_conjuncts() { for (auto& conjunct : _conjuncts) { @@ -215,8 +205,6 @@ class VScanner { int64_t _scan_cpu_timer = 0; bool _is_load = false; - // set to true after decrease the "_num_unfinished_scanners" in scanner context - bool _is_counted_down = false; bool _is_init = true; @@ -227,6 +215,5 @@ class VScanner { }; using VScannerSPtr = std::shared_ptr; -using VScannerWPtr = std::weak_ptr; } // namespace doris::vectorized