Commit
Pipeline: support io priority queue (#7722)
ref #6518
SeaRise authored Jul 3, 2023
1 parent 834389a commit f8943e7
Showing 31 changed files with 474 additions and 81 deletions.
10 changes: 6 additions & 4 deletions dbms/src/Flash/Pipeline/Exec/PipelineExec.cpp
@@ -33,7 +33,8 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
case (expect_status): \
break; \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
- case OperatorStatus::IO: \
+ case OperatorStatus::IO_IN: \
+ case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
@@ -50,7 +51,8 @@ extern const char random_pipeline_model_execute_suffix_failpoint[];
switch (op_status) \
{ \
/* For the io status, the operator needs to be filled in io_op for later use in executeIO. */ \
- case OperatorStatus::IO: \
+ case OperatorStatus::IO_IN: \
+ case OperatorStatus::IO_OUT: \
fillIOOp((op).get()); \
return (op_status); \
/* For the waiting status, the operator needs to be filled in awaitable for later use in await. */ \
@@ -160,7 +162,7 @@ OperatorStatus PipelineExec::executeIOImpl()
auto op_status = io_op->executeIO();
if (op_status == OperatorStatus::WAITING)
fillAwaitable(io_op);
- if (op_status != OperatorStatus::IO)
+ if (op_status != OperatorStatus::IO_IN && op_status != OperatorStatus::IO_OUT)
io_op = nullptr;
return op_status;
}
@@ -179,7 +181,7 @@ OperatorStatus PipelineExec::awaitImpl()
{
assert(awaitable);
auto op_status = awaitable->await();
- if (op_status == OperatorStatus::IO)
+ if (op_status == OperatorStatus::IO_IN || op_status == OperatorStatus::IO_OUT)
fillIOOp(awaitable);
if (op_status != OperatorStatus::WAITING)
awaitable = nullptr;
4 changes: 2 additions & 2 deletions dbms/src/Flash/Pipeline/Schedule/Events/tests/gtest_event.cpp
@@ -338,7 +338,7 @@ class TestPorfileTask : public EventTask
cpu_execute_time += per_execute_time;
return ExecTaskStatus::RUNNING;
}
- return ExecTaskStatus::IO;
+ return ExecTaskStatus::IO_IN;
}

ExecTaskStatus doExecuteIOImpl() override
@@ -347,7 +347,7 @@ class TestPorfileTask : public EventTask
{
std::this_thread::sleep_for(std::chrono::nanoseconds(per_execute_time));
io_execute_time += per_execute_time;
- return ExecTaskStatus::IO;
+ return ExecTaskStatus::IO_IN;
}
return ExecTaskStatus::WAITING;
}
3 changes: 2 additions & 1 deletion dbms/src/Flash/Pipeline/Schedule/Reactor/WaitReactor.cpp
@@ -44,7 +44,8 @@ bool WaitReactor::awaitAndCollectReadyTask(WaitingTask && task)
task_ptr->profile_info.elapsedAwaitTime();
cpu_tasks.push_back(std::move(task.first));
return true;
- case ExecTaskStatus::IO:
+ case ExecTaskStatus::IO_IN:
+ case ExecTaskStatus::IO_OUT:
task_ptr->profile_info.elapsedAwaitTime();
io_tasks.push_back(std::move(task.first));
return true;
@@ -32,7 +32,7 @@ class FIFOTaskQueue : public TaskQueue

bool take(TaskPtr & task) override;

- void updateStatistics(const TaskPtr &, size_t) override {}
+ void updateStatistics(const TaskPtr &, ExecTaskStatus, UInt64) override {}

bool empty() const override;

130 changes: 130 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.cpp
@@ -0,0 +1,130 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h>
#include <Flash/Pipeline/Schedule/Tasks/TaskHelper.h>
#include <common/likely.h>

namespace DB
{
IOPriorityQueue::~IOPriorityQueue()
{
    RUNTIME_ASSERT(io_in_task_queue.empty(), logger, "all tasks should be taken before the queue is destructed");
    RUNTIME_ASSERT(io_out_task_queue.empty(), logger, "all tasks should be taken before the queue is destructed");
}

bool IOPriorityQueue::take(TaskPtr & task)
{
    std::unique_lock lock(mu);
    while (true)
    {
        bool io_out_first = ratio_of_out_to_in * total_io_in_time_microsecond >= total_io_out_time_microsecond;
        auto & first_queue = io_out_first ? io_out_task_queue : io_in_task_queue;
        auto & next_queue = io_out_first ? io_in_task_queue : io_out_task_queue;
        if (!first_queue.empty())
        {
            task = std::move(first_queue.front());
            first_queue.pop_front();
            return true;
        }
        if (!next_queue.empty())
        {
            task = std::move(next_queue.front());
            next_queue.pop_front();
            return true;
        }
        if (unlikely(is_finished))
            return false;
        cv.wait(lock);
    }
}

void IOPriorityQueue::updateStatistics(const TaskPtr &, ExecTaskStatus exec_task_status, UInt64 inc_ns)
{
    switch (exec_task_status)
    {
    case ExecTaskStatus::IO_IN:
        total_io_in_time_microsecond += (inc_ns / 1000);
        break;
    case ExecTaskStatus::IO_OUT:
        total_io_out_time_microsecond += (inc_ns / 1000);
        break;
    default:; // ignore non-io status.
    }
}

bool IOPriorityQueue::empty() const
{
    std::lock_guard lock(mu);
    return io_out_task_queue.empty() && io_in_task_queue.empty();
}

void IOPriorityQueue::finish()
{
    {
        std::lock_guard lock(mu);
        is_finished = true;
    }
    cv.notify_all();
}

void IOPriorityQueue::submitTaskWithoutLock(TaskPtr && task)
{
    auto status = task->getStatus();
    switch (status)
    {
    case ExecTaskStatus::IO_IN:
        io_in_task_queue.push_back(std::move(task));
        break;
    case ExecTaskStatus::IO_OUT:
        io_out_task_queue.push_back(std::move(task));
        break;
    default:
        throw Exception(fmt::format("Unexpected status: {}, IOPriorityQueue only accepts tasks with IO status", magic_enum::enum_name(status)));
    }
}

void IOPriorityQueue::submit(TaskPtr && task)
{
    if unlikely (is_finished)
    {
        FINALIZE_TASK(task);
        return;
    }

    {
        std::lock_guard lock(mu);
        submitTaskWithoutLock(std::move(task));
    }
    cv.notify_one();
}

void IOPriorityQueue::submit(std::vector<TaskPtr> & tasks)
{
    if unlikely (is_finished)
    {
        FINALIZE_TASKS(tasks);
        return;
    }

    if (tasks.empty())
        return;
    std::lock_guard lock(mu);
    for (auto & task : tasks)
    {
        submitTaskWithoutLock(std::move(task));
        cv.notify_one();
    }
}
} // namespace DB
63 changes: 63 additions & 0 deletions dbms/src/Flash/Pipeline/Schedule/TaskQueues/IOPriorityQueue.h
@@ -0,0 +1,63 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h>

#include <deque>
#include <mutex>

namespace DB
{
/// The queue used only by the io thread pool.
/// In IOPriorityQueue, io_out has a higher priority than io_in, which means the ratio of the total execution time of io_out to io_in is `ratio_of_out_to_in`:1.
/// Because an IO_OUT task usually writes in-memory data to external storage and releases the occupied memory,
/// while an IO_IN task usually reads data from external storage into memory and occupies memory,
/// prioritizing the execution of IO_OUT tasks can effectively reduce memory usage.
class IOPriorityQueue : public TaskQueue
{
public:
    // The ratio of total execution time between io_out and io_in is 3:1.
    static constexpr size_t ratio_of_out_to_in = 3;

    ~IOPriorityQueue() override;

    void submit(TaskPtr && task) override;

    void submit(std::vector<TaskPtr> & tasks) override;

    bool take(TaskPtr & task) override;

    void updateStatistics(const TaskPtr &, ExecTaskStatus exec_task_status, UInt64 inc_ns) override;

    bool empty() const override;

    void finish() override;

private:
    void submitTaskWithoutLock(TaskPtr && task);

private:
    mutable std::mutex mu;
    std::condition_variable cv;
    std::atomic_bool is_finished = false;

    std::deque<TaskPtr> io_in_task_queue;
    std::atomic_uint64_t total_io_in_time_microsecond{0};

    std::deque<TaskPtr> io_out_task_queue;
    std::atomic_uint64_t total_io_out_time_microsecond{0};
};
} // namespace DB
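
To make the io_out-first rule in `IOPriorityQueue::take()` concrete, here is a minimal standalone sketch. It uses plain strings and an invented fixed per-task cost instead of the real `TaskPtr` type and the scheduler's measured times, so it only illustrates the selection behaviour under the assumption `ratio_of_out_to_in = 3`:

```cpp
#include <cstdint>
#include <deque>
#include <iostream>
#include <string>

// Toy stand-ins for the real io_in/io_out task queues; costs are invented for illustration.
int main()
{
    constexpr uint64_t ratio_of_out_to_in = 3;
    uint64_t total_io_in_us = 0, total_io_out_us = 0;

    std::deque<std::string> io_in_queue{"in#1", "in#2", "in#3"};
    std::deque<std::string> io_out_queue{"out#1", "out#2", "out#3"};
    constexpr uint64_t cost_us = 100; // pretend every task runs for 100us

    while (!io_in_queue.empty() || !io_out_queue.empty())
    {
        // Same condition as IOPriorityQueue::take(): prefer io_out until its
        // accumulated time exceeds ratio_of_out_to_in times the io_in time.
        bool io_out_first = ratio_of_out_to_in * total_io_in_us >= total_io_out_us;
        auto & first = io_out_first ? io_out_queue : io_in_queue;
        auto & next = io_out_first ? io_in_queue : io_out_queue;
        auto & chosen = !first.empty() ? first : next;

        std::cout << "take " << chosen.front() << '\n';
        // Account the simulated execution time, as updateStatistics() would.
        (&chosen == &io_out_queue ? total_io_out_us : total_io_in_us) += cost_us;
        chosen.pop_front();
    }
}
```

With equal per-task costs, io_out tasks are taken roughly three times as often as io_in tasks while both queues still have work, which matches the 3:1 time ratio described in the class comment.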
@@ -39,9 +39,9 @@ void UnitQueue::submit(TaskPtr && task)
task_queue.push_back(std::move(task));
}

- double UnitQueue::normalizedTime()
+ double UnitQueue::normalizedTimeMicrosecond()
{
- return accu_consume_time / info.factor_for_normal;
+ return accu_consume_time_microsecond / info.factor_for_normal;
}

template <typename TimeGetter>
Expand Down Expand Up @@ -142,7 +142,7 @@ bool MultiLevelFeedbackQueue<TimeGetter>::take(TaskPtr & task)
{
// -1 means no candidates; else has candidate.
int queue_idx = -1;
- double target_accu_time = 0;
+ double target_accu_time_microsecond = 0;
std::unique_lock lock(mu);
while (true)
{
@@ -153,10 +153,10 @@ bool MultiLevelFeedbackQueue<TimeGetter>::take(TaskPtr & task)
const auto & cur_queue = level_queues[i];
if (!cur_queue->empty())
{
- double local_target_time = cur_queue->normalizedTime();
- if (queue_idx < 0 || local_target_time < target_accu_time)
+ double local_target_time_microsecond = cur_queue->normalizedTimeMicrosecond();
+ if (queue_idx < 0 || local_target_time_microsecond < target_accu_time_microsecond)
{
- target_accu_time = local_target_time;
+ target_accu_time_microsecond = local_target_time_microsecond;
queue_idx = i;
}
}
@@ -176,10 +176,10 @@ bool MultiLevelFeedbackQueue<TimeGetter>::take(TaskPtr & task)
}

template <typename TimeGetter>
- void MultiLevelFeedbackQueue<TimeGetter>::updateStatistics(const TaskPtr & task, size_t inc_value)
+ void MultiLevelFeedbackQueue<TimeGetter>::updateStatistics(const TaskPtr & task, ExecTaskStatus, UInt64 inc_ns)
{
assert(task);
- level_queues[task->mlfq_level]->accu_consume_time += inc_value;
+ level_queues[task->mlfq_level]->accu_consume_time_microsecond += (inc_ns / 1000);
}

template <typename TimeGetter>
@@ -62,7 +62,7 @@ struct UnitQueueInfo
UInt64 time_slice;

// factor for normalization.
- // The priority value is equal to `accu_consume_time / factor_for_normal`.
+ // The priority value is equal to `accu_consume_time_microsecond / factor_for_normal`.
// The smaller the value, the higher the priority.
// Therefore, the higher the priority of the queue, the larger the value of factor_for_normal.
double factor_for_normal;
@@ -81,11 +81,11 @@ class UnitQueue

bool empty() const;

- double normalizedTime();
+ double normalizedTimeMicrosecond();

public:
const UnitQueueInfo info;
- std::atomic_uint64_t accu_consume_time{0};
+ std::atomic_uint64_t accu_consume_time_microsecond{0};

private:
std::deque<TaskPtr> task_queue;
@@ -106,7 +106,7 @@ class MultiLevelFeedbackQueue : public TaskQueue

bool take(TaskPtr & task) override;

- void updateStatistics(const TaskPtr & task, size_t inc_value) override;
+ void updateStatistics(const TaskPtr & task, ExecTaskStatus, UInt64 inc_ns) override;

bool empty() const override;

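
The `factor_for_normal` comment above describes how the MLFQ picks a level: the non-empty level with the smallest `accu_consume_time_microsecond / factor_for_normal` wins, and larger factors therefore mean higher priority. A small standalone sketch of that selection, using made-up factors and consumed times rather than the values MultiLevelFeedbackQueue actually computes:

```cpp
#include <array>
#include <cstdint>
#include <cstdio>

// Toy model of the MLFQ level selection: pick the non-empty level whose
// normalized time (accu_consume_time_microsecond / factor_for_normal) is smallest.
int main()
{
    struct Level
    {
        double factor_for_normal;      // larger factor => higher priority
        uint64_t accu_consume_time_us; // time already spent on this level
        bool empty;
    };
    // Example numbers only; the real factors come from the queue's configuration.
    std::array<Level, 3> levels{{{8.0, 4000, false}, {4.0, 3000, false}, {1.0, 500, true}}};

    int chosen = -1;
    double best = 0;
    for (int i = 0; i < static_cast<int>(levels.size()); ++i)
    {
        if (levels[i].empty)
            continue;
        double normalized = levels[i].accu_consume_time_us / levels[i].factor_for_normal;
        if (chosen < 0 || normalized < best)
        {
            best = normalized;
            chosen = i;
        }
    }
    std::printf("take from level %d\n", chosen); // level 0: 4000/8 = 500 < 3000/4 = 750
}
```

A level with a larger `factor_for_normal` can accumulate more consumed time before it loses to the other levels, which is how higher-priority levels receive a larger share of execution time.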
2 changes: 1 addition & 1 deletion dbms/src/Flash/Pipeline/Schedule/TaskQueues/TaskQueue.h
@@ -36,7 +36,7 @@ class TaskQueue

// Update the execution metrics of the task taken from the queue.
// Used to adjust the priority of tasks within a queue.
- virtual void updateStatistics(const TaskPtr & task, size_t inc_value) = 0;
+ virtual void updateStatistics(const TaskPtr & task, ExecTaskStatus exec_task_status, UInt64 inc_ns) = 0;

virtual bool empty() const = 0;

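
The new `updateStatistics` signature reports the status the task just ran under together with the elapsed time in nanoseconds. The real call site in the task thread pool is not part of this diff, so the following is only a hypothetical caller-side sketch with simplified types, showing how a scheduler might time one execution step and feed it back:

```cpp
#include <chrono>
#include <cstdint>
#include <cstdio>

// Simplified stand-ins; the real ExecTaskStatus and TaskQueue live in the TiFlash codebase.
enum class ExecTaskStatus { RUNNING, IO_IN, IO_OUT, WAITING };

struct TimingQueue
{
    uint64_t io_in_us = 0, io_out_us = 0;
    // Same shape as the new updateStatistics (the task pointer is omitted here).
    void updateStatistics(ExecTaskStatus status, uint64_t inc_ns)
    {
        if (status == ExecTaskStatus::IO_IN)
            io_in_us += inc_ns / 1000;
        else if (status == ExecTaskStatus::IO_OUT)
            io_out_us += inc_ns / 1000;
        // other statuses do not contribute to the io counters
    }
};

int main()
{
    TimingQueue queue;
    auto start = std::chrono::steady_clock::now();
    // ... run the task's IO_OUT step here ...
    auto inc_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                      std::chrono::steady_clock::now() - start)
                      .count();
    queue.updateStatistics(ExecTaskStatus::IO_OUT, static_cast<uint64_t>(inc_ns));
    std::printf("io_out time: %llu us\n", static_cast<unsigned long long>(queue.io_out_us));
}
```

Only IO_IN and IO_OUT contribute to IOPriorityQueue's counters; other statuses are ignored, as in the `updateStatistics` implementation shown earlier.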
@@ -22,5 +22,6 @@ enum class TaskQueueType
DEFAULT, // Determined internally by the task thread pool.
FIFO, // fifo queue
MLFQ, // multi-level feedback queue
+ IO_PRIORITY, // io priority queue
};
} // namespace DB
@@ -37,11 +37,11 @@ class IndexTask : public Task
};
} // namespace

- class FIFOTestRunner : public ::testing::Test
+ class TestFIFOTaskQueue : public ::testing::Test
{
};

- TEST_F(FIFOTestRunner, base)
+ TEST_F(TestFIFOTaskQueue, base)
try
{
FIFOTaskQueue queue;