From 6f125b6ff62abb167908d7028aaa47caead15086 Mon Sep 17 00:00:00 2001 From: wangbo Date: Thu, 16 Nov 2023 09:42:44 +0800 Subject: [PATCH] [improvement](pipeline) task group scan entity (#19924) (#27040) Co-authored-by: Lijia Liu --- be/src/common/config.cpp | 2 +- be/src/common/config.h | 2 +- be/src/pipeline/pipeline_fragment_context.cpp | 5 +- be/src/pipeline/pipeline_fragment_context.h | 10 +- be/src/pipeline/pipeline_task.cpp | 10 +- be/src/pipeline/pipeline_task.h | 9 +- be/src/pipeline/task_queue.cpp | 59 +++-- be/src/pipeline/task_queue.h | 28 +-- be/src/pipeline/task_scheduler.cpp | 6 +- be/src/pipeline/task_scheduler.h | 3 +- be/src/runtime/exec_env.h | 5 + be/src/runtime/exec_env_init.cpp | 3 + be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/task_group/task_group.cpp | 98 +++++--- be/src/runtime/task_group/task_group.h | 59 +++-- .../runtime/task_group/task_group_manager.cpp | 8 +- .../runtime/task_group/task_group_manager.h | 8 +- be/src/util/mem_info.cpp | 4 +- be/src/vec/exec/scan/scan_task_queue.cpp | 213 ++++++++++++++++++ be/src/vec/exec/scan/scan_task_queue.h | 98 ++++++++ be/src/vec/exec/scan/scanner_context.cpp | 19 ++ be/src/vec/exec/scan/scanner_context.h | 6 + be/src/vec/exec/scan/scanner_scheduler.cpp | 54 ++++- be/src/vec/exec/scan/scanner_scheduler.h | 13 ++ be/src/vec/exec/scan/vscanner.h | 2 + .../java/org/apache/doris/common/Config.java | 5 +- 26 files changed, 592 insertions(+), 139 deletions(-) create mode 100644 be/src/vec/exec/scan/scan_task_queue.cpp create mode 100644 be/src/vec/exec/scan/scan_task_queue.h diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 22ec143c6da8c1..b09e77f7eb31cc 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -955,7 +955,7 @@ DEFINE_Bool(enable_fuzzy_mode, "false"); DEFINE_Bool(enable_debug_points, "false"); DEFINE_Int32(pipeline_executor_size, "0"); -DEFINE_mInt16(pipeline_short_query_timeout_s, "20"); +DEFINE_mBool(enable_workload_group_for_scan, "false"); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. diff --git a/be/src/common/config.h b/be/src/common/config.h index 0a11a4b46d703b..de372deaf95f65 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -997,7 +997,7 @@ DECLARE_Bool(enable_fuzzy_mode); DECLARE_Bool(enable_debug_points); DECLARE_Int32(pipeline_executor_size); -DECLARE_mInt16(pipeline_short_query_timeout_s); +DECLARE_mBool(enable_workload_group_for_scan); // Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node. // Will remove after fully test. 
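Note on the config change above: pipeline_short_query_timeout_s is dropped, and in its place the patch adds enable_workload_group_for_scan, a mutable bool (DEFINE_mBool) defaulting to false, so the new per-group scan queue is strictly opt-in and can be toggled at runtime without a BE restart. Below is a minimal sketch of the gating this flag enables, mirroring the check the patch adds in ScannerScheduler::_schedule_scanners; the stand-in types are illustrative, not Doris APIs.

    #include <cstdio>

    // Stand-ins for config::enable_workload_group_for_scan and the query's
    // task group pointer; in the patch the real check lives in
    // ScannerScheduler::_schedule_scanners.
    namespace config { bool enable_workload_group_for_scan = false; }
    struct TaskGroup {};

    // A local scan goes to the per-group fair queue only when the query carries
    // a task group AND the BE-level switch is on; otherwise it falls back to the
    // shared PriorityThreadPool, exactly as before this patch.
    bool use_group_scan_queue(const TaskGroup* tg) {
        return tg != nullptr && config::enable_workload_group_for_scan;
    }

    int main() {
        TaskGroup tg;
        std::printf("%d\n", use_group_scan_queue(&tg)); // 0: flag still false
        config::enable_workload_group_for_scan = true;  // mutable, so no BE restart needed
        std::printf("%d\n", use_group_scan_queue(&tg)); // 1: scans now take the group queue
    }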
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index d718e1c6eeec1d..1b7dffae1c1b3b 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -128,6 +128,9 @@ PipelineFragmentContext::PipelineFragmentContext( _report_thread_active(false), _report_status_cb(report_status_cb), _is_report_on_cancel(true) { + if (_query_ctx->get_task_group()) { + _task_group_entity = _query_ctx->get_task_group()->task_entity(); + } _report_thread_future = _report_thread_promise.get_future(); _fragment_watcher.start(); } @@ -689,7 +692,7 @@ Status PipelineFragmentContext::submit() { int submit_tasks = 0; Status st; auto* scheduler = _exec_env->pipeline_task_scheduler(); - if (get_task_group()) { + if (_task_group_entity) { scheduler = _exec_env->pipeline_task_group_scheduler(); } for (auto& task : _tasks) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index ed79800ec60f9c..1b44894b84231a 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -48,10 +48,6 @@ class RuntimeFilterMergeControllerEntity; class TDataSink; class TPipelineFragmentParams; -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup - namespace pipeline { class PipelineFragmentContext : public std::enable_shared_from_this { @@ -121,7 +117,9 @@ class PipelineFragmentContext : public std::enable_shared_from_thisget_task_group(); } + taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const { + return _task_group_entity; + } private: Status _create_sink(int sender_id, const TDataSink& t_data_sink, RuntimeState* state); @@ -176,6 +174,8 @@ class PipelineFragmentContext : public std::enable_shared_from_this _query_ctx; + taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr; + std::shared_ptr _merge_controller_handler; MonotonicStopWatch _fragment_watcher; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 9e40b85f9170a2..0d3cb413bf7841 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -37,9 +37,6 @@ namespace doris { class RuntimeState; -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup } // namespace doris namespace doris::pipeline { @@ -402,7 +399,8 @@ std::string PipelineTask::debug_string() { _task_profile->pretty_print(&profile_ss, ""); fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str()); } - fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = {}]\noperators: ", _index, + fmt::format_to(debug_string_buffer, + "PipelineTask[this = {}, state = {}]\noperators: ", (void*)this, get_state_name(_cur_state)); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '), @@ -422,8 +420,8 @@ std::string PipelineTask::debug_string() { return fmt::to_string(debug_string_buffer); } -taskgroup::TaskGroup* PipelineTask::get_task_group() const { - return _fragment_context->get_task_group(); +taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const { + return _fragment_context->get_task_group_entity(); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 9fdf3c82e0593e..ea78d795006a8a 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -26,6 +26,7 @@ #include "common/status.h" #include 
"exec/operator.h" #include "pipeline.h" +#include "runtime/task_group/task_group.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/core/block.h" @@ -36,9 +37,6 @@ class RuntimeState; namespace pipeline { class PipelineFragmentContext; } // namespace pipeline -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup } // namespace doris namespace doris::pipeline { @@ -107,6 +105,7 @@ inline const char* get_state_name(PipelineTaskState idx) { } class TaskQueue; +class PriorityTaskQueue; // The class do the pipeline task. Minest schdule union by task scheduler class PipelineTask { @@ -185,11 +184,11 @@ class PipelineTask { std::string debug_string(); - taskgroup::TaskGroup* get_task_group() const; + taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const; void set_task_queue(TaskQueue* task_queue); - static constexpr auto THREAD_TIME_SLICE = 100'000'000L; + static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; // 1 used for update priority queue // note(wb) an ugly implementation, need refactor later diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 715c9601cdffe5..a68a2ba4a7f622 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -23,7 +23,6 @@ #include "common/logging.h" #include "pipeline/pipeline_task.h" -#include "runtime/task_group/task_group.h" namespace doris { namespace pipeline { @@ -55,7 +54,7 @@ void PriorityTaskQueue::close() { _wait_task.notify_all(); } -PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) { +PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) { if (_total_task_size == 0 || _closed) { return nullptr; } @@ -94,12 +93,12 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) { PipelineTask* PriorityTaskQueue::try_take(bool is_steal) { // TODO other efficient lock? e.g. 
if get lock fail, return null_ptr std::unique_lock lock(_work_size_mutex); - return try_take_unprotected(is_steal); + return _try_take_unprotected(is_steal); } PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) { std::unique_lock lock(_work_size_mutex); - auto task = try_take_unprotected(false); + auto task = _try_take_unprotected(false); if (task) { return task; } else { @@ -108,7 +107,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) { } else { _wait_task.wait(lock); } - return try_take_unprotected(false); + return _try_take_unprotected(false); } } @@ -131,6 +130,11 @@ Status PriorityTaskQueue::push(PipelineTask* task) { return Status::OK(); } +int PriorityTaskQueue::task_size() { + std::unique_lock lock(_work_size_mutex); + return _total_task_size; +} + MultiCoreTaskQueue::~MultiCoreTaskQueue() = default; MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) { @@ -201,9 +205,9 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) { } bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( - const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& rhs_ptr) const { - int64_t lhs_val = lhs_ptr->vruntime_ns(); - int64_t rhs_val = rhs_ptr->vruntime_ns(); + const taskgroup::TGPTEntityPtr& lhs_ptr, const taskgroup::TGPTEntityPtr& rhs_ptr) const { + auto lhs_val = lhs_ptr->vruntime_ns(); + auto rhs_val = rhs_ptr->vruntime_ns(); if (lhs_val != rhs_val) { return lhs_val < rhs_val; } else { @@ -217,7 +221,8 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( } } -TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : TaskQueue(core_size) {} +TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) + : TaskQueue(core_size), _min_tg_entity(nullptr) {} TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; @@ -237,9 +242,10 @@ Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) { template Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { - auto* entity = task->get_task_group()->task_entity(); + task->put_in_runnable_queue(); + auto* entity = task->get_task_group_entity(); std::unique_lock lock(_rs_mutex); - entity->push_back(task); + entity->task_queue()->emplace(task); if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); } @@ -250,7 +256,7 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { // TODO pipeline support steal PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { std::unique_lock lock(_rs_mutex); - taskgroup::TGEntityPtr entity = nullptr; + taskgroup::TGPTEntityPtr entity = nullptr; while (entity == nullptr) { if (_closed) { return nullptr; @@ -268,11 +274,16 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { if (entity->task_size() == 1) { _dequeue_task_group(entity); } - return entity->take(); + auto task = entity->task_queue()->front(); + if (task) { + entity->task_queue()->pop(); + task->pop_out_runnable_queue(); + } + return task; } template -void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { +void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr tg_entity) { _total_cpu_share += tg_entity->cpu_share(); if constexpr (!from_worker) { /** @@ -283,7 +294,9 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { auto old_v_ns = tg_entity->vruntime_ns(); auto* min_entity = _min_tg_entity.load(); if (min_entity) { - int64_t new_vruntime_ns = min_entity->vruntime_ns() - _ideal_runtime_ns(tg_entity) 
/ 2; + auto min_tg_v = min_entity->vruntime_ns(); + auto ideal_r = _ideal_runtime_ns(tg_entity) / 2; + uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v; if (new_vruntime_ns > old_v_ns) { tg_entity->adjust_vruntime_ns(new_vruntime_ns); } @@ -297,7 +310,7 @@ void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) { _update_min_tg(); } -void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity) { +void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr tg_entity) { _total_cpu_share -= tg_entity->cpu_share(); _group_entities.erase(tg_entity); VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string() @@ -317,12 +330,12 @@ void TaskGroupTaskQueue::_update_min_tg() { } // like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value. -int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const { +uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const { return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share; } -taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() { - taskgroup::TGEntityPtr res = nullptr; +taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() { + taskgroup::TGPTEntityPtr res = nullptr; for (auto* entity : _group_entities) { res = entity; break; @@ -332,8 +345,7 @@ taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() { void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) { std::unique_lock lock(_rs_mutex); - auto* group = task->get_task_group(); - auto* entity = group->task_entity(); + auto* entity = task->get_task_group_entity(); auto find_entity = _group_entities.find(entity); bool is_in_queue = find_entity != _group_entities.end(); VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in queue:" << is_in_queue; @@ -348,15 +360,14 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spen } void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) { + taskgroup::TGPTEntityPtr entity) { std::unique_lock lock(_rs_mutex); - auto* entity = task_group->task_entity(); bool is_in_queue = _group_entities.find(entity) != _group_entities.end(); if (is_in_queue) { _group_entities.erase(entity); _total_cpu_share -= entity->cpu_share(); } - task_group->update_cpu_share_unlock(task_group_info); + entity->check_and_update_cpu_share(task_group_info); if (is_in_queue) { _group_entities.emplace(entity); _total_cpu_share += entity->cpu_share(); diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 9956ba3cb988d0..d693cbe2168e66 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -33,7 +33,6 @@ #include "runtime/task_group/task_group.h" namespace doris { - namespace pipeline { class TaskQueue { @@ -54,7 +53,7 @@ class TaskQueue { virtual void update_statistics(PipelineTask* task, int64_t time_spent) {} virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) = 0; + taskgroup::TGPTEntityPtr entity) = 0; int cores() const { return _core_size; } @@ -95,12 +94,10 @@ class SubTaskQueue { // A Multilevel Feedback Queue class PriorityTaskQueue { public: - explicit PriorityTaskQueue(); + PriorityTaskQueue(); void close(); - PipelineTask* try_take_unprotected(bool is_steal); - PipelineTask* try_take(bool is_steal); PipelineTask* 
take(uint32_t timeout_ms = 0); @@ -111,7 +108,10 @@ class PriorityTaskQueue { _sub_queues[level].inc_runtime(runtime); } + int task_size(); + private: + PipelineTask* _try_take_unprotected(bool is_steal); static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2; static constexpr size_t SUB_QUEUE_LEVEL = 6; SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL]; @@ -155,7 +155,7 @@ class MultiCoreTaskQueue : public TaskQueue { } void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) override { + taskgroup::TGPTEntityPtr entity) override { LOG(FATAL) << "update_tg_cpu_share not implemented"; } @@ -185,29 +185,29 @@ class TaskGroupTaskQueue : public TaskQueue { void update_statistics(PipelineTask* task, int64_t time_spent) override; void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) override; + taskgroup::TGPTEntityPtr entity) override; private: template Status _push_back(PipelineTask* task); template - void _enqueue_task_group(taskgroup::TGEntityPtr); - void _dequeue_task_group(taskgroup::TGEntityPtr); - taskgroup::TGEntityPtr _next_tg_entity(); - int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const; + void _enqueue_task_group(taskgroup::TGPTEntityPtr); + void _dequeue_task_group(taskgroup::TGPTEntityPtr); + taskgroup::TGPTEntityPtr _next_tg_entity(); + uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const; void _update_min_tg(); // Like cfs rb tree in sched_entity struct TaskGroupSchedEntityComparator { - bool operator()(const taskgroup::TGEntityPtr&, const taskgroup::TGEntityPtr&) const; + bool operator()(const taskgroup::TGPTEntityPtr&, const taskgroup::TGPTEntityPtr&) const; }; - using ResouceGroupSet = std::set; + using ResouceGroupSet = std::set; ResouceGroupSet _group_entities; std::condition_variable _wait_task; std::mutex _rs_mutex; bool _closed = false; int _total_cpu_share = 0; - std::atomic _min_tg_entity = nullptr; + std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; }; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index b83a51ecde23d9..4298efd01dbd0e 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -112,6 +112,7 @@ void BlockedTaskScheduler::_schedule() { if (state == PipelineTaskState::PENDING_FINISH) { // should cancel or should finish if (task->is_pending_finish()) { + VLOG_DEBUG << "Task pending" << task->debug_string(); iter++; } else { _make_task_run(local_blocked_tasks, iter, PipelineTaskState::PENDING_FINISH); @@ -365,9 +366,4 @@ void TaskScheduler::shutdown() { } } -void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group) { - _task_queue->update_tg_cpu_share(task_group_info, task_group); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index ad69e10d8b0f8a..ac9389c0887d1a 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -91,8 +91,7 @@ class TaskScheduler { void shutdown(); - void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, - taskgroup::TaskGroupPtr task_group); + TaskQueue* task_queue() const { return _task_queue.get(); } private: std::unique_ptr _fix_thread_pool; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index afc64f65f4b6f2..2032d7883923e7 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -39,6 
+39,9 @@ class ScannerScheduler; namespace pipeline { class TaskScheduler; } +namespace taskgroup { +class TaskGroupManager; +} class BfdParser; class BrokerMgr; template @@ -108,6 +111,7 @@ class ExecEnv { pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _pipeline_task_group_scheduler; } + taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } // using template to simplify client cache management template @@ -233,6 +237,7 @@ class ExecEnv { FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr; pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr; + taskgroup::TaskGroupManager* _task_group_manager = nullptr; ResultCache* _result_cache = nullptr; TMasterInfo* _master_info = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a4e1479fc4eeff..d0fab65a75fac4 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -65,6 +65,7 @@ #include "runtime/small_file_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" +#include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" #include "service/point_query_executor.h" #include "util/bfd_parser.h" @@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector& store_paths) { .build(&_join_node_thread_pool); RETURN_IF_ERROR(init_pipeline_task_scheduler()); + _task_group_manager = new taskgroup::TaskGroupManager(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, @@ -389,6 +391,7 @@ void ExecEnv::_destroy() { SAFE_DELETE(_load_path_mgr); SAFE_DELETE(_pipeline_task_scheduler); SAFE_DELETE(_pipeline_task_group_scheduler); + SAFE_DELETE(_task_group_manager); SAFE_DELETE(_fragment_mgr); SAFE_DELETE(_broker_client_cache); SAFE_DELETE(_frontend_client_cache); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 37588f36616dfc..7e7330a43ecd21 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -736,7 +736,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0], &task_group_info); if (status.ok()) { - auto tg = taskgroup::TaskGroupManager::instance()->get_or_create_task_group( + auto tg = _exec_env->task_group_manager()->get_or_create_task_group( task_group_info); tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker); query_ctx->set_task_group(tg); diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 37149ddfd8b116..7c3d8ff42b0f43 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -27,11 +27,14 @@ #include #include "common/logging.h" +#include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" #include "util/parse_util.h" +#include "vec/exec/scan/scan_task_queue.h" +#include "vec/exec/scan/scanner_scheduler.h" namespace doris { namespace taskgroup { @@ -40,50 +43,75 @@ const static std::string CPU_SHARE = "cpu_share"; const static std::string MEMORY_LIMIT = "memory_limit"; const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; -pipeline::PipelineTask* 
TaskGroupEntity::take() { - if (_queue.empty()) { - return nullptr; - } - auto task = _queue.front(); - _queue.pop(); - return task; +template +TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) + : _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) { + _task_queue = new QueueType(); +} + +template +TaskGroupEntity::~TaskGroupEntity() { + delete _task_queue; } -void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { - auto v_time = runtime_ns / _tg->cpu_share(); +template +QueueType* TaskGroupEntity::task_queue() { + return _task_queue; +} + +template +void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { + auto v_time = runtime_ns / _cpu_share; _vruntime_ns += v_time; } -void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) { +template +void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) { VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns; _vruntime_ns = vruntime_ns; } -void TaskGroupEntity::push_back(pipeline::PipelineTask* task) { - _queue.emplace(task); +template +size_t TaskGroupEntity::task_size() const { + return _task_queue->size(); } -uint64_t TaskGroupEntity::cpu_share() const { - return _tg->cpu_share(); +template +uint64_t TaskGroupEntity::cpu_share() const { + return _cpu_share; } -uint64_t TaskGroupEntity::task_group_id() const { +template +uint64_t TaskGroupEntity::task_group_id() const { return _tg->id(); } -std::string TaskGroupEntity::debug_string() const { - return fmt::format("TGE[id = {}, cpu_share = {}, task size: {}, v_time:{}ns]", _tg->id(), - cpu_share(), _queue.size(), _vruntime_ns); +template +void TaskGroupEntity::check_and_update_cpu_share(const TaskGroupInfo& tg_info) { + if (tg_info.version > _version) { + _cpu_share = tg_info.cpu_share; + _version = tg_info.version; + } } +template +std::string TaskGroupEntity::debug_string() const { + return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]", + _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns); +} + +template class TaskGroupEntity>; +template class TaskGroupEntity; + TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) : _id(tg_info.id), _name(tg_info.name), - _cpu_share(tg_info.cpu_share), + _version(tg_info.version), _memory_limit(tg_info.memory_limit), _enable_memory_overcommit(tg_info.enable_memory_overcommit), - _version(tg_info.version), - _task_entity(this), + _cpu_share(tg_info.cpu_share), + _task_entity(this, "pipeline task entity"), + _local_scan_entity(this, "local scan entity"), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} std::string TaskGroup::debug_string() const { @@ -105,22 +133,22 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { return; } } - - std::lock_guard wl {_mutex}; - if (tg_info.version > _version) { - _name = tg_info.name; - _version = tg_info.version; - _memory_limit = tg_info.memory_limit; - _enable_memory_overcommit = tg_info.enable_memory_overcommit; - if (_cpu_share != tg_info.cpu_share) { - ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share( - tg_info, shared_from_this()); + { + std::lock_guard wl {_mutex}; + if (tg_info.version > _version) { + _name = tg_info.name; + _version = tg_info.version; + _memory_limit = tg_info.memory_limit; + _enable_memory_overcommit = tg_info.enable_memory_overcommit; + _cpu_share = tg_info.cpu_share; + } else { + return; } } -} - -void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) { - _cpu_share = tg_info.cpu_share; + 
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share( + tg_info, &_task_entity); + ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share( + tg_info, &_local_scan_entity); } int64_t TaskGroup::memory_used() { diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index c96664fc532166..cd547c9c7e9324 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -31,31 +31,34 @@ namespace doris { -namespace pipeline { -class PipelineTask; -} - class TPipelineWorkloadGroup; class MemTrackerLimiter; +namespace pipeline { +class PipelineTask; +} // namespace pipeline + namespace taskgroup { class TaskGroup; struct TaskGroupInfo; +class ScanTaskQueue; +template class TaskGroupEntity { public: - explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {} - void push_back(pipeline::PipelineTask* task); + explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type); + ~TaskGroupEntity(); + uint64_t vruntime_ns() const { return _vruntime_ns; } - pipeline::PipelineTask* take(); + QueueType* task_queue(); void incr_runtime_ns(uint64_t runtime_ns); void adjust_vruntime_ns(uint64_t vruntime_ns); - size_t task_size() const { return _queue.size(); } + size_t task_size() const; uint64_t cpu_share() const; @@ -63,14 +66,29 @@ class TaskGroupEntity { uint64_t task_group_id() const; + void check_and_update_cpu_share(const TaskGroupInfo& tg_info); + private: - // TODO pipeline use MLFQ - std::queue _queue; - taskgroup::TaskGroup* _tg; + QueueType* _task_queue; + uint64_t _vruntime_ns = 0; + taskgroup::TaskGroup* _tg; + + std::string _type; + + // Because updating cpu share of entity requires locking the task queue(pipeline task queue or + // scan task queue) contains that entity, we kept version and cpu share in entity for + // independent updates. 
+ int64_t _version; + uint64_t _cpu_share; }; -using TGEntityPtr = TaskGroupEntity*; +// TODO llj tg use PriorityTaskQueue to replace std::queue +using TaskGroupPipelineTaskEntity = TaskGroupEntity>; +using TGPTEntityPtr = TaskGroupPipelineTaskEntity*; + +using TaskGroupScanTaskEntity = TaskGroupEntity; +using TGSTEntityPtr = TaskGroupScanTaskEntity*; struct TgTrackerLimiterGroup { std::unordered_set> trackers; @@ -81,12 +99,17 @@ class TaskGroup : public std::enable_shared_from_this { public: explicit TaskGroup(const TaskGroupInfo& tg_info); - TaskGroupEntity* task_entity() { return &_task_entity; } + TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; } + TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; } + + int64_t version() const { return _version; } uint64_t cpu_share() const { return _cpu_share.load(); } uint64_t id() const { return _id; } + std::string name() const { return _name; }; + bool enable_memory_overcommit() const { std::shared_lock r_lock(_mutex); return _enable_memory_overcommit; @@ -103,8 +126,6 @@ class TaskGroup : public std::enable_shared_from_this { void check_and_update(const TaskGroupInfo& tg_info); - void update_cpu_share_unlock(const TaskGroupInfo& tg_info); - void add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr); void remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr); @@ -119,12 +140,12 @@ class TaskGroup : public std::enable_shared_from_this { mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit const uint64_t _id; std::string _name; - std::atomic _cpu_share; + int64_t _version; int64_t _memory_limit; // bytes bool _enable_memory_overcommit; - int64_t _version; - TaskGroupEntity _task_entity; - + std::atomic _cpu_share; + TaskGroupPipelineTaskEntity _task_entity; + TaskGroupScanTaskEntity _local_scan_entity; std::vector _mem_tracker_limiter_pool; }; diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 552ab2c0a92e4b..6ce6d3160480eb 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -20,19 +20,17 @@ #include #include +#include "pipeline/task_scheduler.h" +#include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/task_group/task_group.h" +#include "vec/exec/scan/scanner_scheduler.h" namespace doris::taskgroup { TaskGroupManager::TaskGroupManager() = default; TaskGroupManager::~TaskGroupManager() = default; -TaskGroupManager* TaskGroupManager::instance() { - static TaskGroupManager tgm; - return &tgm; -} - TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { { std::shared_lock r_lock(_group_mutex); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 0b7472438c302b..375208dc6e595e 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -23,13 +23,14 @@ #include "task_group.h" -namespace doris::taskgroup { +namespace doris { +class ExecEnv; +namespace taskgroup { class TaskGroupManager { public: TaskGroupManager(); ~TaskGroupManager(); - static TaskGroupManager* instance(); TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); @@ -41,4 +42,5 @@ class TaskGroupManager { std::unordered_map _task_groups; }; -} // namespace doris::taskgroup +} // namespace taskgroup +} // namespace doris diff --git a/be/src/util/mem_info.cpp 
b/be/src/util/mem_info.cpp index 9250101e67d25c..38f9039816c095 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -235,7 +235,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { std::unique_ptr tg_profile = std::make_unique("WorkloadGroup"); int64_t total_free_memory = 0; - taskgroup::TaskGroupManager::instance()->get_resource_groups( + ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { return !task_group->enable_memory_overcommit(); }, @@ -277,7 +277,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, MonotonicStopWatch watch; watch.start(); std::vector task_groups; - taskgroup::TaskGroupManager::instance()->get_resource_groups( + ExecEnv::GetInstance()->task_group_manager()->get_resource_groups( [](const taskgroup::TaskGroupPtr& task_group) { return task_group->enable_memory_overcommit(); }, diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp new file mode 100644 index 00000000000000..538f77211c3aa3 --- /dev/null +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "scan_task_queue.h" + +#include "pipeline/pipeline_task.h" +#include "runtime/task_group/task_group.h" +#include "vec/exec/scan/scanner_context.h" + +namespace doris { +namespace taskgroup { +static void empty_function() {} +ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {} + +ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, + int priority) + : scan_func(std::move(scan_func)), scanner_context(scanner_context), priority(priority) {} + +ScanTaskQueue::ScanTaskQueue() : _queue(config::doris_scanner_thread_pool_queue_size) {} + +Status ScanTaskQueue::try_push_back(ScanTask scan_task) { + if (_queue.try_put(std::move(scan_task))) { + VLOG_DEBUG << "try_push_back scan task " << scan_task.scanner_context->ctx_id << " " + << scan_task.priority; + return Status::OK(); + } else { + return Status::InternalError("failed to submit scan task to ScanTaskQueue"); + } +} +bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) { + auto r = _queue.blocking_get(scan_task, timeout_ms); + if (r) { + VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " " + << scan_task->priority; + } + return r; +} + +ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {} +ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default; + +void ScanTaskTaskGroupQueue::close() { + std::unique_lock lock(_rs_mutex); + _closed = true; + _wait_task.notify_all(); +} + +bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { + std::unique_lock lock(_rs_mutex); + taskgroup::TGSTEntityPtr entity = nullptr; + while (entity == nullptr) { + if (_closed) { + return false; + } + if (_group_entities.empty()) { + _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5)); + } else { + entity = _next_tg_entity(); + if (!entity) { + _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS)); + } + } + } + DCHECK(entity->task_size() > 0); + if (entity->task_size() == 1) { + _dequeue_task_group(entity); + } + return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); +} + +bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { + auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); + std::unique_lock lock(_rs_mutex); + auto status = entity->task_queue()->try_push_back(scan_task); + if (!status.ok()) { + LOG(WARNING) << "try_push_back scan task fail: " << status; + return false; + } + if (_group_entities.find(entity) == _group_entities.end()) { + _enqueue_task_group(entity); + } + _wait_task.notify_one(); + return true; +} + +void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) { + auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity(); + std::unique_lock lock(_rs_mutex); + auto find_entity = _group_entities.find(entity); + bool is_in_queue = find_entity != _group_entities.end(); + VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string() + << ", in queue:" << is_in_queue << ", time_spent: " << time_spent; + if (is_in_queue) { + _group_entities.erase(entity); + } + entity->incr_runtime_ns(time_spent); + if (is_in_queue) { + _group_entities.emplace(entity); + _update_min_tg(); + } +} + +void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, + taskgroup::TGSTEntityPtr entity) { + std::unique_lock lock(_rs_mutex); + bool is_in_queue = _group_entities.find(entity) != 
_group_entities.end(); + if (is_in_queue) { + _group_entities.erase(entity); + _total_cpu_share -= entity->cpu_share(); + } + entity->check_and_update_cpu_share(task_group_info); + if (is_in_queue) { + _group_entities.emplace(entity); + _total_cpu_share += entity->cpu_share(); + } +} + +void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) { + _total_cpu_share += tg_entity->cpu_share(); + // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns` + // should be avoided. + /** + * If a task group entity leaves task queue for a long time, its v runtime will be very + * small. This can cause it to preempt too many execution time. So, in order to avoid this + * situation, it is necessary to adjust the task group's v runtime. + * */ + auto old_v_ns = tg_entity->vruntime_ns(); + auto* min_entity = _min_tg_entity.load(); + if (min_entity) { + auto min_tg_v = min_entity->vruntime_ns(); + auto ideal_r = _ideal_runtime_ns(tg_entity) / 2; + uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v; + if (new_vruntime_ns > old_v_ns) { + VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << new_vruntime_ns; + tg_entity->adjust_vruntime_ns(new_vruntime_ns); + } + } else if (old_v_ns < _min_tg_v_runtime_ns) { + VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << _min_tg_v_runtime_ns; + tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns); + } + _group_entities.emplace(tg_entity); + VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string() + << ", group entity size: " << _group_entities.size(); + _update_min_tg(); +} + +void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) { + _total_cpu_share -= tg_entity->cpu_share(); + _group_entities.erase(tg_entity); + VLOG_DEBUG << "scan task group queue dequeue tg " << tg_entity->debug_string() + << ", group entity size: " << _group_entities.size(); + _update_min_tg(); +} + +TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() { + taskgroup::TGSTEntityPtr res = nullptr; + for (auto* entity : _group_entities) { + res = entity; + break; + } + return res; +} + +uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) const { + // Scan task does not have time slice, so we use pipeline task's instead. 
+ return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / + _total_cpu_share; +} + +void ScanTaskTaskGroupQueue::_update_min_tg() { + auto* min_entity = _next_tg_entity(); + _min_tg_entity = min_entity; + if (min_entity) { + auto min_v_runtime = min_entity->vruntime_ns(); + if (min_v_runtime > _min_tg_v_runtime_ns) { + _min_tg_v_runtime_ns = min_v_runtime; + } + } +} + +bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()( + const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const { + auto lhs_val = lhs_ptr->vruntime_ns(); + auto rhs_val = rhs_ptr->vruntime_ns(); + if (lhs_val != rhs_val) { + return lhs_val < rhs_val; + } else { + auto l_share = lhs_ptr->cpu_share(); + auto r_share = rhs_ptr->cpu_share(); + if (l_share != r_share) { + return l_share < r_share; + } else { + return lhs_ptr->task_group_id() < rhs_ptr->task_group_id(); + } + } +} + +} // namespace taskgroup +} // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h new file mode 100644 index 00000000000000..f3c3b792a48b2d --- /dev/null +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include "olap/tablet.h" +#include "runtime/task_group/task_group.h" +#include "util/blocking_priority_queue.hpp" + +namespace doris { +namespace vectorized { +class ScannerContext; +}; + +namespace taskgroup { + +using WorkFunction = std::function; +static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; + +// Like PriorityThreadPool::Task +struct ScanTask { + ScanTask(); + ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, int priority); + bool operator<(const ScanTask& o) const { return priority < o.priority; } + ScanTask& operator++() { + priority += 2; + return *this; + } + + WorkFunction scan_func; + vectorized::ScannerContext* scanner_context; + int priority; +}; + +// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly? 
+class ScanTaskQueue { +public: + ScanTaskQueue(); + Status try_push_back(ScanTask); + bool try_get(ScanTask* scan_task, uint32_t timeout_ms); + int size() { return _queue.get_size(); } + +private: + BlockingPriorityQueue _queue; +}; + +// Like TaskGroupTaskQueue +class ScanTaskTaskGroupQueue { +public: + explicit ScanTaskTaskGroupQueue(size_t core_size); + ~ScanTaskTaskGroupQueue(); + + void close(); + bool take(ScanTask* scan_task); + bool push_back(ScanTask); + + void update_statistics(ScanTask task, int64_t time_spent); + + void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr); + +private: + TGSTEntityPtr _task_entity(ScanTask& scan_task); + void _enqueue_task_group(TGSTEntityPtr); + void _dequeue_task_group(TGSTEntityPtr); + TGSTEntityPtr _next_tg_entity(); + uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const; + void _update_min_tg(); + + // Like cfs rb tree in sched_entity + struct TaskGroupSchedEntityComparator { + bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const; + }; + using ResouceGroupSet = std::set; + ResouceGroupSet _group_entities; + std::condition_variable _wait_task; + std::mutex _rs_mutex; + bool _closed = false; + int _total_cpu_share = 0; + std::atomic _min_tg_entity = nullptr; + uint64_t _min_tg_v_runtime_ns = 0; + size_t _core_size; +}; + +} // namespace taskgroup +} // namespace doris diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 478d9fb4cb71cd..27ed7509987e36 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -261,8 +261,10 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* if (state->enable_profile()) { std::stringstream scanner_statistics; std::stringstream scanner_rows_read; + std::stringstream scanner_wait_worker_time; scanner_statistics << "["; scanner_rows_read << "["; + scanner_wait_worker_time << "["; for (auto finished_scanner_time : _finished_scanner_runtime) { scanner_statistics << PrettyPrinter::print(finished_scanner_time, TUnit::TIME_NS) << ", "; @@ -270,6 +272,10 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* for (auto finished_scanner_rows : _finished_scanner_rows_read) { scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", "; } + for (auto finished_scanner_wait_time : _finished_scanner_wait_worker_time) { + scanner_wait_worker_time + << PrettyPrinter::print(finished_scanner_wait_time, TUnit::TIME_NS) << ", "; + } // Only unfinished scanners here for (auto& scanner : _scanners) { // Scanners are in ObjPool in ScanNode, @@ -279,11 +285,18 @@ Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* << ", "; scanner_rows_read << PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT) << ", "; + scanner_wait_worker_time + << PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(), + TUnit::TIME_NS) + << ", "; } scanner_statistics << "]"; scanner_rows_read << "]"; + scanner_wait_worker_time << "]"; node->_scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str()); node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str()); + node->_scanner_profile->add_info_string("PerScannerWaitTime", + scanner_wait_worker_time.str()); } // Only unfinished scanners here for (auto& scanner : _scanners) { @@ -397,6 +410,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current if 
(scanner->need_to_close()) { _finished_scanner_runtime.push_back(scanner->get_time_cost_ns()); _finished_scanner_rows_read.push_back(scanner->get_rows_read()); + _finished_scanner_wait_worker_time.push_back( + scanner->get_scanner_wait_worker_timer()); scanner->close(_state); } else { current_run->push_back(scanner); @@ -406,4 +421,8 @@ void ScannerContext::get_next_batch_of_scanners(std::list* current } } +taskgroup::TaskGroup* ScannerContext::get_task_group() const { + return _state->get_query_ctx()->get_task_group(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 3aad0d6263fa23..c2c8612f861992 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -41,6 +41,10 @@ class ThreadPoolToken; class RuntimeState; class TupleDescriptor; +namespace taskgroup { +class TaskGroup; +} // namespace taskgroup + namespace vectorized { class VScanner; @@ -149,6 +153,7 @@ class ScannerContext { } return thread_slot_num; } + taskgroup::TaskGroup* get_task_group() const; void reschedule_scanner_ctx(); @@ -241,6 +246,7 @@ class ScannerContext { std::list _scanners; std::vector _finished_scanner_runtime; std::vector _finished_scanner_rows_read; + std::vector _finished_scanner_wait_worker_time; const int _num_parallel_instances; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index f63b04692fdc47..b1c783a8520ab3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -36,6 +36,7 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" +#include "scan_task_queue.h" #include "util/async_io.h" // IWYU pragma: keep #include "util/blocking_queue.hpp" #include "util/cpu_info.h" @@ -80,6 +81,7 @@ ScannerScheduler::~ScannerScheduler() { _local_scan_thread_pool->shutdown(); _remote_scan_thread_pool->shutdown(); _limited_scan_thread_pool->shutdown(); + _group_local_scan_thread_pool->shutdown(); _scheduler_pool->wait(); _local_scan_thread_pool->join(); @@ -88,6 +90,9 @@ ScannerScheduler::~ScannerScheduler() { delete _pending_queues[i]; } delete[] _pending_queues; + + _task_group_local_scan_queue->close(); + _group_local_scan_thread_pool->wait(); } Status ScannerScheduler::init(ExecEnv* env) { @@ -127,6 +132,19 @@ Status ScannerScheduler::init(ExecEnv* env) { _register_metrics(); + // 5. 
task group local scan + _task_group_local_scan_queue = std::make_unique( + config::doris_scanner_thread_pool_thread_num); + ThreadPoolBuilder("local_scan_group") + .set_min_threads(config::doris_scanner_thread_pool_thread_num) + .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .build(&_group_local_scan_thread_pool); + for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) { + _group_local_scan_thread_pool->submit_func([this] { + this->_task_group_scanner_scan(this, _task_group_local_scan_queue.get()); + }); + } + _is_init = true; return Status::OK(); } @@ -219,12 +237,20 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { TabletStorageType type = (*iter)->get_storage_type(); bool ret = false; if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - task.priority = nice; - ret = _local_scan_thread_pool->offer(task); + if (ctx->get_task_group() && config::enable_workload_group_for_scan) { + auto work_func = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + taskgroup::ScanTask scan_task = {work_func, ctx, nice}; + ret = _task_group_local_scan_queue->push_back(scan_task); + } else { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + task.priority = nice; + ret = _local_scan_thread_pool->offer(task); + } } else { ret = _remote_scan_thread_pool->submit_func([this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); @@ -438,4 +464,20 @@ void ScannerScheduler::_deregister_metrics() { DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num); } +void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, + taskgroup::ScanTaskTaskGroupQueue* scan_queue) { + while (!_is_closed) { + taskgroup::ScanTask scan_task; + auto success = scan_queue->take(&scan_task); + if (success) { + int64_t time_spent = 0; + { + SCOPED_RAW_TIMER(&time_spent); + scan_task.scan_func(); + } + scan_queue->update_statistics(scan_task, time_spent); + } + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index df81fcf8b47253..87a9498e77c980 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -21,6 +21,7 @@ #include #include "common/status.h" +#include "scan_task_queue.h" #include "util/threadpool.h" #include "vec/exec/scan/vscanner.h" @@ -30,6 +31,9 @@ class ExecEnv; namespace vectorized { class VScanner; } // namespace vectorized +namespace taskgroup { +class ScanTaskTaskGroupQueue; +} template class BlockingQueue; } // namespace doris @@ -66,6 +70,9 @@ class ScannerScheduler { std::unique_ptr new_limited_scan_pool_token(ThreadPool::ExecutionMode mode, int max_concurrency); + taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() { + return _task_group_local_scan_queue.get(); + } int remote_thread_pool_max_size() const { return _remote_thread_pool_max_size; } @@ -81,6 +88,9 @@ class ScannerScheduler { static void _deregister_metrics(); + void _task_group_scanner_scan(ScannerScheduler* scheduler, + taskgroup::ScanTaskTaskGroupQueue* scan_queue); + // Scheduling queue number. // TODO: make it configurable. 
static const int QUEUE_NUM = 4; @@ -103,6 +113,9 @@ class ScannerScheduler { std::unique_ptr _remote_scan_thread_pool; std::unique_ptr _limited_scan_thread_pool; + std::unique_ptr _task_group_local_scan_queue; + std::unique_ptr _group_local_scan_thread_pool; + // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false; diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h index 2ee39979563c3b..c29aafafc04614 100644 --- a/be/src/vec/exec/scan/vscanner.h +++ b/be/src/vec/exec/scan/vscanner.h @@ -108,6 +108,8 @@ class VScanner { void update_wait_worker_timer() { _scanner_wait_worker_timer += _watch.elapsed_time(); } + int64_t get_scanner_wait_worker_timer() { return _scanner_wait_worker_timer; } + void update_scan_cpu_timer() { _scan_cpu_timer += _cpu_watch.elapsed_time(); } RuntimeState* runtime_state() { return _state; } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 142616f2a0a8af..f81545f3b909aa 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1581,7 +1581,7 @@ public class Config extends ConfigBase { public static boolean enable_pipeline_load = false; // enable_workload_group should be immutable and temporarily set to mutable during the development test phase - @ConfField(mutable = true, masterOnly = true, expType = ExperimentalType.EXPERIMENTAL) + @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL) public static boolean enable_workload_group = false; @ConfField(mutable = true) @@ -1590,9 +1590,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean disable_shared_scan = false; - @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL) - public static boolean enable_cpu_hard_limit = false; - @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min
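Note: both new fair queues (TaskGroupTaskQueue for pipeline tasks, ScanTaskTaskGroupQueue for scan tasks) share one CFS-style rule: an entity's virtual runtime grows as real runtime divided by its cpu_share, and a std::set ordered by (vruntime, cpu_share, task group id) stands in for the scheduler's red-black tree, so *begin() is always the next group to serve. The self-contained toy below reproduces that comparator and the erase/mutate/re-insert discipline update_statistics follows, since a std::set key must never change in place; the names are simplified stand-ins, not the patch's classes.

    #include <cstdint>
    #include <cstdio>
    #include <set>

    // Stand-in for TaskGroupEntity: a 1024-share group accrues vruntime half as
    // fast as a 512-share one, so over time it is picked twice as often.
    struct Entity {
        uint64_t id;
        uint64_t cpu_share;
        uint64_t vruntime_ns = 0;
        void incr_runtime_ns(uint64_t runtime_ns) { vruntime_ns += runtime_ns / cpu_share; }
    };

    // Mirrors TaskGroupSchedEntityComparator: vruntime first, then cpu_share,
    // then task group id as the final tie-break.
    struct ByVruntime {
        bool operator()(const Entity* a, const Entity* b) const {
            if (a->vruntime_ns != b->vruntime_ns) return a->vruntime_ns < b->vruntime_ns;
            if (a->cpu_share != b->cpu_share) return a->cpu_share < b->cpu_share;
            return a->id < b->id;
        }
    };

    int main() {
        Entity big{1, 1024}, small{2, 512};
        std::set<Entity*, ByVruntime> rq{&big, &small};
        for (int i = 0; i < 4; ++i) {
            Entity* next = *rq.begin();       // min-vruntime group runs next
            rq.erase(next);                   // leave the set before mutating the key,
            next->incr_runtime_ns(1'000'000); // as update_statistics does in the patch
            rq.insert(next);
            std::printf("ran group %llu, vruntime = %llu ns\n",
                        (unsigned long long)next->id, (unsigned long long)next->vruntime_ns);
        }
    }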
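Note: one subtle change repeated in both _enqueue_task_group implementations deserves a callout. vruntime is now unsigned, so the old signed expression min_entity->vruntime_ns() - _ideal_runtime_ns(tg_entity) / 2 could underflow; the patch guards the subtraction, and the adjustment still only ever moves a returning group's vruntime upward. A sketch of the resulting rule, using the constant the patch itself reuses for scans (a scan task has no time slice of its own, so _ideal_runtime_ns borrows PipelineTask::THREAD_TIME_SLICE):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>

    constexpr uint64_t THREAD_TIME_SLICE = 100'000'000ULL; // PipelineTask::THREAD_TIME_SLICE

    // Like sched_fair.c calc_delta_fair: the group's cpu_share-weighted slice
    // of THREAD_TIME_SLICE spread across all cores (_ideal_runtime_ns).
    uint64_t ideal_runtime_ns(std::size_t core_size, uint64_t cpu_share,
                              uint64_t total_cpu_share) {
        return THREAD_TIME_SLICE * core_size * cpu_share / total_cpu_share;
    }

    // A group that left the queue for a long time keeps a tiny vruntime and
    // would monopolize workers on return; pull it up to half an ideal slice
    // behind the current minimum. The ternary is the unsigned-underflow guard
    // the patch adds; std::max keeps the adjustment strictly upward, matching
    // the `if (new_vruntime_ns > old_v_ns)` check in the patch.
    uint64_t enqueue_vruntime_ns(uint64_t old_v, uint64_t min_v, uint64_t ideal) {
        const uint64_t half = ideal / 2;
        const uint64_t target = min_v > half ? min_v - half : min_v;
        return std::max(old_v, target);
    }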
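Note: ScanTask defines both operator< and operator++ because it lives in a BlockingPriorityQueue; the assumption here, suggested by the patch's own "Like PriorityThreadPool::Task" comment, is that the queue ages long-waiting entries by applying operator++ so low-priority scans cannot starve, with the +2 step taken straight from ScanTask. A toy model of that ordering with a plain std::priority_queue:

    #include <cstdio>
    #include <queue>
    #include <vector>

    // Minimal model of ScanTask's two ordering hooks.
    struct Task {
        int priority;
        bool operator<(const Task& o) const { return priority < o.priority; }
        Task& operator++() { // one aging round, same +2 step as ScanTask
            priority += 2;
            return *this;
        }
    };

    int main() {
        std::priority_queue<Task> q; // max-heap: highest priority pops first
        q.push({5});
        Task waiting{2};
        ++waiting; // two aging rounds while stuck behind
        ++waiting; // other work: 2 -> 4 -> 6
        q.push(waiting);
        std::printf("next priority = %d\n", q.top().priority); // 6: the aged task wins
    }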