@@ -126,6 +126,8 @@ class TSchedulerEntity {
126126 std::atomic<i64 > DelayedSumBatches = 0 ;
127127 std::atomic<i64 > DelayedCount = 0 ;
128128
129+ double Share;
130+
129131 TMultithreadPublisher<TGroupMutableStats> MutableStats;
130132 };
131133
@@ -215,13 +217,16 @@ struct TComputeScheduler::TImpl {
215217
216218 TDuration MaxDelay = TDuration::Seconds(10 );
217219
218- void AssignWeights () { }
220+ void AssignWeights () {
221+ for (auto & record : Records) {
222+ record->MutableStats .Next ()->Weight = SumCores * record->Share ;
223+ }
224+ }
219225
220- void CreateGroup (TString groupName, double maxShare, NMonotonic::TMonotonic now ) {
226+ void CreateGroup (TString groupName, double maxShare) {
221227 PoolId[groupName] = Records.size ();
222228 auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
223- group->MutableStats .Next ()->LastNowRecalc = now;
224- group->MutableStats .Next ()->Weight = maxShare;
229+ group->Share = maxShare;
225230 Records.push_back (std::move (group));
226231 }
227232};
@@ -369,14 +374,18 @@ bool TComputeScheduler::Disable(TString group, TMonotonic now) {
369374void TComputeScheduler::UpdateMaxShare (TString group, double share, TMonotonic now) {
370375 auto ptr = Impl->PoolId .FindPtr (group);
371376 if (!ptr) {
372- Impl->CreateGroup (group, share, now );
377+ Impl->CreateGroup (group, share);
373378 } else {
374379 auto & record = Impl->Records [*ptr];
375- record->MutableStats . Next ()-> Weight = share;
380+ record->Share = share;
376381 }
382+ Impl->AssignWeights ();
377383 AdvanceTime (now);
378384}
379385
386+ void TComputeScheduler::SetCapacity (ui64 cores) {
387+ Impl->SumCores = cores;
388+ }
380389
381390::NMonitoring::TDynamicCounters::TCounterPtr TComputeScheduler::GetGroupUsageCounter (TString group) const {
382391 return Impl->Counters
@@ -407,6 +416,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
407416 }
408417 Opts.Scheduler ->SetForgetInterval (Opts.ForgetOverflowTimeout );
409418 Opts.Scheduler ->ReportCounters (Opts.Counters );
419+ SetCapacity ();
410420 }
411421
412422 void Bootstrap () {
@@ -420,6 +430,15 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
420430 Become (&TSchedulerActor::State);
421431 }
422432
433+ void SetCapacity () {
434+ NActors::TExecutorPoolStats poolStats;
435+ TVector<NActors::TExecutorThreadStats> threadsStats;
436+ TlsActivationContext->ActorSystem ()->GetPoolStats (SelfId ().PoolID (), poolStats, threadsStats);
437+ Y_ENSURE (poolStats.MaxThreadCount > 0 );
438+ Opts.Counters ->SchedulerCapacity ->Set (poolStats.MaxThreadCount );
439+ Opts.Scheduler ->SetCapacity (poolStats.MaxThreadCount );
440+ }
441+
423442 STATEFN (State) {
424443 switch (ev->GetTypeRewrite ()) {
425444 hFunc (NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse, Handle);
@@ -467,6 +486,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
467486 }
468487
469488 void Handle (TEvents::TEvWakeup::TPtr&) {
489+ SetCapacity ();
470490 Opts.Scheduler ->AdvanceTime (TlsActivationContext->Monotonic ());
471491 Schedule (Opts.AdvanceTimeInterval , new TEvents::TEvWakeup ());
472492 }
0 commit comments