Skip to content

Commit 0656124

Browse files
authored
Merge f8fdd49 into 3ecb4ff
2 parents 3ecb4ff + f8fdd49 commit 0656124

File tree

2 files changed

+26
-3
lines changed

2 files changed

+26
-3
lines changed

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ class TSchedulerEntity {
126126
std::atomic<i64> DelayedSumBatches = 0;
127127
std::atomic<i64> DelayedCount = 0;
128128

129+
double Share;
130+
129131
TMultithreadPublisher<TGroupMutableStats> MutableStats;
130132
};
131133

@@ -215,13 +217,18 @@ struct TComputeScheduler::TImpl {
215217

216218
TDuration MaxDelay = TDuration::Seconds(10);
217219

218-
void AssignWeights() { }
220+
void AssignWeights() {
221+
for (auto& record : Records) {
222+
record->MutableStats.Next()->Weight = SumCores * record->Share;
223+
}
224+
}
219225

220226
void CreateGroup(TString groupName, double maxShare, NMonotonic::TMonotonic now) {
221227
PoolId[groupName] = Records.size();
222228
auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
229+
group->Share = maxShare;
223230
group->MutableStats.Next()->LastNowRecalc = now;
224-
group->MutableStats.Next()->Weight = maxShare;
231+
group->MutableStats.Next()->Weight = SumCores * maxShare;
225232
Records.push_back(std::move(group));
226233
}
227234
};
@@ -372,11 +379,14 @@ void TComputeScheduler::UpdateMaxShare(TString group, double share, TMonotonic n
372379
Impl->CreateGroup(group, share, now);
373380
} else {
374381
auto& record = Impl->Records[*ptr];
375-
record->MutableStats.Next()->Weight = share;
382+
record->Share = share;
376383
}
377384
AdvanceTime(now);
378385
}
379386

387+
void TComputeScheduler::SetCapacity(ui64 cores) {
388+
Impl->SumCores = cores;
389+
}
380390

381391
::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter(TString group) const {
382392
return Impl->Counters
@@ -407,6 +417,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
407417
}
408418
Opts.Scheduler->SetForgetInterval(Opts.ForgetOverflowTimeout);
409419
Opts.Scheduler->ReportCounters(Opts.Counters);
420+
SetCapacity();
410421
}
411422

412423
void Bootstrap() {
@@ -420,6 +431,15 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
420431
Become(&TSchedulerActor::State);
421432
}
422433

434+
void SetCapacity() {
435+
NActors::TExecutorPoolStats poolStats;
436+
TVector<NActors::TExecutorThreadStats> threadsStats;
437+
TlsActivationContext->ActorSystem()->GetPoolStats(SelfId().PoolID(), poolStats, threadsStats);
438+
Y_ENSURE(poolStats.MaxThreadCount > 0);
439+
Opts.Counters->SchedulerCapacity->Set(poolStats.MaxThreadCount);
440+
Opts.Scheduler->SetCapacity(poolStats.MaxThreadCount);
441+
}
442+
423443
STATEFN(State) {
424444
switch (ev->GetTypeRewrite()) {
425445
hFunc(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
@@ -467,6 +487,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
467487
}
468488

469489
void Handle(TEvents::TEvWakeup::TPtr&) {
490+
SetCapacity();
470491
Opts.Scheduler->AdvanceTime(TlsActivationContext->Monotonic());
471492
Schedule(Opts.AdvanceTimeInterval, new TEvents::TEvWakeup());
472493
}

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ class TComputeScheduler {
6262
void ReportCounters(TIntrusivePtr<TKqpCounters>);
6363

6464
void UpdateMaxShare(TString, double, TMonotonic now);
65+
66+
void SetCapacity(ui64 cores);
6567

6668
void SetMaxDeviation(TDuration);
6769
void SetForgetInterval(TDuration);

0 commit comments

Comments
 (0)