Storage: Avoid wasting CPU with inefficient scheduling in SegmentReadTaskScheduler (#6496) #6634

Merged

Changes from 7 commits
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -919,7 +919,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
         std::move(tasks),
         after_segment_read,
         req_info,
-        enable_read_thread);
+        enable_read_thread,
+        final_num_stream);

     BlockInputStreams res;
     if (enable_read_thread)
@@ -1006,7 +1007,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
         std::move(tasks),
         after_segment_read,
         log_tracing_id,
-        enable_read_thread);
+        enable_read_thread,
+        final_num_stream);

     BlockInputStreams res;
     if (enable_read_thread)
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
@@ -123,4 +123,17 @@ void MergedTaskPool::push(const MergedTaskPtr & t)
     std::lock_guard lock(mtx);
     merged_task_pool.push_back(t);
 }
+
+bool MergedTaskPool::has(UInt64 pool_id)
+{
+    std::lock_guard lock(mtx);
+    for (const auto & t : merged_task_pool)
+    {
+        if (t->containPool(pool_id))
+        {
+            return true;
+        }
+    }
+    return false;
+}
 } // namespace DB::DM
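
MergedTaskPool::has answers "does some queued merged task still reference this read pool?" with a linear scan under the pool mutex; the scheduler uses it below to decide whether a pool can make progress without activating a new segment. A self-contained sketch of the same idea with simplified stand-in types (the real MergedTask carries segment read state; containPool here is reduced to an id lookup):

#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>

// Stand-in for DB::DM::MergedTask: one merged task can serve several
// read pools, identified by pool id.
struct MergedTaskModel
{
    std::vector<std::uint64_t> pool_ids;

    bool containPool(std::uint64_t pool_id) const
    {
        return std::find(pool_ids.begin(), pool_ids.end(), pool_id) != pool_ids.end();
    }
};

// Stand-in for DB::DM::MergedTaskPool: has() is a linear scan under the
// same mutex that guards push()/pop().
class MergedTaskPoolModel
{
public:
    bool has(std::uint64_t pool_id)
    {
        std::lock_guard lock(mtx);
        for (const auto & t : merged_task_pool)
        {
            if (t->containPool(pool_id))
                return true;
        }
        return false;
    }

private:
    std::mutex mtx;
    std::vector<std::shared_ptr<MergedTaskModel>> merged_task_pool;
};

The scan is O(n) in the number of queued merged tasks, which is presumably acceptable because the pool is expected to stay small; a per-pool index would be the alternative if it grew.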
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
@@ -161,6 +161,7 @@ class MergedTaskPool

     MergedTaskPtr pop(uint64_t pool_id);
     void push(const MergedTaskPtr & t);
+    bool has(UInt64 pool_id);

 private:
     std::mutex mtx;
dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.cpp
@@ -118,15 +118,21 @@ SegmentReadTaskPools SegmentReadTaskScheduler::getPoolsUnlock(const std::vector<
     return pools;
 }

+bool SegmentReadTaskScheduler::needScheduleToRead(const SegmentReadTaskPoolPtr & pool)
+{
+    return pool->getFreeBlockSlots() > 0 && // Block queue is not full and
+           (merged_task_pool.has(pool->poolId()) || // can schedule a segment from MergedTaskPool or
+            pool->getFreeActiveSegments() > 0); // schedule a new segment.
+}
+
 SegmentReadTaskPoolPtr SegmentReadTaskScheduler::scheduleSegmentReadTaskPoolUnlock()
 {
     int64_t pool_count = read_pools.size(); // All read task pool need to be scheduled, including invalid read task pool.
     for (int64_t i = 0; i < pool_count; i++)
     {
         auto pool = read_pools.next();
         // If pool->getFreeBlockSlots() > 0, schedule it for read blocks.
         // If !pool->valid(), schedule it for clean MergedTaskPool.
-        if (pool != nullptr && (pool->getFreeBlockSlots() > 0 || !pool->valid()))
+        if (pool != nullptr && (needScheduleToRead(pool) || !pool->valid()))
         {
             return pool;
         }
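
The new predicate is the heart of the fix: previously any pool with free block slots was returned by scheduleSegmentReadTaskPoolUnlock, even when it had no segment it could actually read (active-segment budget exhausted and nothing for it in the MergedTaskPool), so the schedule loop kept selecting pools that could make no progress. A self-contained sketch of the decision, with the pool reduced to three plain values (PoolState is hypothetical; the real inputs are the member functions shown above):

#include <iostream>

// Hypothetical condensed view of one read pool as the scheduler sees it.
struct PoolState
{
    long free_block_slots; // block_slot_limit - pending blocks in the queue
    bool in_merged_task_pool; // merged_task_pool.has(pool_id)
    long free_active_segments; // active_segment_limit - active segments
};

// Mirrors SegmentReadTaskScheduler::needScheduleToRead: the pool must have
// room for output blocks, and either a partially consumed merged task to
// resume or headroom to activate a new segment.
bool needScheduleToRead(const PoolState & s)
{
    return s.free_block_slots > 0
        && (s.in_merged_task_pool || s.free_active_segments > 0);
}

int main()
{
    PoolState stalled{4, false, 0}; // free slots, but nothing it may read
    PoolState resumable{4, true, 0}; // can resume a queued merged task
    std::cout << std::boolalpha
              << needScheduleToRead(stalled) << '\n' // false: skip this pool
              << needScheduleToRead(resumable) << '\n'; // true: schedule it
    return 0;
}

Under the old condition, the stalled pool above would still have been picked on every scheduling round, burning CPU without producing a read.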
dbms/src/Storages/DeltaMerge/ReadThread/SegmentReadTaskScheduler.h
@@ -62,7 +62,7 @@ class SegmentReadTaskScheduler
     bool isStop() const;
     bool schedule();
     void schedLoop();
-
+    bool needScheduleToRead(const SegmentReadTaskPoolPtr & pool);
     SegmentReadTaskPools getPoolsUnlock(const std::vector<uint64_t> & pool_ids);
     // <seg_id, pool_ids>
     std::optional<std::pair<uint64_t, std::vector<uint64_t>>> scheduleSegmentUnlock(const SegmentReadTaskPoolPtr & pool);
30 changes: 13 additions & 17 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
@@ -205,7 +205,7 @@ std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator SegmentReadT
 {
     auto target = segments.end();
     std::lock_guard lock(mutex);
-    if (getFreeActiveSegmentCountUnlock() <= 0)
+    if (getFreeActiveSegmentsUnlock() <= 0)
     {
         return target;
     }
@@ -269,33 +269,29 @@ void SegmentReadTaskPool::pushBlock(Block && block)
     q.push(std::move(block), nullptr);
 }

-int64_t SegmentReadTaskPool::increaseUnorderedInputStreamRefCount()
+Int64 SegmentReadTaskPool::increaseUnorderedInputStreamRefCount()
 {
     return unordered_input_stream_ref_count.fetch_add(1, std::memory_order_relaxed);
 }
-int64_t SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount()
+Int64 SegmentReadTaskPool::decreaseUnorderedInputStreamRefCount()
 {
     return unordered_input_stream_ref_count.fetch_sub(1, std::memory_order_relaxed);
 }

-int64_t SegmentReadTaskPool::getFreeBlockSlots() const
+Int64 SegmentReadTaskPool::getFreeBlockSlots() const
 {
-    auto block_slots = unordered_input_stream_ref_count.load(std::memory_order_relaxed);
-    if (block_slots < 3)
-    {
-        block_slots = 3;
-    }
-    return block_slots - blk_stat.pendingCount();
+    return block_slot_limit - blk_stat.pendingCount();
 }

-int64_t SegmentReadTaskPool::getFreeActiveSegmentCountUnlock()
+Int64 SegmentReadTaskPool::getFreeActiveSegments() const
 {
-    auto active_segment_limit = unordered_input_stream_ref_count.load(std::memory_order_relaxed);
-    if (active_segment_limit < 2)
-    {
-        active_segment_limit = 2;
-    }
-    return active_segment_limit - static_cast<int64_t>(active_segment_ids.size());
+    std::lock_guard lock(mutex);
+    return getFreeActiveSegmentsUnlock();
+}
+
+Int64 SegmentReadTaskPool::getFreeActiveSegmentsUnlock() const
+{
+    return active_segment_limit - static_cast<Int64>(active_segment_ids.size());
 }

 bool SegmentReadTaskPool::exceptionHappened() const
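
getFreeBlockSlots and getFreeActiveSegments now subtract from fixed, construction-time limits instead of re-deriving a limit from the live input-stream ref count on every call. Because getFreeActiveSegments is const yet must lock, the header change below marks the mutex mutable; a minimal sketch of that pattern (simplified members, not the real pool):

#include <cstdint>
#include <mutex>
#include <unordered_set>

class PoolSketch
{
public:
    std::int64_t getFreeActiveSegments() const
    {
        std::lock_guard lock(mutex); // legal in a const method: mutex is mutable
        return getFreeActiveSegmentsUnlock();
    }

private:
    // The *Unlock variant exists for callers that already hold the mutex,
    // such as the segment-scheduling path.
    std::int64_t getFreeActiveSegmentsUnlock() const
    {
        return active_segment_limit - static_cast<std::int64_t>(active_segment_ids.size());
    }

    mutable std::mutex mutex;
    std::unordered_set<std::uint64_t> active_segment_ids;
    const std::int64_t active_segment_limit = 2;
};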
26 changes: 19 additions & 7 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
@@ -168,7 +168,8 @@ class SegmentReadTaskPool : private boost::noncopyable
         SegmentReadTasks && tasks_,
         AfterSegmentRead after_segment_read_,
         const String & tracing_id,
-        bool enable_read_thread_)
+        bool enable_read_thread_,
+        Int64 num_streams_)
         : pool_id(nextPoolId())
         , table_id(table_id_)
         , dm_context(dm_context_)
@@ -183,6 +184,13 @@
         , unordered_input_stream_ref_count(0)
         , exception_happened(false)
         , mem_tracker(current_memory_tracker == nullptr ? nullptr : current_memory_tracker->shared_from_this())
+        // If the queue is too short (only 1 in the extreme case), the computation thread may
+        // encounter an empty queue frequently, resulting in too much waiting and thread context
+        // switching, so we set the lower bound to 3, which provides two blocks of buffer space.
+        , block_slot_limit(std::max(num_streams_, 3))
+        // Limiting the minimum number of reading segments to 2 is to avoid, as much as possible,
+        // situations where the computation may be faster and the storage layer may not be able to keep up.
+        , active_segment_limit(std::max(num_streams_, 2))
     {}

     ~SegmentReadTaskPool()
@@ -223,9 +231,10 @@
         const std::unordered_map<uint64_t, std::vector<uint64_t>> & segments,
         uint64_t expected_merge_count);

-    int64_t increaseUnorderedInputStreamRefCount();
-    int64_t decreaseUnorderedInputStreamRefCount();
-    int64_t getFreeBlockSlots() const;
+    Int64 increaseUnorderedInputStreamRefCount();
+    Int64 decreaseUnorderedInputStreamRefCount();
+    Int64 getFreeBlockSlots() const;
+    Int64 getFreeActiveSegments() const;
     bool valid() const;
     void setException(const DB::Exception & e);

@@ -240,7 +249,7 @@
     }

 private:
-    int64_t getFreeActiveSegmentCountUnlock();
+    Int64 getFreeActiveSegmentsUnlock() const;
     bool exceptionHappened() const;
     void finishSegment(const SegmentPtr & seg);
     void pushBlock(Block && block);
@@ -255,13 +264,13 @@
     const ReadMode read_mode;
     SegmentReadTasksWrapper tasks_wrapper;
     AfterSegmentRead after_segment_read;
-    std::mutex mutex;
+    mutable std::mutex mutex;
     std::unordered_set<uint64_t> active_segment_ids;
     WorkQueue<Block> q;
     BlockStat blk_stat;
     LoggerPtr log;

-    std::atomic<int64_t> unordered_input_stream_ref_count;
+    std::atomic<Int64> unordered_input_stream_ref_count;

     std::atomic<bool> exception_happened;
     DB::Exception exception;
@@ -275,6 +284,9 @@
     // std::once_flag and std::call_once to prevent duplicated add.
     std::once_flag add_to_scheduler;

+    const Int64 block_slot_limit;
+    const Int64 active_segment_limit;
+
     inline static std::atomic<uint64_t> pool_id_gen{1};
     inline static BlockStat global_blk_stat;
     static uint64_t nextPoolId()
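
The two comments in the constructor carry the sizing policy: the block queue gets at least three slots so the compute thread rarely sees it empty, and at least two segments may be active so the storage layer can stay ahead of compute. A compilable sketch of just that arithmetic (the stream counts are illustrative):

#include <algorithm>
#include <cstdint>
#include <iostream>

int main()
{
    for (std::int64_t num_streams : {1, 4, 16})
    {
        // Mirrors the initializers above: lower bounds of 3 and 2.
        std::int64_t block_slot_limit = std::max<std::int64_t>(num_streams, 3);
        std::int64_t active_segment_limit = std::max<std::int64_t>(num_streams, 2);
        std::cout << "num_streams=" << num_streams
                  << " block_slot_limit=" << block_slot_limit
                  << " active_segment_limit=" << active_segment_limit << '\n';
    }
    return 0;
}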