Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the tasks in bkg pool may be called more frequently than expected #6910

Merged
merged 6 commits into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 87 additions & 85 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,58 @@ BackgroundProcessingPool::~BackgroundProcessingPool()
}
}

// Try to pop a task from the pool that is ready for execution.
//
// Selection rule: the task with the earliest scheduled time that is not removed and
// that is either not reserved by any thread or allows concurrent execution.
// - For task->multi == false, the task is handed to at most one execution thread at a time.
// - For task->multi == true, the task may be popped by several threads concurrently.
//
// The selected task's `concurrent_executors` counter is incremented here, under
// `tasks_mutex`, before any waiting happens. This reserves the task for the calling
// thread so that other threads skip a `multi == false` task while this thread is
// waiting for its scheduled time.
//
// Blocking behaviour:
// - If no task is eligible, waits on `wake_event` (sleep_seconds plus a random part)
//   and scans again.
// - If the selected task is not due yet, waits on `wake_event` once for the remaining
//   time plus a random part, then returns the task. The wait may end early when
//   `wake_event` is notified; the task is returned regardless.
//
// Returns nullptr only when the pool is shutting down. A non-null task may still be
// returned if shutdown started during the wait, so the caller should re-check `shutdown`.
BackgroundProcessingPool::TaskHandle BackgroundProcessingPool::tryPopTask(pcg64 & rng) noexcept
{
    TaskHandle task;
    Poco::Timestamp min_time;

    // Held for the whole function; only released inside the `wait_for` calls below.
    std::unique_lock lock(tasks_mutex);

    while (!task && !shutdown)
    {
        for (const auto & [task_time, task_handle] : tasks)
        {
            // find the first coming task that no thread is running
            // or can be run by multiple threads
            if (!task_handle->removed
                && (task_handle->concurrent_executors == 0 || task_handle->multi))
            {
                min_time = task_time;
                task = task_handle;
                // add the counter to indicate this task is being run by one more thread
                task->concurrent_executors += 1;
                break;
            }
        }

        if (!task)
        {
            /// No tasks ready for execution, wait for a while and check again
            wake_event.wait_for(lock,
                                std::chrono::duration<double>(
                                    sleep_seconds + std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
            continue;
        }

        Poco::Timestamp current_time;
        if (min_time > current_time)
        {
            // The coming task is not ready for execution yet, wait for a while.
            // The random part spreads out the wake-up times of the waiting threads.
            wake_event.wait_for(lock,
                                std::chrono::microseconds(
                                    min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
        }
        // here task != nullptr and has been reserved for this thread
        return task;
    }
    // Reached only when the loop exits without selecting a task, i.e. on shutdown.
    return task;
}

void BackgroundProcessingPool::threadFunction(size_t thread_idx) noexcept
{
{
const auto name = thread_prefix + std::to_string(thread_idx);
Expand All @@ -154,6 +204,7 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx)
addThreadId(getTid());
}

// set up the thread local memory tracker
auto memory_tracker = MemoryTracker::create();
memory_tracker->setNext(root_of_non_query_mem_trackers.get());
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
Expand All @@ -164,113 +215,64 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx)

while (!shutdown)
{
TaskHandle task;
// The time to sleep before running next task, `sleep_seconds` by default.
Poco::Timespan next_sleep_time_span(sleep_seconds, 0);
TaskHandle task = tryPopTask(rng);
if (shutdown)
break;
// not shutting down but a null task pop, should not happen
if (task == nullptr)
{
LOG_ERROR(Logger::get(), "a null task has been pop!");
continue;
}

bool done_work = false;
try
{
Poco::Timestamp min_time;

{
std::unique_lock lock(tasks_mutex);

if (!tasks.empty())
{
for (const auto & time_handle : tasks)
{
if (!time_handle.second->removed)
{
min_time = time_handle.first;
task = time_handle.second;
break;
}
}
}
}

if (shutdown)
break;

if (!task)
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(
sleep_seconds + std::uniform_real_distribution<double>(0, sleep_seconds_random_part)(rng)));
continue;
}

/// No tasks ready for execution.
Poco::Timestamp current_time;
if (min_time > current_time)
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, sleep_seconds_random_part * 1000000)(rng)));
}

