Commit 9c3181d
Storage: Avoid wasting CPU with inefficient scheduling in SegmentReadTaskScheduler (#6496)

* Fix useless scheduling.

* ci

* format

* ci

* Refine

* add comment

* fix

Co-authored-by: JaySon <tshent@qq.com>
Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
3 people authored Jan 12, 2023
1 parent a795860 commit 9c3181d
Showing 7 changed files with 59 additions and 29 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -918,7 +918,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
         std::move(tasks),
         after_segment_read,
         req_info,
-        enable_read_thread);
+        enable_read_thread,
+        final_num_stream);
 
     BlockInputStreams res;
     for (size_t i = 0; i < final_num_stream; ++i)
@@ -1003,7 +1004,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
         std::move(tasks),
         after_segment_read,
         log_tracing_id,
-        enable_read_thread);
+        enable_read_thread,
+        final_num_stream);
 
     BlockInputStreams res;
     for (size_t i = 0; i < final_num_stream; ++i)
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
@@ -123,4 +123,17 @@ void MergedTaskPool::push(const MergedTaskPtr & t)
     std::lock_guard lock(mtx);
     merged_task_pool.push_back(t);
 }
+
+bool MergedTaskPool::has(UInt64 pool_id)
+{
+    std::lock_guard lock(mtx);
+    for (const auto & t : merged_task_pool)
+    {
+        if (t->containPool(pool_id))
+        {
+            return true;
+        }
+    }
+    return false;
+}
 } // namespace DB::DM
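
Editorial note: the new has() is a linear scan under the same mutex that guards push() and pop(), which is expected to stay cheap because the merged-task pool only holds merged tasks currently in flight. A minimal standalone sketch of the lookup pattern, with simplified stand-in types (the real MergedTask and containPool live in MergedTask.h):

#include <algorithm>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

// Simplified stand-ins for the real types, for illustration only.
struct MergedTask
{
    std::vector<uint64_t> pool_ids; // pools participating in this merged read
    bool containPool(uint64_t pool_id) const
    {
        return std::find(pool_ids.begin(), pool_ids.end(), pool_id) != pool_ids.end();
    }
};
using MergedTaskPtr = std::shared_ptr<MergedTask>;

class MergedTaskPool
{
public:
    // Membership test under the same mutex that guards push()/pop(),
    // so the scheduler sees a consistent snapshot.
    bool has(uint64_t pool_id)
    {
        std::lock_guard lock(mtx);
        for (const auto & t : merged_task_pool)
        {
            if (t->containPool(pool_id))
                return true;
        }
        return false;
    }

private:
    std::mutex mtx;
    std::list<MergedTaskPtr> merged_task_pool;
};
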
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
@@ -161,6 +161,7 @@ class MergedTaskPool
 
     MergedTaskPtr pop(uint64_t pool_id);
     void push(const MergedTaskPtr & t);
+    bool has(UInt64 pool_id);
 
 private:
     std::mutex mtx;
dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -118,15 +118,21 @@ SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<
     return pools;
 }
 
+bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & pool)
+{
+    return pool->getFreeBlockSlots() > 0 && // Block queue is not full and
+        (merged_task_pool.has(pool->poolId()) || // can schedule a segment from MergedTaskPool or
+         pool->getFreeActiveSegments() > 0); // schedule a new segment.
+}
+
 SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
 {
     int64_t pool_count = read_pools.size(); // All read task pool need to be scheduled, including invalid read task pool.
     for (int64_t i = 0; i < pool_count; i++)
     {
         auto pool = read_pools.next();
-        // If pool->getFreeBlockSlots() > 0, schedule it for read blocks.
         // If !pool->valid(), schedule it for clean MergedTaskPool.
-        if (pool != nullptr && (pool->getFreeBlockSlots() > 0 || !pool->valid()))
+        if (pool != nullptr && (needScheduleToRead(pool) || !pool->valid()))
         {
             return pool;
         }
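
Editorial note: the old condition wasted CPU because a pool whose block queue had free slots was selected even when it had nothing it could actually read, i.e. no free active segments and no partially-read merged task; the scheduler picked it, produced no work, and looped. A standalone sketch of the before/after predicate over a hypothetical pool snapshot (illustrative only, not the real classes):

#include <cstdint>

// Hypothetical snapshot of a pool's state, for illustration only.
struct PoolState
{
    int64_t free_block_slots; // block_slot_limit - pending blocks
    int64_t free_active_segments; // active_segment_limit - active segments
    bool has_merged_task; // MergedTaskPool::has(pool_id)
    bool valid; // pool still has live input streams
};

// Old condition: any pool with free block slots (or an invalid pool) was
// picked, even if it had no segment it could actually read.
bool oldNeedSchedule(const PoolState & p)
{
    return p.free_block_slots > 0 || !p.valid;
}

// New condition: free block slots AND either a partially-read merged task
// to resume or room to activate a new segment; invalid pools are still
// picked so their MergedTaskPool entries get cleaned up.
bool newNeedSchedule(const PoolState & p)
{
    bool need_read = p.free_block_slots > 0
        && (p.has_merged_task || p.free_active_segments > 0);
    return need_read || !p.valid;
}

int main()
{
    // A valid pool saturated on active segments with nothing left in the
    // merged-task pool: the old check selects it again and again, the new
    // check skips it until a segment finishes.
    PoolState p{/*free_block_slots=*/4, /*free_active_segments=*/0,
                /*has_merged_task=*/false, /*valid=*/true};
    return (oldNeedSchedule(p) && !newNeedSchedule(p)) ? 0 : 1;
}
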
dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -62,7 +62,7 @@ class SegmentReadTaskScheduler
     bool isStop() const;
     bool schedule();
     void schedLoop();
-
+    bool needScheduleToRead(const SegmentReadTaskPoolPtr & pool);
     SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);
     // <seg_id, pool_ids>
     std::optional<std::pair<uint64_t, std::vector<uint64_t>>> scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool);
30 changes: 13 additions & 17 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -205,7 +205,7 @@ std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator SegmentReadT
 {
     auto target = segments.end();
     std::lock_guard lock(mutex);
-    if (getFreeActiveSegmentCountUnlock() <= 0)
+    if (getFreeActiveSegmentsUnlock() <= 0)
     {
         return target;
     }
@@ -269,33 +269,29 @@ void SegmentReadTaskPool::pushBlock(Block && block)
     q.push(std::move(block), nullptr);
 }
 
-int64_t SegmentReadTaskPool::increaseUnorderedInputStreamRefCount()
+Int64 SegmentReadTaskPool::increaseUnorderedInputStreamRefCount()
 {
     return unordered_input_stream_ref_count.fetch_add(1, std::memory_order_relaxed);
 }
-int64_t SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount()
+Int64 SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount()
 {
     return unordered_input_stream_ref_count.fetch_sub(1, std::memory_order_relaxed);
 }
 
-int64_t SegmentReadTaskPool::getFreeBlockSlots() const
+Int64 SegmentReadTaskPool::getFreeBlockSlots() const
 {
-    auto block_slots = unordered_input_stream_ref_count.load(std::memory_order_relaxed);
-    if (block_slots < 3)
-    {
-        block_slots = 3;
-    }
-    return block_slots - blk_stat.pendingCount();
+    return block_slot_limit - blk_stat.pendingCount();
 }
 
-int64_t SegmentReadTaskPool::getFreeActiveSegmentCountUnlock()
+Int64 SegmentReadTaskPool::getFreeActiveSegments() const
 {
-    auto active_segment_limit = unordered_input_stream_ref_count.load(std::memory_order_relaxed);
-    if (active_segment_limit < 2)
-    {
-        active_segment_limit = 2;
-    }
-    return active_segment_limit - static_cast<int64_t>(active_segment_ids.size());
+    std::lock_guard lock(mutex);
+    return getFreeActiveSegmentsUnlock();
+}
+
+Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const
+{
+    return active_segment_limit - static_cast<Int64>(active_segment_ids.size());
 }
 
 bool SegmentReadTaskPool::exceptionHappened() const
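
Editorial note: the getFreeActiveSegments/getFreeActiveSegmentsUnlock pair follows the codebase's locked/unlocked convention: the public getter acquires the mutex, while the *Unlock variant assumes the caller already holds it, as the segment-selection path in the first hunk does. This is also why the header change below makes the mutex mutable. A minimal sketch of the pattern, under assumed simplified members:

#include <cstdint>
#include <mutex>
#include <unordered_set>

// Minimal model of the locked/unlocked getter split (simplified members).
class ActiveSegmentTracker
{
public:
    // Public getter: takes the mutex itself. The mutex must be `mutable`
    // for this to compile in a const member function, matching the
    // `mutable std::mutex mutex;` change in SegmentReadTaskPool.h below.
    int64_t freeActiveSegments() const
    {
        std::lock_guard lock(mutex);
        return freeActiveSegmentsUnlock();
    }

private:
    // *Unlock variant: assumes the caller already holds the mutex, so code
    // that is already inside a locked section can reuse the computation
    // without deadlocking.
    int64_t freeActiveSegmentsUnlock() const
    {
        return active_segment_limit - static_cast<int64_t>(active_segment_ids.size());
    }

    const int64_t active_segment_limit = 2;
    mutable std::mutex mutex;
    std::unordered_set<uint64_t> active_segment_ids;
};
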
26 changes: 19 additions & 7 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -168,7 +168,8 @@ class SegmentReadTaskPool : private boost::noncopyable
         SegmentReadTasks && tasks_,
         AfterSegmentRead after_segment_read_,
         const String & tracing_id,
-        bool enable_read_thread_)
+        bool enable_read_thread_,
+        Int64 num_streams_)
         : pool_id(nextPoolId())
         , table_id(table_id_)
         , dm_context(dm_context_)
@@ -183,6 +184,13 @@
         , unordered_input_stream_ref_count(0)
         , exception_happened(false)
         , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
+        // If the queue is too short, only 1 in the extreme case, it may cause the computation thread
+        // to encounter empty queues frequently, resulting in too much waiting and thread context
+        // switching, so we limit the lower limit to 3, which provides two blocks of buffer space.
+        , block_slot_limit(std::max(num_streams_, 3))
+        // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
+        // situations where the computation may be faster and the storage layer may not be able to keep up.
+        , active_segment_limit(std::max(num_streams_, 2))
     {}
 
     ~SegmentReadTaskPool()
@@ -223,9 +231,10 @@
         const std::unordered_map<uint64_t, std::vector<uint64_t>> & segments,
         uint64_t expected_merge_count);
 
-    int64_t increaseUnorderedInputStreamRefCount();
-    int64_t decreaseUnorderedInputStreamRefCount();
-    int64_t getFreeBlockSlots() const;
+    Int64 increaseUnorderedInputStreamRefCount();
+    Int64 decreaseUnorderedInputStreamRefCount();
+    Int64 getFreeBlockSlots() const;
+    Int64 getFreeActiveSegments() const;
     bool valid() const;
     void setException(const DB::Exception & e);
 
@@ -240,7 +249,7 @@
     }
 
 private:
-    int64_t getFreeActiveSegmentCountUnlock();
+    Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentPtr & seg);
     void pushBlock(Block && block);
@@ -255,13 +264,13 @@
     const ReadMode read_mode;
     SegmentReadTasksWrapper tasks_wrapper;
    AfterSegmentRead after_segment_read;
-    std::mutex mutex;
+    mutable std::mutex mutex;
     std::unordered_set<uint64_t> active_segment_ids;
     WorkQueue<Block> q;
     BlockStat blk_stat;
     LoggerPtr log;
 
-    std::atomic<int64_t> unordered_input_stream_ref_count;
+    std::atomic<Int64> unordered_input_stream_ref_count;
 
     std::atomic<bool> exception_happened;
     DB::Exception exception;
@@ -275,6 +284,9 @@
     // std::once_flag and std::call_once to prevent duplicated add.
     std::once_flag add_to_scheduler;
 
+    const Int64 block_slot_limit;
+    const Int64 active_segment_limit;
+
     inline static std::atomic<uint64_t> pool_id_gen{1};
     inline static BlockStat global_blk_stat;
     static uint64_t nextPoolId()
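
Editorial note: the two new limits replace values that were previously recomputed from the stream ref-count on every call. A standalone sketch of how they derive from the number of read streams and how free capacity is computed (it mirrors the initializers and getters above, with simplified names; this is a model, not the real class):

#include <algorithm>
#include <cstdint>

using Int64 = int64_t;

// How the two limits derive from the number of read streams.
struct PoolLimits
{
    Int64 block_slot_limit; // >= 3: one block being consumed plus two buffered
    Int64 active_segment_limit; // >= 2: keep a second segment in flight

    explicit PoolLimits(Int64 num_streams)
        : block_slot_limit(std::max<Int64>(num_streams, 3))
        , active_segment_limit(std::max<Int64>(num_streams, 2))
    {}

    // Free capacity is a fixed limit minus current usage, instead of being
    // recomputed from the stream ref-count on every call as before.
    Int64 freeBlockSlots(Int64 pending_blocks) const
    {
        return block_slot_limit - pending_blocks;
    }
    Int64 freeActiveSegments(Int64 active_segments) const
    {
        return active_segment_limit - active_segments;
    }
};

With a single read stream, for example, the pool still buffers up to three blocks and keeps two segments active, so the read threads can stay ahead of the consuming computation thread.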
