@@ -81,12 +81,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
8181 if (config.HasIteratorReadQuotaSettings ()) {
8282 SetIteratorReadsQuotaSettings (config.GetIteratorReadQuotaSettings ());
8383 }
84- if (config.HasPoolsConfiguration ()) {
85- SetPriorities (config.GetPoolsConfiguration ());
86- }
87- Scheduler.ReportCounters (counters);
88- AdvanceTimeInterval = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ());
89- Scheduler.SetForgetInterval (TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()));
84+
85+ SchedulerOptions = {
86+ .AdvanceTimeInterval = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ()),
87+ .ForgetOverflowTimeout = TDuration::MicroSeconds (config.GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()),
88+ .ActivePoolPollingTimeout = TDuration::Seconds (config.GetComputeSchedulerSettings ().GetActivePoolPollingSec ()),
89+ .Counters = counters,
90+ };
9091 }
9192
9293 void Bootstrap () {
@@ -105,15 +106,13 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
105106 TlsActivationContext->ExecutorThread .ActorSystem , SelfId ());
106107 }
107108
108- Schedule (TDuration::Seconds (1 ), new TEvents::TEvWakeup (WakeCleaunupTag));
109- Schedule (AdvanceTimeInterval, new TEvents::TEvWakeup (WakeAdvanceTimeTag));
109+ Schedule (TDuration::Seconds (1 ), new TEvents::TEvWakeup ());
110110 Become (&TKqpNodeService::WorkState);
111- }
112111
113- enum {
114- WakeCleaunupTag,
115- WakeAdvanceTimeTag
116- };
112+ Scheduler = std::make_shared<TComputeScheduler>();
113+ SchedulerOptions. Scheduler = Scheduler;
114+ SchedulerActorId = RegisterWithSameMailbox ( CreateSchedulerActor (SchedulerOptions));
115+ }
117116
118117private:
119118 STATEFN (WorkState) {
@@ -128,8 +127,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
128127 hFunc (TEvents::TEvUndelivered, HandleWork);
129128 hFunc (TEvents::TEvPoison, HandleWork);
130129 hFunc (NMon::TEvHttpInfo, HandleWork);
131- // sheduling
132- hFunc (TEvSchedulerDeregister, HandleWork);
133130 default : {
134131 Y_ABORT (" Unexpected event 0x%x for TKqpResourceManagerService" , ev->GetTypeRewrite ());
135132 }
@@ -138,12 +135,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
138135
139136 static constexpr double SecToUsec = 1e6 ;
140137
141- void HandleWork (TEvSchedulerDeregister::TPtr& ev) {
142- if (ev->Get ()->SchedulerEntity ) {
143- Scheduler.Deregister (*ev->Get ()->SchedulerEntity , TMonotonic::Now ());
144- }
145- }
146-
147138 void HandleWork (TEvKqpNode::TEvStartKqpTasksRequest::TPtr& ev) {
148139 NWilson::TSpan sendTasksSpan (TWilsonKqp::KqpNodeSendTasks, NWilson::TTraceId (ev->TraceId ), " KqpNode.SendTasks" , NWilson::EFlags::AUTO_END);
149140
@@ -198,7 +189,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
198189 const TString& serializedGUCSettings = ev->Get ()->Record .HasSerializedGUCSettings () ?
199190 ev->Get ()->Record .GetSerializedGUCSettings () : " " ;
200191
201- auto schedulerNow = TMonotonic::Now ();
192+ auto schedulerNow = TlsActivationContext-> Monotonic ();
202193
203194 // start compute actors
204195 TMaybe<NYql::NDqProto::TRlPath> rlPath = Nothing ();
@@ -213,20 +204,28 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
213204 for (auto & dqTask: *msg.MutableTasks ()) {
214205 TString group = msg.GetSchedulerGroup ();
215206
216- TComputeActorSchedulingOptions schedulingOptions {
207+ TComputeActorSchedulingOptions schedulingTaskOptions {
217208 .Now = schedulerNow,
218- .NodeService = SelfId () ,
219- .Scheduler = & Scheduler,
209+ .SchedulerActorId = SchedulerActorId ,
210+ .Scheduler = Scheduler. get () ,
220211 .Group = group,
221212 .Weight = 1 ,
222213 .NoThrottle = false ,
223214 .Counters = Counters
224215 };
225216
226- if (Scheduler.Disabled (schedulingOptions.Group )) {
227- schedulingOptions.NoThrottle = true ;
228- } else {
229- schedulingOptions.Handle = Scheduler.Enroll (schedulingOptions.Group , schedulingOptions.Weight , schedulingOptions.Now );
217+ if (SchedulerOptions.Scheduler ->Disabled (group)) {
218+ auto share = msg.GetMaxCpuShare ();
219+ if (share > 0 ) {
220+ Scheduler->UpdateMaxShare (group, share, schedulerNow);
221+ Send (SchedulerActorId, new TEvSchedulerNewPool (msg.GetDatabase (), group, share));
222+ } else {
223+ schedulingTaskOptions.NoThrottle = true ;
224+ }
225+ }
226+
227+ if (!schedulingTaskOptions.NoThrottle ) {
228+ schedulingTaskOptions.Handle = SchedulerOptions.Scheduler ->Enroll (schedulingTaskOptions.Group , schedulingTaskOptions.Weight , schedulingTaskOptions.Now );
230229 }
231230
232231 auto result = CaFactory_->CreateKqpComputeActor ({
@@ -248,7 +247,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
248247 .RlPath = rlPath,
249248 .ComputesByStages = &computesByStage,
250249 .State = State_,
251- .SchedulingOptions = std::move (schedulingOptions)
250+ .SchedulingOptions = std::move (schedulingTaskOptions),
252251 });
253252
254253 if (const auto * rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&result)) {
@@ -341,17 +340,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
341340 }
342341
343342 void HandleWork (TEvents::TEvWakeup::TPtr& ev) {
344- if (ev->Get ()->Tag == WakeAdvanceTimeTag) {
345- Scheduler.AdvanceTime (TMonotonic::Now ());
346- Schedule (AdvanceTimeInterval, new TEvents::TEvWakeup (WakeAdvanceTimeTag));
347- }
348- if (ev->Get ()->Tag == WakeCleaunupTag) {
349- Schedule (TDuration::Seconds (1 ), ev->Release ().Release ());
350- for (auto & bucket : State_->Buckets ) {
351- auto expiredRequests = bucket.ClearExpiredRequests ();
352- for (auto & cxt : expiredRequests) {
353- TerminateTx (cxt.TxId , " reached execution deadline" , NYql::NDqProto::StatusIds::TIMEOUT);
354- }
343+ Schedule (TDuration::Seconds (1 ), ev->Release ().Release ());
344+ for (auto & bucket : State_->Buckets ) {
345+ auto expiredRequests = bucket.ClearExpiredRequests ();
346+ for (auto & cxt : expiredRequests) {
347+ TerminateTx (cxt.TxId , " reached execution deadline" , NYql::NDqProto::StatusIds::TIMEOUT);
355348 }
356349 }
357350 }
@@ -394,13 +387,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
394387 SetIteratorReadsQuotaSettings (event.GetConfig ().GetTableServiceConfig ().GetIteratorReadQuotaSettings ());
395388 }
396389
397- if (event.GetConfig ().GetTableServiceConfig ().HasPoolsConfiguration ()) {
398- SetPriorities (event.GetConfig ().GetTableServiceConfig ().GetPoolsConfiguration ());
399- }
400-
401- AdvanceTimeInterval = TDuration::MicroSeconds (event.GetConfig ().GetTableServiceConfig ().GetComputeSchedulerSettings ().GetAdvanceTimeIntervalUsec ());
402- Scheduler.SetForgetInterval (TDuration::MicroSeconds (event.GetConfig ().GetTableServiceConfig ().GetComputeSchedulerSettings ().GetForgetOverflowTimeoutUsec ()));
403-
404390 auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
405391 Send (ev->Sender , responseEv.Release (), IEventHandle::FlagTrackDelivery, ev->Cookie );
406392 }
@@ -409,35 +395,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
409395 SetDefaultIteratorQuotaSettings (settings.GetMaxRows (), settings.GetMaxBytes ());
410396 }
411397
412- void SetPriorities (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) {
413- std::function<TComputeScheduler::TDistributionRule (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)> convert
414- = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf)
415- {
416- if (conf.HasName ()) {
417- return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare (), .Name = conf.GetName ()};
418- } else if (conf.HasSubPoolsConfiguration ()) {
419- auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare ()};
420- for (auto & subConf : conf.GetSubPoolsConfiguration ().GetSubPools ()) {
421- res.SubRules .push_back (convert (subConf));
422- }
423- return res;
424- } else {
425- Y_ENSURE (false , " unknown case" );
426- }
427- };
428- SetPriorities (convert (conf));
429- }
430-
431- void SetPriorities (TComputeScheduler::TDistributionRule rule) {
432- NActors::TExecutorPoolStats poolStats;
433- TVector<NActors::TExecutorThreadStats> threadsStats;
434- TlsActivationContext->ActorSystem ()->GetPoolStats (SelfId ().PoolID (), poolStats, threadsStats);
435- Y_ENSURE (poolStats.MaxThreadCount > 0 );
436- Counters->SchedulerCapacity ->Set (poolStats.MaxThreadCount );
437-
438- Scheduler.SetPriorities (rule, poolStats.MaxThreadCount , TMonotonic::Now ());
439- }
440-
441398 void SetIteratorReadsRetrySettings (const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) {
442399 auto ptr = MakeIntrusive<NKikimr::NKqp::TIteratorReadBackoffSettings>();
443400 ptr->StartRetryDelay = TDuration::MilliSeconds (settings.GetStartDelayMs ());
@@ -524,8 +481,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
524481 NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
525482 const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
526483
527- TComputeScheduler Scheduler;
528- TDuration AdvanceTimeInterval;
484+ std::shared_ptr<TComputeScheduler> Scheduler;
485+ TSchedulerActorOptions SchedulerOptions;
486+ TActorId SchedulerActorId;
529487
530488 // state sharded by TxId
531489 std::shared_ptr<TNodeServiceState> State_;
0 commit comments