std::shared_lock<std::shared_mutex> rlock(task->rwlock);

if (task->removed)
continue;

{
bool done_work = false;
if (!task->multi)
{
bool expected = false;
if (task->occupied == expected && task->occupied.compare_exchange_strong(expected, true))
{
done_work = task->function();
task->occupied = false;
}
else
done_work = false;
}
else
done_work = task->function();

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
if (done_work)
{
next_sleep_time_span = 0;
}
else if (task->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the later one is non-zero
next_sleep_time_span = Poco::Timespan(0, /*microseconds=*/task->interval_milliseconds * 1000);
}
// else `sleep_seconds` by default
}
done_work = task->function();
}
catch (...)
{
if (task && !task->multi)
{
std::unique_lock<std::shared_mutex> wlock(task->rwlock);
task->occupied = false;
}

tryLogCurrentException(__PRETTY_FUNCTION__);
}

if (shutdown)
break;

/// If task has done work, it could be executed again immediately.
/// If not, add delay before next run.
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;
// Get the time to sleep before the task in the next run.
// - If task has done work, it could be executed again immediately.
// - If not, add delay before next run.
const auto next_sleep_time_span = [](bool done_work, const TaskHandle & t) {
if (done_work)
{
return Poco::Timespan(0, 0);
}
else if (t->interval_milliseconds != 0)
{
// Update `next_sleep_time_span` by user-defined interval if the later one is non-zero
return Poco::Timespan(0, /*microseconds=*/t->interval_milliseconds * 1000);
}
else
{
// else `sleep_seconds` by default
return Poco::Timespan(sleep_seconds, 0);
}
}(done_work, task);

{
std::unique_lock lock(tasks_mutex);

// the task has been done in this thread
task->concurrent_executors -= 1;

if (task->removed)
continue;

// reschedule this task
tasks.erase(task->iterator);
Poco::Timestamp next_time_to_execute = Poco::Timestamp() + next_sleep_time_span;
task->iterator = tasks.emplace(next_time_to_execute, task);
}
}
Expand Down
36 changes: 21 additions & 15 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <list>
#include <map>
#include <mutex>
#include <pcg_random.hpp>
#include <set>
#include <shared_mutex>
#include <thread>
Expand All @@ -43,6 +44,7 @@ class BackgroundProcessingPool
{
public:
/// Returns true, if some useful work was done. In that case, thread will not sleep before next run of this task.
/// Returns false if no useful work was done. In that case, the next run of this task will be delayed.
using Task = std::function<bool()>;


Expand All @@ -69,10 +71,13 @@ class BackgroundProcessingPool
std::shared_mutex rwlock;
std::atomic<bool> removed{false};

/// only can be invoked by one thread at same time.
// multi=true, can be run by multiple threads concurrently
// multi=false, only run on one thread
const bool multi;
std::atomic_bool occupied{false};
// The number of worker threads that have picked up this task and are executing (or about to execute) it
size_t concurrent_executors = 0;

// User defined execution interval
const uint64_t interval_milliseconds;

std::multimap<Poco::Timestamp, std::shared_ptr<TaskInfo>>::iterator iterator;
Expand All @@ -85,17 +90,15 @@ class BackgroundProcessingPool

size_t getNumberOfThreads() const { return size; }

/// if multi == false, this task can only be called by one thread at same time.
/// If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// If interval_ms is not zero, this task will be scheduled with `interval_ms`.
///
/// But at each scheduled time, there may be multiple threads try to run the same task,
/// and then execute the same task one by one in sequential order(not simultaneously) even if `multi` is false.
/// For example, consider the following case when it's time to schedule a task,
/// 1. thread A get the task, mark the task as occupied and begin to execute it
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
/// task
/// - A function returning bool.
/// - Returning true means some useful work was done. In that case, the thread will not sleep before the next run of this task.
/// - Returning false means no useful work was done; the next run of this task will be delayed.
/// multi
/// - If multi == false, this task can only be executed by one thread within each scheduled time.
/// interval_ms
/// - If interval_ms is zero, this task will be scheduled with `sleep_seconds`.
/// - If interval_ms is not zero, this task will be scheduled with `interval_ms`.
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

Expand All @@ -104,6 +107,11 @@ class BackgroundProcessingPool
std::vector<pid_t> getThreadIds();
void addThreadId(pid_t tid);

private:
void threadFunction(size_t thread_idx) noexcept;

TaskHandle tryPopTask(pcg64 & rng) noexcept;

private:
using Tasks = std::multimap<Poco::Timestamp, TaskHandle>; /// key is desired next time to execute (priority).
using Threads = std::vector<std::thread>;
Expand All @@ -123,8 +131,6 @@ class BackgroundProcessingPool

std::atomic<bool> shutdown{false};
std::condition_variable wake_event;

void threadFunction(size_t thread_idx);
};

