diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.h b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.h index eaae7ab7de4..a75730e2caa 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/ResourceControlQueue.h @@ -31,13 +31,16 @@ class ResourceControlQueue public: ResourceControlQueue() { + RUNTIME_CHECK_MSG( + LocalAdmissionController::global_instance != nullptr, + "LocalAdmissionController::global_instance has not been initialized yet."); LocalAdmissionController::global_instance->registerRefillTokenCallback([&]() { std::lock_guard lock(mu); cv.notify_all(); }); } - ~ResourceControlQueue() override { LocalAdmissionController::global_instance->stop(); } + ~ResourceControlQueue() override { LocalAdmissionController::global_instance->unregisterRefillTokenCallback(); } void submit(TaskPtr && task) override; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp index 375f6ef58cc..a1e6290d53f 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp +++ b/dbms/src/Flash/Pipeline/Schedule/TaskQueues/tests/gtest_resource_control_queue.cpp @@ -534,6 +534,8 @@ TEST_F(TestResourceControlQueue, TestBurstableDynamicTokenBucket) // Test priority queue of ResourceControlQueue: Less priority value means higher priority. TEST_F(TestResourceControlQueue, ResourceControlPriorityQueueTest) { + setupNopLAC(); + std::random_device dev; std::mt19937 gen(dev()); std::uniform_int_distribution dist; diff --git a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h index 482669a3c04..c5f0732cf41 100644 --- a/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h +++ b/dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h @@ -19,6 +19,7 @@ #include #include #include +#include namespace DB { @@ -72,6 +73,11 @@ class TaskScheduler static std::unique_ptr instance; private: + // LAC needs to be initialized before initializing the thread pools, + // so LAC should be placed before cpu_task_thread_pool and io_task_thread_pool, + // as ResourceGroupQueue relies on LAC. + LocalAdmissionController local_admission_controller; + TaskThreadPool cpu_task_thread_pool; TaskThreadPool io_task_thread_pool; diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 463de2ae7df..132c86bdb98 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -16,7 +16,16 @@ namespace DB { +std::unique_ptr LocalAdmissionController::global_instance = nullptr; -auto LocalAdmissionController::global_instance = std::make_unique(); +LocalAdmissionController::LocalAdmissionController() +{ + if (!global_instance) + global_instance = std::make_unique(); +} +LocalAdmissionController::~LocalAdmissionController() +{ + global_instance.reset(); +} } // namespace DB diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index c4e50ba7411..ad8d83eeeca 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -171,5 +171,9 @@ class LocalAdmissionController final : private boost::noncopyable static bool isRUExhausted(uint64_t priority) { return priority == std::numeric_limits::max(); } static std::unique_ptr global_instance; + + LocalAdmissionController(); + + ~LocalAdmissionController(); }; } // namespace DB diff --git a/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.cpp index ade72fb9708..fd81baa8cae 100644 --- a/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.cpp @@ -38,6 +38,7 @@ void MockLocalAdmissionController::refillTokenBucket() } } + std::lock_guard lock(call_back_mutex); if (refill_token_callback) refill_token_callback(); } diff --git a/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h b/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h index 9c1cab40da2..d8b1bd74e10 100644 --- a/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h @@ -14,10 +14,13 @@ #pragma once +#include #include #include +#include #include +#include namespace DB { @@ -51,16 +54,28 @@ class MockLocalAdmissionController final : private boost::noncopyable using GetPriorityFuncType = uint64_t (*)(const std::string &); using IsResourceGroupThrottledFuncType = bool (*)(const std::string &); - void consumeResource(const std::string & name, double ru, uint64_t cpu_time_ns) + void consumeResource(const std::string & name, double ru, uint64_t cpu_time_ns) const { consume_resource_func(name, ru, cpu_time_ns); } - uint64_t getPriority(const std::string & name) { return get_priority_func(name); } + uint64_t getPriority(const std::string & name) const { return get_priority_func(name); } - bool isResourceGroupThrottled(const std::string & name) { return is_resource_group_throttled_func(name); } + bool isResourceGroupThrottled(const std::string & name) const { return is_resource_group_throttled_func(name); } - void registerRefillTokenCallback(const std::function & cb) { refill_token_callback = cb; } + void registerRefillTokenCallback(const std::function & cb) + { + std::lock_guard lock(call_back_mutex); + RUNTIME_CHECK_MSG(refill_token_callback == nullptr, "callback cannot be registered multiple times"); + refill_token_callback = cb; + } + + void unregisterRefillTokenCallback() + { + std::lock_guard lock(call_back_mutex); + RUNTIME_CHECK_MSG(refill_token_callback != nullptr, "callback cannot be nullptr before unregistering"); + refill_token_callback = nullptr; + } void stop() { @@ -76,15 +91,6 @@ class MockLocalAdmissionController final : private boost::noncopyable void refillTokenBucket(); - void resetAll() - { - resource_groups.clear(); - consume_resource_func = nullptr; - get_priority_func = nullptr; - is_resource_group_throttled_func = nullptr; - max_ru_per_sec = 0; - } - std::string dump() const; mutable std::mutex mu; @@ -97,8 +103,10 @@ class MockLocalAdmissionController final : private boost::noncopyable uint64_t max_ru_per_sec = 0; bool stopped = false; - std::function refill_token_callback; + + std::mutex call_back_mutex; + std::function refill_token_callback{nullptr}; + std::thread refill_token_thread; }; - } // namespace DB