Skip to content

Commit d3bbac0

Browse files
authored
merge to 24-3: Fix harmonizer's work with shared threads (#9139)
1 parent 6302bdc commit d3bbac0

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

ydb/library/actors/core/executor_thread.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ namespace NActors {
487487
ExecutorPool->ScheduleActivation(capturedActivation);
488488
}
489489
if (!activation) {
490-
return {IsSharedThread, wasWorking};
490+
break;
491491
}
492492
executeActivation(activation, false);
493493
}

ydb/library/actors/core/harmonizer.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
424424
}
425425

426426
Y_FORCE_INLINE bool IsHoggish(double booked, double currentThreadCount) {
427-
return booked < currentThreadCount - 1;
427+
return booked < currentThreadCount - 0.5;
428428
}
429429

430430
void THarmonizer::HarmonizeImpl(ui64 ts) {
@@ -435,7 +435,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
435435
i64 beingStopped = 0;
436436
double total = 0;
437437
TStackVec<size_t, 8> needyPools;
438-
TStackVec<size_t, 8> hoggishPools;
438+
TStackVec<std::pair<size_t, double>, 8> hoggishPools;
439439
TStackVec<bool, 8> isNeedyByPool;
440440

441441
size_t sumOfAdditionalThreads = 0;
@@ -577,7 +577,7 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
577577
bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
578578
|| IsHoggish(lastSecondPoolBooked, currentThreadCount);
579579
if (isHoggish) {
580-
hoggishPools.push_back(poolIdx);
580+
hoggishPools.push_back({poolIdx, std::max(poolBooked - currentThreadCount, lastSecondPoolBooked - currentThreadCount)});
581581
}
582582
booked += poolBooked;
583583
consumed += poolConsumed;
@@ -714,18 +714,19 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
714714
}
715715
}
716716

717-
for (size_t hoggishPoolIdx : hoggishPools) {
717+
for (auto &[hoggishPoolIdx, freeCpu] : hoggishPools) {
718718
TPoolInfo &pool = *Pools[hoggishPoolIdx];
719719
i64 threadCount = pool.GetFullThreadCount();
720720
if (hasBorrowedSharedThread[hoggishPoolIdx]) {
721721
Shared->ReturnBorrowedHalfThread(hoggishPoolIdx);
722+
freeCpu -= 0.5;
722723
continue;
723724
}
724725
if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
725726
pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
726727
pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
727728
}
728-
if (threadCount > pool.MinFullThreadCount) {
729+
if (threadCount > pool.MinFullThreadCount && freeCpu >= 1) {
729730
AtomicIncrement(pool.DecreasingThreadsByHoggishState);
730731
LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultFullThreadCount, pool.MaxFullThreadCount);
731732
pool.SetFullThreadCount(threadCount - 1);

0 commit comments

Comments
 (0)