using BackgroundProcessingPoolPtr = std::shared_ptr<BackgroundProcessingPool>;
Expand Down
74 changes: 74 additions & 0 deletions dbms/src/Storages/tests/gtest_bkg_pool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Logger.h>
#include <Storages/BackgroundProcessingPool.h>
#include <common/logger_useful.h>
#include <gtest/gtest.h>

#include <chrono>
#include <limits>
#include <thread>

namespace DB::tests
{

// Checks that a task registered with `multi == false` and a fixed interval, which
// always reports "no work done", is invoked roughly once per interval and not more
// frequently than that.
//
// The test lets the pool run for `sleep_seconds`, counts the invocations and records
// the smallest / largest gap between two consecutive invocations.
TEST(BackgroundProcessingPoolTest, FixedInterval)
{
    BackgroundProcessingPool pool(10, "test");

    // Use a monotonic clock: the gaps measured below must not be affected by
    // wall-clock adjustments.
    using Clock = std::chrono::steady_clock;
    using TimePoint = std::chrono::time_point<Clock>;

    using namespace std::chrono_literals;
    const auto sleep_seconds = 10s;
    const Int64 expect_interval_ms = 2 * 1000;
    // Expected number of invocations within the observation window, derived from the
    // two constants above so that they cannot drift apart.
    const Int64 num_expect_called
        = std::chrono::duration_cast<std::chrono::milliseconds>(sleep_seconds).count() / expect_interval_ms;

    // NOTE(review): the variables below are written by the pool's worker thread and read
    // by this thread after `removeTask()`; presumably `removeTask()` synchronizes with a
    // running task — confirm.
    Int64 num_actual_called = 0;
    TimePoint last_update_timepoint = Clock::now();
    Int64 min_diff_ms = std::numeric_limits<Int64>::max();
    Int64 max_diff_ms = 0;
    auto task = pool.addTask(
        [&]() {
            num_actual_called += 1;
            // The first invocation has no predecessor, so there is no gap to measure.
            if (num_actual_called != 1)
            {
                const Int64 diff_ms = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - last_update_timepoint).count();
                if (diff_ms < expect_interval_ms / 2)
                {
                    LOG_ERROR(Logger::get(), "Unexpected frequent call, actual interval={}ms", diff_ms);
                }
                min_diff_ms = std::min(min_diff_ms, diff_ms);
                max_diff_ms = std::max(max_diff_ms, diff_ms);
            }

            last_update_timepoint = Clock::now();
            return false; // expected to be run in a fixed interval
        },
        /*multi*/ false,
        expect_interval_ms);

    std::this_thread::sleep_for(sleep_seconds);

    pool.removeTask(task);

    LOG_INFO(Logger::get(), "actual being called for {} times, min_diff={} max_diff={}", num_actual_called, min_diff_ms, max_diff_ms);

    // Allow one invocation more or less than expected to tolerate scheduling jitter.
    const auto failure_message = fmt::format("actual_called={} min_diff_ms={}", num_actual_called, min_diff_ms);
    ASSERT_GE(num_actual_called, num_expect_called - 1) << failure_message;
    ASSERT_LE(num_actual_called, num_expect_called + 1) << failure_message;
}

} // namespace DB::tests