Skip to content

Commit

Permalink
fix data race on MockLocalAdmissionController (#7966)
Browse files Browse the repository at this point in the history
close #7965
  • Loading branch information
SeaRise authored Aug 18, 2023
1 parent 49e21fe commit badfe24
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ class ResourceControlQueue
public:
ResourceControlQueue()
{
RUNTIME_CHECK_MSG(
LocalAdmissionController::global_instance != nullptr,
"LocalAdmissionController::global_instance has not been initialized yet.");
LocalAdmissionController::global_instance->registerRefillTokenCallback([&]() {
std::lock_guard lock(mu);
cv.notify_all();
});
}

~ResourceControlQueue() override { LocalAdmissionController::global_instance->stop(); }
~ResourceControlQueue() override { LocalAdmissionController::global_instance->unregisterRefillTokenCallback(); }

void submit(TaskPtr && task) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,8 @@ TEST_F(TestResourceControlQueue, TestBurstableDynamicTokenBucket)
// Test priority queue of ResourceControlQueue: Less priority value means higher priority.
TEST_F(TestResourceControlQueue, ResourceControlPriorityQueueTest)
{
setupNopLAC();

std::random_device dev;
std::mt19937 gen(dev());
std::uniform_int_distribution dist;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Flash/Pipeline/Schedule/Tasks/Task.h>
#include <Flash/Pipeline/Schedule/ThreadPool/TaskThreadPool.h>
#include <Flash/Pipeline/Schedule/ThreadPool/TaskThreadPoolImpl.h>
#include <Flash/ResourceControl/LocalAdmissionController.h>

namespace DB
{
Expand Down Expand Up @@ -72,6 +73,11 @@ class TaskScheduler
static std::unique_ptr<TaskScheduler> instance;

private:
// LAC needs to be initialized before initializing the thread pools,
// so LAC should be placed before cpu_task_thread_pool and io_task_thread_pool,
// as ResourceGroupQueue relies on LAC.
LocalAdmissionController local_admission_controller;

TaskThreadPool<CPUImpl> cpu_task_thread_pool;

TaskThreadPool<IOImpl> io_task_thread_pool;
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@

namespace DB
{
std::unique_ptr<MockLocalAdmissionController> LocalAdmissionController::global_instance = nullptr;

auto LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
LocalAdmissionController::LocalAdmissionController()
{
if (!global_instance)
global_instance = std::make_unique<MockLocalAdmissionController>();
}

LocalAdmissionController::~LocalAdmissionController()
{
global_instance.reset();
}
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,9 @@ class LocalAdmissionController final : private boost::noncopyable
static bool isRUExhausted(uint64_t priority) { return priority == std::numeric_limits<uint64_t>::max(); }

static std::unique_ptr<MockLocalAdmissionController> global_instance;

LocalAdmissionController();

~LocalAdmissionController();
};
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void MockLocalAdmissionController::refillTokenBucket()
}
}

std::lock_guard lock(call_back_mutex);
if (refill_token_callback)
refill_token_callback();
}
Expand Down
38 changes: 23 additions & 15 deletions dbms/src/Flash/ResourceControl/MockLocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

#pragma once

#include <Common/Exception.h>
#include <Common/ThreadManager.h>
#include <Flash/Executor/toRU.h>

#include <mutex>
#include <thread>
#include <unordered_map>

namespace DB
{
Expand Down Expand Up @@ -51,16 +54,28 @@ class MockLocalAdmissionController final : private boost::noncopyable
using GetPriorityFuncType = uint64_t (*)(const std::string &);
using IsResourceGroupThrottledFuncType = bool (*)(const std::string &);

void consumeResource(const std::string & name, double ru, uint64_t cpu_time_ns)
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_ns) const
{
consume_resource_func(name, ru, cpu_time_ns);
}

uint64_t getPriority(const std::string & name) { return get_priority_func(name); }
uint64_t getPriority(const std::string & name) const { return get_priority_func(name); }

bool isResourceGroupThrottled(const std::string & name) { return is_resource_group_throttled_func(name); }
bool isResourceGroupThrottled(const std::string & name) const { return is_resource_group_throttled_func(name); }

void registerRefillTokenCallback(const std::function<void()> & cb) { refill_token_callback = cb; }
void registerRefillTokenCallback(const std::function<void()> & cb)
{
std::lock_guard lock(call_back_mutex);
RUNTIME_CHECK_MSG(refill_token_callback == nullptr, "callback cannot be registered multiple times");
refill_token_callback = cb;
}

void unregisterRefillTokenCallback()
{
std::lock_guard lock(call_back_mutex);
RUNTIME_CHECK_MSG(refill_token_callback != nullptr, "callback cannot be nullptr before unregistering");
refill_token_callback = nullptr;
}

void stop()
{
Expand All @@ -76,15 +91,6 @@ class MockLocalAdmissionController final : private boost::noncopyable

void refillTokenBucket();

void resetAll()
{
resource_groups.clear();
consume_resource_func = nullptr;
get_priority_func = nullptr;
is_resource_group_throttled_func = nullptr;
max_ru_per_sec = 0;
}

std::string dump() const;

mutable std::mutex mu;
Expand All @@ -97,8 +103,10 @@ class MockLocalAdmissionController final : private boost::noncopyable

uint64_t max_ru_per_sec = 0;
bool stopped = false;
std::function<void()> refill_token_callback;

std::mutex call_back_mutex;
std::function<void()> refill_token_callback{nullptr};

std::thread refill_token_thread;
};

} // namespace DB

0 comments on commit badfe24

Please sign in to comment.