1414#include < ydb/core/kqp/rm_service/kqp_rm_service.h>
1515#include < ydb/core/kqp/runtime/kqp_read_actor.h>
1616#include < ydb/core/kqp/runtime/kqp_read_iterator_common.h>
17+ #include < ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1718#include < ydb/core/kqp/common/kqp_resolve.h>
1819
1920#include < ydb/library/wilson_ids/wilson.h>
@@ -150,6 +151,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
150151 if (config.HasIteratorReadQuotaSettings ()) {
151152 SetIteratorReadsQuotaSettings (config.GetIteratorReadQuotaSettings ());
152153 }
154+ if (config.HasComputePoolsConfiguration ()) {
155+ SetPriorities (config.GetComputePoolsConfiguration ());
156+ }
153157 }
154158
155159 void Bootstrap () {
@@ -174,9 +178,12 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
174178
175179private:
176180 STATEFN (WorkState) {
181+ Y_DEFER {
182+ Scheduler.AdvanceTime (TlsActivationContext->Monotonic ());
183+ };
177184 switch (ev->GetTypeRewrite ()) {
178185 hFunc (TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
179- hFunc (TEvKqpNode:: TEvFinishKqpTask, HandleWork); // used only for unit tests
186+ hFunc (TEvFinishKqpTask, HandleWork); // used only for unit tests
180187 hFunc (TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork);
181188 hFunc (TEvents::TEvWakeup, HandleWork);
182189 // misc
@@ -385,6 +392,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
385392
386393 TComputeStagesWithScan computesByStage;
387394
395+ auto now = TlsActivationContext->Monotonic ();
396+
388397 // start compute actors
389398 for (int i = 0 ; i < msg.GetTasks ().size (); ++i) {
390399 auto & dqTask = *msg.MutableTasks (i);
@@ -444,12 +453,27 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
444453 tableKind = tableKindExtract (meta);
445454 }
446455
456+ TComputeActorSchedulingOptions schedulingOptions {
457+ .Now = now,
458+ .NodeService = SelfId (),
459+ .Scheduler = &Scheduler,
460+ .Group = msg.GetRuntimeSettings ().GetExecType () == NYql::NDqProto::TComputeRuntimeSettings::SCAN ? " olap" : " " ,
461+ .Weight = 1 ,
462+ .NoThrottle = false ,
463+ };
464+
465+ if (msg.GetRuntimeSettings ().GetExecType () == NYql::NDqProto::TComputeRuntimeSettings::DATA) {
466+ schedulingOptions.NoThrottle = true ;
467+ schedulingOptions.Scheduler = nullptr ;
468+ }
469+
447470 IActor* computeActor;
448471 if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
449472 auto & info = computesByStage.UpsertTaskWithScan (dqTask, meta, !AppData ()->FeatureFlags .GetEnableSeparationComputeActorsFromRead ());
450473 computeActor = CreateKqpScanComputeActor (request.Executer , txId, &dqTask,
451474 AsyncIoFactory, runtimeSettings, memoryLimits,
452- NWilson::TTraceId (ev->TraceId ), ev->Get ()->Arena );
475+ NWilson::TTraceId (ev->TraceId ), ev->Get ()->Arena ,
476+ schedulingOptions);
453477 taskCtx.ComputeActorId = Register (computeActor);
454478 info.MutableActorIds ().emplace_back (taskCtx.ComputeActorId );
455479 } else {
@@ -459,7 +483,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
459483 }
460484 if (Y_LIKELY (!CaFactory)) {
461485 computeActor = CreateKqpComputeActor (request.Executer , txId, &dqTask, AsyncIoFactory,
462- runtimeSettings, memoryLimits, NWilson::TTraceId (ev->TraceId ), ev->Get ()->Arena , FederatedQuerySetup, GUCSettings);
486+ runtimeSettings, memoryLimits, NWilson::TTraceId (ev->TraceId ), ev->Get ()->Arena , FederatedQuerySetup, GUCSettings,
487+ schedulingOptions);
463488 taskCtx.ComputeActorId = Register (computeActor);
464489 } else {
465490 computeActor = CaFactory->CreateKqpComputeActor (request.Executer , txId, &dqTask,
@@ -490,8 +515,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
490515 }
491516
492517 // used only for unit tests
493- void HandleWork (TEvKqpNode:: TEvFinishKqpTask::TPtr& ev) {
518+ void HandleWork (TEvFinishKqpTask::TPtr& ev) {
494519 auto & msg = *ev->Get ();
520+ if (msg.SchedulerEntity ) {
521+ Scheduler.Deregister (*msg.SchedulerEntity );
522+ }
495523 FinishKqpTask (msg.TxId , msg.TaskId , msg.Success , GetStateBucketByTx (Buckets, msg.TxId ), GetKqpResourceManager ());
496524 }
497525
@@ -568,6 +596,10 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
568596 SetIteratorReadsQuotaSettings (event.GetConfig ().GetTableServiceConfig ().GetIteratorReadQuotaSettings ());
569597 }
570598
599+ if (event.GetConfig ().GetTableServiceConfig ().HasComputePoolsConfiguration ()) {
600+ SetPriorities (event.GetConfig ().GetTableServiceConfig ().GetComputePoolsConfiguration ());
601+ }
602+
571603 auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
572604 Send (ev->Sender , responseEv.Release (), IEventHandle::FlagTrackDelivery, ev->Cookie );
573605 }
@@ -576,6 +608,29 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
576608 SetDefaultIteratorQuotaSettings (settings.GetMaxRows (), settings.GetMaxBytes ());
577609 }
578610
611+ void SetPriorities (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) {
612+ std::function<TComputeScheduler::TDistributionRule (const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)> convert
613+ = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf)
614+ {
615+ if (conf.HasName ()) {
616+ return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare (), .Name = conf.GetName ()};
617+ } else if (conf.HasSubPoolsConfiguration ()) {
618+ auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare ()};
619+ for (auto & subConf : conf.GetSubPoolsConfiguration ().GetSubPools ()) {
620+ res.SubRules .push_back (convert (subConf));
621+ }
622+ return res;
623+ } else {
624+ Y_ENSURE (false , " unknown case" );
625+ }
626+ };
627+ auto converted = convert (conf);
628+
629+ auto threads = TlsActivationContext->ActorSystem ()->GetPoolThreadsCount (SelfId ().PoolID ());
630+ Y_ENSURE (threads.has_value ());
631+ Scheduler.SetPriorities (converted.empty () ? TComputeScheduler::TDistributionRule{.Share = 1 , .Name = " olap" } : converted, threads.value (), TlsActivationContext->Monotonic ());
632+ }
633+
579634 void SetIteratorReadsRetrySettings (const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) {
580635 auto ptr = MakeIntrusive<NKikimr::NKqp::TIteratorReadBackoffSettings>();
581636 ptr->StartRetryDelay = TDuration::MilliSeconds (settings.GetStartDelayMs ());
@@ -670,6 +725,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
670725 NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
671726 const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
672727
728+ TComputeScheduler Scheduler;
729+
673730 // state sharded by TxId
674731 std::shared_ptr<TBucketArray> Buckets;
675732};
0 commit comments