@@ -141,6 +141,7 @@ class TKqpResourceManager : public IKqpResourceManager {
141141 : Config(config)
142142 , Counters(counters)
143143 , ExecutionUnitsResource(Config.GetComputeActorsCount())
144+ , ExecutionUnitsLimit(Config.GetComputeActorsCount())
144145 , ScanQueryMemoryResource(Config.GetQueryMemoryLimit())
145146 , PublishResourcesByExchanger(Config.GetEnablePublishResourcesByExchanger()) {
146147
@@ -174,20 +175,39 @@ class TKqpResourceManager : public IKqpResourceManager {
174175 }
175176 }
176177
178+ bool AllocateExecutionUnits (ui32 cnt) override {
179+ i32 prev = ExecutionUnitsResource.fetch_sub (cnt);
180+ if (prev < (i32 )cnt) {
181+ ExecutionUnitsResource.fetch_add (cnt);
182+ return false ;
183+ } else {
184+ Counters->RmComputeActors ->Add (cnt);
185+ return true ;
186+ }
187+ }
188+
189+ void FreeExecutionUnits (ui32 cnt) override {
190+ if (cnt == 0 ) {
191+ return ;
192+ }
193+
194+ ExecutionUnitsResource.fetch_add (cnt);
195+ Counters->RmComputeActors ->Sub (cnt);
196+ }
197+
177198 bool AllocateResources (ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources,
178199 TKqpNotEnoughResources* details = nullptr ) override {
179200 if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
180201 NotifyExternalResourcesAllocated (txId, taskId, resources);
181202 return true ;
182203 }
183204 Y_ABORT_UNLESS (resources.MemoryPool == EKqpMemoryPool::ScanQuery);
184- if (Y_UNLIKELY (resources.Memory == 0 && resources. ExecutionUnits == 0 )) {
205+ if (Y_UNLIKELY (resources.Memory == 0 )) {
185206 return true ;
186207 }
187208
188209 auto now = ActorSystem->Timestamp ();
189210 bool hasScanQueryMemory = true ;
190- bool hasExecutionUnits = true ;
191211 ui64 queryMemoryLimit = 0 ;
192212
193213 with_lock (Lock) {
@@ -200,11 +220,8 @@ class TKqpResourceManager : public IKqpResourceManager {
200220 }
201221
202222 hasScanQueryMemory = ScanQueryMemoryResource.Has (resources.Memory );
203- hasExecutionUnits = ExecutionUnitsResource.Has (resources.ExecutionUnits );
204-
205- if (hasScanQueryMemory && hasExecutionUnits) {
223+ if (hasScanQueryMemory) {
206224 ScanQueryMemoryResource.Acquire (resources.Memory );
207- ExecutionUnitsResource.Acquire (resources.ExecutionUnits );
208225 queryMemoryLimit = Config.GetQueryMemoryLimit ();
209226 }
210227 } // with_lock (Lock)
@@ -218,15 +235,6 @@ class TKqpResourceManager : public IKqpResourceManager {
218235 return false ;
219236 }
220237
221- if (!hasExecutionUnits) {
222- Counters->RmNotEnoughComputeActors ->Inc ();
223- LOG_AS_N (" TxId: " << txId << " , taskId: " << taskId << " . Not enough ExecutionUnits, requested: " << resources.ExecutionUnits );
224- if (details) {
225- details->SetExecutionUnits ();
226- }
227- return false ;
228- }
229-
230238 ui64 rbTaskId = LastResourceBrokerTaskId.fetch_add (1 ) + 1 ;
231239 TString rbTaskName = TStringBuilder () << " kqp-" << txId << ' -' << taskId << ' -' << rbTaskId;
232240 bool extraAlloc = false ;
@@ -239,7 +247,6 @@ class TKqpResourceManager : public IKqpResourceManager {
239247
240248 with_lock (Lock) {
241249 ScanQueryMemoryResource.Release (resources.Memory );
242- ExecutionUnitsResource.Release (resources.ExecutionUnits );
243250 } // with_lock (Lock)
244251
245252 Counters->RmNotEnoughMemory ->Inc ();
@@ -261,7 +268,6 @@ class TKqpResourceManager : public IKqpResourceManager {
261268
262269 with_lock (Lock) {
263270 ScanQueryMemoryResource.Release (resources.Memory );
264- ExecutionUnitsResource.Release (resources.ExecutionUnits );
265271 } // with_lock (Lock)
266272
267273 Counters->RmNotEnoughMemory ->Inc ();
@@ -276,14 +282,12 @@ class TKqpResourceManager : public IKqpResourceManager {
276282 auto & txState = txBucket.Txs [txId];
277283
278284 txState.TxScanQueryMemory += resources.Memory ;
279- txState.TxExecutionUnits += resources.ExecutionUnits ;
280285 if (!txState.CreatedAt ) {
281286 txState.CreatedAt = now;
282287 }
283288
284289 auto & taskState = txState.Tasks [taskId];
285290 taskState.ScanQueryMemory += resources.Memory ;
286- taskState.ExecutionUnits += resources.ExecutionUnits ;
287291 if (!taskState.CreatedAt ) {
288292 taskState.CreatedAt = now;
289293 }
@@ -299,7 +303,6 @@ class TKqpResourceManager : public IKqpResourceManager {
299303
300304 LOG_AS_D (" TxId: " << txId << " , taskId: " << taskId << " . Allocated " << resources.ToString ());
301305
302- Counters->RmComputeActors ->Add (resources.ExecutionUnits );
303306 Counters->RmMemory ->Add (resources.Memory );
304307 if (extraAlloc) {
305308 Counters->RmExtraMemAllocs ->Inc ();
@@ -340,20 +343,16 @@ class TKqpResourceManager : public IKqpResourceManager {
340343 }
341344
342345 taskIt->second .ScanQueryMemory -= resources.Memory ;
343- taskIt->second .ExecutionUnits -= resources.ExecutionUnits ;
344346
345347 bool reduced = ResourceBroker->ReduceTaskResourcesInstant (
346348 taskIt->second .ResourceBrokerTaskId , {0 , resources.Memory }, SelfId);
347349 Y_DEBUG_ABORT_UNLESS (reduced);
348350
349351 txIt->second .TxScanQueryMemory -= resources.Memory ;
350- txIt->second .TxExecutionUnits -= resources.ExecutionUnits ;
351352
352353 ScanQueryMemoryResource.Release (resources.Memory );
353- ExecutionUnitsResource.Release (resources.ExecutionUnits );
354354 }
355355
356- Counters->RmComputeActors ->Sub (resources.ExecutionUnits );
357356 Counters->RmMemory ->Sub (resources.Memory );
358357
359358 Y_DEBUG_ABORT_UNLESS (Counters->RmComputeActors ->Val () >= 0 );
@@ -364,7 +363,6 @@ class TKqpResourceManager : public IKqpResourceManager {
364363
365364 void FreeResources (ui64 txId, ui64 taskId) override {
366365 ui64 releaseScanQueryMemory = 0 ;
367- ui32 releaseExecutionUnits = 0 ;
368366 ui32 remainsTasks = 0 ;
369367
370368 auto & txBucket = TxBucket (txId);
@@ -389,7 +387,6 @@ class TKqpResourceManager : public IKqpResourceManager {
389387 }
390388
391389 releaseScanQueryMemory = taskIt->second .ScanQueryMemory ;
392- releaseExecutionUnits = taskIt->second .ExecutionUnits ;
393390
394391 bool finished = ResourceBroker->FinishTaskInstant (
395392 TEvResourceBroker::TEvFinishTask (taskIt->second .ResourceBrokerTaskId ), SelfId);
@@ -402,20 +399,17 @@ class TKqpResourceManager : public IKqpResourceManager {
402399 } else {
403400 txIt->second .Tasks .erase (taskIt);
404401 txIt->second .TxScanQueryMemory -= releaseScanQueryMemory;
405- txIt->second .TxExecutionUnits -= releaseExecutionUnits;
406402 }
407403 } // with_lock (txBucket.Lock)
408404
409405 with_lock (Lock) {
410406 ScanQueryMemoryResource.Release (releaseScanQueryMemory);
411- ExecutionUnitsResource.Release (releaseExecutionUnits);
412407 } // with_lock (Lock)
413408
414409 LOG_AS_D (" TxId: " << txId << " , taskId: " << taskId << " . Released resources, "
415- << " ScanQueryMemory: " << releaseScanQueryMemory << " , ExecutionUnits: " << releaseExecutionUnits << " . "
410+ << " ScanQueryMemory: " << releaseScanQueryMemory << " . "
416411 << " Remains " << remainsTasks << " tasks in this tx." );
417412
418- Counters->RmComputeActors ->Sub (releaseExecutionUnits);
419413 Counters->RmMemory ->Sub (releaseScanQueryMemory);
420414
421415 Y_DEBUG_ABORT_UNLESS (Counters->RmComputeActors ->Val () >= 0 );
@@ -426,7 +420,6 @@ class TKqpResourceManager : public IKqpResourceManager {
426420
427421 void FreeResources (ui64 txId) override {
428422 ui64 releaseScanQueryMemory = 0 ;
429- ui32 releaseExecutionUnits = 0 ;
430423
431424 auto & txBucket = TxBucket (txId);
432425
@@ -450,21 +443,17 @@ class TKqpResourceManager : public IKqpResourceManager {
450443 }
451444
452445 releaseScanQueryMemory = txIt->second .TxScanQueryMemory ;
453- releaseExecutionUnits = txIt->second .TxExecutionUnits ;
454446
455447 txBucket.Txs .erase (txIt);
456448 } // with_lock (txBucket.Lock)
457449
458450 with_lock (Lock) {
459451 ScanQueryMemoryResource.Release (releaseScanQueryMemory);
460- ExecutionUnitsResource.Release (releaseExecutionUnits);
461452 } // with_lock (Lock)
462453
463454 LOG_AS_D (" TxId: " << txId << " . Released resources, "
464- << " ScanQueryMemory: " << releaseScanQueryMemory << " , ExecutionUnits: " << releaseExecutionUnits << " . "
465- << " Tx completed." );
455+ << " ScanQueryMemory: " << releaseScanQueryMemory << " , ExecutionUnits: " << " Tx completed." );
466456
467- Counters->RmComputeActors ->Sub (releaseExecutionUnits);
468457 Counters->RmMemory ->Sub (releaseScanQueryMemory);
469458
470459 Y_DEBUG_ABORT_UNLESS (Counters->RmComputeActors ->Val () >= 0 );
@@ -635,7 +624,7 @@ class TKqpResourceManager : public IKqpResourceManager {
635624 result.Memory .fill (0 );
636625
637626 with_lock (Lock) {
638- result.ExecutionUnits = ExecutionUnitsResource.Available ();
627+ result.ExecutionUnits = ExecutionUnitsResource.load ();
639628 result.Memory [EKqpMemoryPool::ScanQuery] = ScanQueryMemoryResource.Available ();
640629 }
641630
@@ -695,7 +684,8 @@ class TKqpResourceManager : public IKqpResourceManager {
695684 TAdaptiveLock Lock;
696685
697686 // limits (guarded by Lock)
698- TLimitedResource<ui32> ExecutionUnitsResource;
687+ std::atomic<i32 > ExecutionUnitsResource;
688+ std::atomic<i32 > ExecutionUnitsLimit;
699689 TLimitedResource<ui64> ScanQueryMemoryResource;
700690 ui64 ExternalDataQueryMemory = 0 ;
701691
@@ -974,7 +964,9 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
974964 LOG_I (" Updated table service config: " << config.DebugString ());
975965
976966 with_lock (ResourceManager->Lock ) {
977- ResourceManager->ExecutionUnitsResource .SetNewLimit (config.GetComputeActorsCount ());
967+ i32 prev = ResourceManager->ExecutionUnitsLimit .load ();
968+ ResourceManager->ExecutionUnitsLimit .store (config.GetComputeActorsCount ());
969+ ResourceManager->ExecutionUnitsResource .fetch_add ((i32 )config.GetComputeActorsCount () - prev);
978970 ResourceManager->Config .Swap (&config);
979971 }
980972
@@ -1024,7 +1016,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
10241016 with_lock (ResourceManager->Lock ) {
10251017 str << " ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource .ToString () << Endl;
10261018 str << " External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl;
1027- str << " ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource .ToString () << Endl;
1019+ str << " ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource .load () << Endl;
10281020 }
10291021 str << " Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId .load () << Endl;
10301022 if (WbState.LastPublishTime ) {
@@ -1160,11 +1152,11 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
11601152 }
11611153 ActorIdToProto (MakeKqpResourceManagerServiceID (SelfId ().NodeId ()), payload.MutableResourceManagerActorId ()); // legacy
11621154 with_lock (ResourceManager->Lock ) {
1163- payload.SetAvailableComputeActors (ResourceManager->ExecutionUnitsResource .Available ()); // legacy
1155+ payload.SetAvailableComputeActors (ResourceManager->ExecutionUnitsResource .load ()); // legacy
11641156 payload.SetTotalMemory (ResourceManager->ScanQueryMemoryResource .GetLimit ()); // legacy
11651157 payload.SetUsedMemory (ResourceManager->ScanQueryMemoryResource .GetLimit () - ResourceManager->ScanQueryMemoryResource .Available ()); // legacy
11661158
1167- payload.SetExecutionUnits (ResourceManager->ExecutionUnitsResource .Available ());
1159+ payload.SetExecutionUnits (ResourceManager->ExecutionUnitsResource .load ());
11681160 auto * pool = payload.MutableMemory ()->Add ();
11691161 pool->SetPool (EKqpMemoryPool::ScanQuery);
11701162 pool->SetAvailable (ResourceManager->ScanQueryMemoryResource .Available ());
0 commit comments