@@ -171,7 +171,7 @@ class TObservableUpdater : IObservable {
171
171
}
172
172
173
173
template <typename T>
174
- TParameter<T>* FindOrAddParameter (TParameterKey key, double def);
174
+ TParameter<T>* FindOrAddParameter (TParameterKey key, T def);
175
175
176
176
ui64 ValuesCount () {
177
177
return Params.size ();
@@ -223,29 +223,32 @@ class TParameter : public IObservableValue<T> {
223
223
Updater_->ToUpdate (this );
224
224
}
225
225
226
- void SetValue (T val) {
226
+ T SetValue (T val) {
227
+ auto oldValue = Value_;
227
228
Value_ = val;
228
229
Updater_->ToUpdate (this );
230
+ return oldValue;
229
231
}
230
232
231
233
protected:
232
234
double DoUpdateValue () override {
233
235
return Value_;
234
236
}
235
237
236
- private:
237
238
T Value_;
239
+
240
+ private:
238
241
TObservableUpdater* Updater_;
239
242
};
240
243
241
244
template <typename T>
242
- TParameter<T>* TObservableUpdater::FindOrAddParameter (TParameterKey key, double def) {
245
+ TParameter<T>* TObservableUpdater::FindOrAddParameter (TParameterKey key, T def) {
243
246
if (auto * ptr = FindValue<TParameter<T>>(key)) {
244
247
return ptr;
245
248
}
246
249
auto value = MakeHolder<TParameter<T>>(this , def);
247
250
auto * result = value.Get ();
248
- AddValue<TParameter<double >>(key, std::move (value));
251
+ AddValue<TParameter<T >>(key, std::move (value));
249
252
return result;
250
253
}
251
254
@@ -345,6 +348,62 @@ TSchedulerEntityHandle& TSchedulerEntityHandle::operator = (TSchedulerEntityHand
345
348
346
349
TSchedulerEntityHandle::~TSchedulerEntityHandle () = default ;
347
350
351
+ class TSumResourceWeightsHolder : public TParameter <double > {
352
+ public:
353
+ using TParameter<double >::TParameter;
354
+
355
+ TSumResourceWeightsHolder (TObservableUpdater* engine)
356
+ : TParameter(engine, 0 )
357
+ {
358
+ }
359
+
360
+ void HandleUpdate (double delta) {
361
+ SetValue (Value_ + delta);
362
+ }
363
+
364
+ };
365
+
366
+ class TResourceWeightsUpdater {
367
+ public:
368
+ TResourceWeightsUpdater (TParameter<double >* param, double initial)
369
+ : Param(param)
370
+ , Value_(initial)
371
+ {
372
+ param->SetValue (Value_);
373
+ }
374
+
375
+ void Track (TSumResourceWeightsHolder* holder) {
376
+ if (!Holder) {
377
+ Holder = holder;
378
+ Holder->HandleUpdate (GetValue ());
379
+ }
380
+ }
381
+
382
+ void Untrack () {
383
+ if (Holder) {
384
+ Holder->HandleUpdate (-GetValue ());
385
+ Holder = nullptr ;
386
+ }
387
+ }
388
+
389
+ double GetValue () {
390
+ return Value_;
391
+ }
392
+
393
+ void SetValue (double val) {
394
+ if (Holder) {
395
+ Holder->HandleUpdate (val - Value_);
396
+ }
397
+ Value_ = val;
398
+ Param->SetValue (Value_);
399
+ }
400
+
401
+ private:
402
+ TSumResourceWeightsHolder* Holder = nullptr ;
403
+ TParameter<double >* Param;
404
+ double Value_;
405
+ };
406
+
348
407
class TSchedulerEntity {
349
408
public:
350
409
TSchedulerEntity () {}
@@ -371,6 +430,7 @@ class TSchedulerEntity {
371
430
std::atomic<i64 > DelayedCount = 0 ;
372
431
373
432
THolder<IObservableValue<double >> Share;
433
+ THolder<TResourceWeightsUpdater> ResourceWeightUpdater;
374
434
375
435
::NMonitoring::TDynamicCounters::TCounterPtr Vtime;
376
436
::NMonitoring::TDynamicCounters::TCounterPtr EntitiesWeight;
@@ -495,10 +555,17 @@ struct TComputeScheduler::TImpl {
495
555
496
556
TObservableUpdater WeightsUpdater;
497
557
TParameter<double > SumCores{&WeightsUpdater, 1 };
558
+ TSumResourceWeightsHolder SumResourceWeights{&WeightsUpdater};
498
559
499
560
enum : ui32 {
500
- Share = 1 ,
561
+ TotalShare = 1 ,
562
+
501
563
PerQueryShare = 2 ,
564
+
565
+ ResourceWeight = 3 ,
566
+ ResourceWeightEnabled = 4 ,
567
+
568
+ CompositeShare = 5 ,
502
569
};
503
570
504
571
TIntrusivePtr<TKqpCounters> Counters;
@@ -565,6 +632,9 @@ void TComputeScheduler::AddToGroup(TMonotonic now, ui64 id, TSchedulerEntityHand
565
632
auto group = Impl->Records [id].get ();
566
633
(*handle).Groups .push_back (group);
567
634
group->MutableStats .Next ()->EntitiesWeight += (*handle).Weight ;
635
+ if ((*handle).Weight > 0 && group->ResourceWeightUpdater ) {
636
+ group->ResourceWeightUpdater ->Track (&Impl->SumResourceWeights );
637
+ }
568
638
Impl->AdvanceTime (now, group);
569
639
}
570
640
@@ -584,6 +654,7 @@ void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGr
584
654
if (Counters) {
585
655
record->InitCounters (Counters);
586
656
}
657
+ WeightsUpdater.UpdateAll ();
587
658
record->MutableStats .Next ()->Capacity = record->Share ->GetValue ();
588
659
auto & v = record->MutableStats ;
589
660
{
@@ -621,7 +692,6 @@ void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGr
621
692
}
622
693
623
694
void TComputeScheduler::AdvanceTime (TMonotonic now) {
624
- Impl->WeightsUpdater .UpdateAll ();
625
695
for (size_t i = 0 ; i < Impl->Records .size (); ++i) {
626
696
Impl->AdvanceTime (now, Impl->Records [i].get ());
627
697
}
@@ -636,6 +706,9 @@ void TComputeScheduler::Deregister(TSchedulerEntityHandle& self, TMonotonic now)
636
706
for (auto group : (*self).Groups ) {
637
707
auto * next = group->MutableStats .Next ();
638
708
next->EntitiesWeight -= (*self).Weight ;
709
+ if (next->EntitiesWeight <= 0 ) {
710
+ group->ResourceWeightUpdater ->Untrack ();
711
+ }
639
712
Impl->AdvanceTime (now, group);
640
713
}
641
714
}
@@ -701,14 +774,52 @@ void TComputeScheduler::Disable(TString group, TMonotonic now) {
701
774
}
702
775
}
703
776
704
- void TComputeScheduler::UpdateGroupShare (TString group, double share, TMonotonic now) {
777
+ class TCompositeGroupShare : public IObservableValue <double > {
778
+ protected:
779
+ double DoUpdateValue () override {
780
+ if (ResourceWeightEnabled->GetValue ()) {
781
+ return Min (TotalLimit->GetValue (), ResourceWeight->GetValue () / SumResourceWeights->GetValue ());
782
+ } else {
783
+ return TotalLimit->GetValue ();
784
+ }
785
+ }
786
+
787
+ public:
788
+ TCompositeGroupShare (IObservableValue<double >* resourceWeight, IObservableValue<bool >* resourceWeightEnabled, IObservableValue<double >* sumResourceWeights, IObservableValue<double >* totalLimit)
789
+ : ResourceWeight(resourceWeight)
790
+ , ResourceWeightEnabled(resourceWeightEnabled)
791
+ , SumResourceWeights(sumResourceWeights)
792
+ , TotalLimit(totalLimit)
793
+ {
794
+ Update ();
795
+ }
796
+
797
+ private:
798
+ IObservableValue<double >* ResourceWeight;
799
+ IObservableValue<bool >* ResourceWeightEnabled;
800
+ IObservableValue<double >* SumResourceWeights;
801
+ IObservableValue<double >* TotalLimit;
802
+ };
803
+
804
+ void TComputeScheduler::UpdateGroupShare (TString group, double share, TMonotonic now, std::optional<double > resourceWeight) {
705
805
auto ptr = Impl->GroupId .FindPtr (group);
706
806
707
- auto * shareValue = Impl->WeightsUpdater .FindOrAddParameter <double >({group, TImpl::Share }, share);
807
+ auto * shareValue = Impl->WeightsUpdater .FindOrAddParameter <double >({group, TImpl::TotalShare }, share);
708
808
shareValue->SetValue (share);
809
+
810
+ TParameter<bool >* weightEnabled = Impl->WeightsUpdater .FindOrAddParameter <bool >({group, TImpl::ResourceWeightEnabled}, resourceWeight.has_value ());
811
+ weightEnabled->SetValue (resourceWeight.has_value ());
812
+
813
+ TParameter<double >* resourceWeightValue = Impl->WeightsUpdater .FindOrAddParameter <double >({group, TImpl::ResourceWeight}, resourceWeight.value_or (0 ));
814
+
709
815
if (!ptr) {
710
- auto cap = MakeHolder<TShare>(&Impl->SumCores , shareValue);
816
+ auto compositeWeight = MakeHolder<TCompositeGroupShare>(resourceWeightValue, weightEnabled, &Impl->SumResourceWeights , shareValue);
817
+ auto resourceWeightsUpdater = MakeHolder<TResourceWeightsUpdater>(resourceWeightValue, resourceWeight.value_or (0 ));
818
+ auto cap = MakeHolder<TShare>(&Impl->SumCores , compositeWeight.Get ());
819
+ Impl->WeightsUpdater .AddValue ({group, TImpl::CompositeShare}, std::move (compositeWeight));
711
820
Impl->CreateGroup (std::move (cap), now, group);
821
+
822
+ Impl->Records .back ()->ResourceWeightUpdater = std::move (resourceWeightsUpdater);
712
823
} else {
713
824
auto & record = Impl->Records [*ptr];
714
825
record->MutableStats .Next ()->Disabled = false ;
@@ -816,16 +927,20 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
816
927
if (ev->Get ()->Config .has_value ()) {
817
928
auto totalShare = ev->Get ()->Config ->TotalCpuLimitPercentPerNode / 100.0 ;
818
929
auto queryShare = ev->Get ()->Config ->QueryCpuLimitPercentPerNode / 100.0 ;
930
+ std::optional<double > resourceWeight;
931
+ if (ev->Get ()->Config ->ResourceWeight >= 0 ) {
932
+ resourceWeight = ev->Get ()->Config ->ResourceWeight ;
933
+ }
819
934
820
- if (totalShare <= 0 && queryShare > 0 ) {
935
+ if (totalShare <= 0 && ( queryShare > 0 || resourceWeight) ) {
821
936
totalShare = 1 ;
822
937
}
823
938
824
939
if (queryShare <= 0 ) {
825
940
queryShare = 1 ;
826
941
}
827
942
828
- Opts.Scheduler ->UpdateGroupShare (ev->Get ()->PoolId , totalShare, TlsActivationContext->Monotonic ());
943
+ Opts.Scheduler ->UpdateGroupShare (ev->Get ()->PoolId , totalShare, TlsActivationContext->Monotonic (), resourceWeight );
829
944
Opts.Scheduler ->UpdatePerQueryShare (ev->Get ()->PoolId , queryShare, TlsActivationContext->Monotonic ());
830
945
} else {
831
946
Opts.Scheduler ->Disable (ev->Get ()->PoolId , TlsActivationContext->Monotonic ());
0 commit comments