@@ -107,18 +107,29 @@ struct TTxState {
107107 ui32 TxExecutionUnits = 0 ;
108108 TInstant CreatedAt;
109109
110- bool IsDataQuery = false ;
111-
112- TTaskState& Allocated (ui64 taskId, TInstant now, const TKqpResourcesRequest& resources) {
110+ TTaskState& Allocated (ui64 taskId, TInstant now, const TKqpResourcesRequest& resources, bool memoryAsExternal = false ) {
111+ ui64 externalMemory = resources.ExternalMemory ;
112+ ui64 resourceBrokerMemory = 0 ;
113+ if (memoryAsExternal) {
114+ externalMemory += resources.Memory ;
115+ } else {
116+ resourceBrokerMemory = resources.Memory ;
117+ }
113118
114- TxScanQueryMemory += resources.Memory ;
119+ TxExternalDataQueryMemory += externalMemory;
120+ TxScanQueryMemory += resourceBrokerMemory;
115121 if (!CreatedAt) {
116122 CreatedAt = now;
117123 }
118124
125+ if (resources.ExecutionUnits ) {
126+ Y_ABORT_UNLESS (!Tasks.contains (taskId));
127+ }
128+
119129 auto & taskState = Tasks[taskId];
120130 taskState.ExecutionUnits += resources.ExecutionUnits ;
121- taskState.ScanQueryMemory += resources.Memory ;
131+ taskState.ScanQueryMemory += resourceBrokerMemory;
132+ taskState.ExternalDataQueryMemory += externalMemory;
122133 if (!taskState.CreatedAt ) {
123134 taskState.CreatedAt = now;
124135 }
@@ -233,19 +244,28 @@ class TKqpResourceManager : public IKqpResourceManager {
233244 }
234245 };
235246
236- if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
237- NotifyExternalResourcesAllocated (txId, taskId, resources);
238- return result;
239- }
240-
241- Y_ABORT_UNLESS (resources.MemoryPool == EKqpMemoryPool::ScanQuery);
242247 if (Y_UNLIKELY (resources.Memory == 0 )) {
243248 return result;
244249 }
245250
246251 auto now = ActorSystem->Timestamp ();
247252 bool hasScanQueryMemory = true ;
248253 ui64 queryMemoryLimit = 0 ;
254+ // NOTE(gvit): the first memory request from the data query pool always satisfied.
255+ // all other requests are not guaranteed to be satisfied.
256+ // In the nearest future we need to implement several layers of memory requests.
257+ bool isFirstAllocationRequest = (resources.ExecutionUnits > 0 && resources.MemoryPool == EKqpMemoryPool::DataQuery);
258+ if (isFirstAllocationRequest) {
259+ auto & txBucket = TxBucket (txId);
260+ with_lock (txBucket.Lock ) {
261+ auto & tx = txBucket.Txs [txId];
262+ tx.Allocated (taskId, now, resources, /* memoryAsExternal=*/ true );
263+ ExternalDataQueryMemory.fetch_add (resources.Memory + resources.ExternalMemory );
264+ Counters->RmExternalMemory ->Add (resources.Memory + resources.ExternalMemory );
265+ }
266+
267+ return result;
268+ }
249269
250270 with_lock (Lock) {
251271 if (Y_UNLIKELY (!ResourceBroker)) {
@@ -286,15 +306,14 @@ class TKqpResourceManager : public IKqpResourceManager {
286306 }
287307 };
288308
289- if (auto it = txBucket.Txs .find (txId); it != txBucket.Txs .end ()) {
290- ui64 txTotalRequestedMemory = it->second .TxScanQueryMemory + resources.Memory ;
291- if (txTotalRequestedMemory > queryMemoryLimit) {
292- TStringBuilder reason;
293- reason << " TxId: " << txId << " , taskId: " << taskId << " . Query memory limit exceeded: "
294- << " requested " << txTotalRequestedMemory;
295- result.SetError (NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, reason);
296- return result;
297- }
309+ auto & tx = txBucket.Txs [txId];
310+ ui64 txTotalRequestedMemory = tx.TxScanQueryMemory + resources.Memory ;
311+ if (txTotalRequestedMemory > queryMemoryLimit) {
312+ TStringBuilder reason;
313+ reason << " TxId: " << txId << " , taskId: " << taskId << " . Query memory limit exceeded: "
314+ << " requested " << txTotalRequestedMemory;
315+ result.SetError (NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, reason);
316+ return result;
298317 }
299318
300319 bool allocated = ResourceBroker->SubmitTaskInstant (
@@ -310,7 +329,7 @@ class TKqpResourceManager : public IKqpResourceManager {
310329 return result;
311330 }
312331
313- auto & taskState = txBucket. Txs [txId]. Allocated (Counters, taskId, now, resources);
332+ auto & taskState = tx. Allocated (taskId, now, resources);
314333 if (!taskState.ResourceBrokerTaskId ) {
315334 taskState.ResourceBrokerTaskId = rbTaskId;
316335 } else {
@@ -334,7 +353,6 @@ class TKqpResourceManager : public IKqpResourceManager {
334353 void FreeResources (ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
335354
336355 if (resources.MemoryPool == EKqpMemoryPool::DataQuery) {
337- NotifyExternalResourcesFreed (txId, taskId, resources);
338356 return ;
339357 }
340358
@@ -374,6 +392,7 @@ class TKqpResourceManager : public IKqpResourceManager {
374392
375393 void FreeResources (ui64 txId, ui64 taskId) override {
376394 ui64 releaseScanQueryMemory = 0 ;
395+ ui64 releaseExternalDataQueryMemory = 0 ;
377396 ui32 remainsTasks = 0 ;
378397
379398 auto & txBucket = TxBucket (txId);
@@ -387,34 +406,39 @@ class TKqpResourceManager : public IKqpResourceManager {
387406 return ;
388407 }
389408
390- auto taskIt = txIt->second .Tasks .find (taskId);
391- if (taskIt == txIt->second .Tasks .end ()) {
409+ auto & tx = txIt->second ;
410+ auto taskIt = tx.Tasks .find (taskId);
411+ if (taskIt == tx.Tasks .end ()) {
392412 return ;
393413 }
394414
395- if (taskIt->second .ExecutionUnits ) {
396- FreeExecutionUnits (std::exchange (taskIt->second .ExecutionUnits , 0 ));
397- }
398-
399- if (txIt->second .IsDataQuery ) {
400- guard.Clear ();
401- return NotifyExternalResourcesFreed (txId, taskId);
415+ auto & task = taskIt->second ;
416+ if (task.ExecutionUnits ) {
417+ FreeExecutionUnits (task.ExecutionUnits );
402418 }
403419
404- releaseScanQueryMemory = taskIt->second .ScanQueryMemory ;
420+ releaseExternalDataQueryMemory = task.ExternalDataQueryMemory ;
421+ releaseScanQueryMemory = task.ScanQueryMemory ;
405422
406- bool finished = ResourceBroker->FinishTaskInstant (
407- TEvResourceBroker::TEvFinishTask (taskIt->second .ResourceBrokerTaskId ), SelfId);
408- Y_DEBUG_ABORT_UNLESS (finished);
423+ if (task.ResourceBrokerTaskId ) {
424+ bool finished = ResourceBroker->FinishTaskInstant (
425+ TEvResourceBroker::TEvFinishTask (task.ResourceBrokerTaskId ), SelfId);
426+ Y_DEBUG_ABORT_UNLESS (finished);
427+ }
409428
410- remainsTasks = txIt-> second .Tasks .size () - 1 ;
429+ remainsTasks = tx .Tasks .size () - 1 ;
411430
412431 if (remainsTasks == 0 ) {
413432 txBucket.Txs .erase (txIt);
414433 } else {
415- txIt->second .Tasks .erase (taskIt);
416- txIt->second .TxScanQueryMemory -= releaseScanQueryMemory;
434+ tx.Tasks .erase (taskIt);
435+ tx.TxScanQueryMemory -= releaseScanQueryMemory;
436+ tx.TxExternalDataQueryMemory -= releaseExternalDataQueryMemory;
417437 }
438+
439+ i64 prev = ExternalDataQueryMemory.fetch_sub (releaseExternalDataQueryMemory);
440+ Counters->RmExternalMemory ->Sub (releaseExternalDataQueryMemory);
441+ Y_DEBUG_ABORT_UNLESS (prev >= 0 );
418442 } // with_lock (txBucket.Lock)
419443
420444 with_lock (Lock) {
@@ -442,101 +466,11 @@ class TKqpResourceManager : public IKqpResourceManager {
442466
443467 auto & txBucket = TxBucket (txId);
444468 with_lock (txBucket.Lock ) {
445- auto & tx = txBucket.Txs [txId];
446- tx.IsDataQuery = true ;
447- auto & task = tx.Tasks [taskId];
448-
449- task.ExternalDataQueryMemory = resources.Memory ;
450- tx.TxExternalDataQueryMemory += resources.Memory ;
469+ txBucket.Txs [txId].Allocated (taskId, TInstant (), resources);
470+ ExternalDataQueryMemory.fetch_add (resources.ExternalMemory );
471+ Counters->RmExternalMemory ->Add (resources.ExternalMemory );
451472 } // with_lock (txBucket.Lock)
452473
453- with_lock (Lock) {
454- ExternalDataQueryMemory += resources.Memory ;
455- } // with_lock (Lock)
456-
457- Counters->RmExternalMemory ->Add (resources.Memory );
458-
459- FireResourcesPublishing ();
460- }
461-
462- void NotifyExternalResourcesFreed (ui64 txId, ui64 taskId, const TKqpResourcesRequest& resources) override {
463- LOG_AS_D (" TxId: " << txId << " , taskId: " << taskId << " . External free: " << resources.ToString ());
464-
465- YQL_ENSURE (resources.MemoryPool == EKqpMemoryPool::DataQuery);
466-
467- ui64 releaseMemory = 0 ;
468-
469- auto & txBucket = TxBucket (txId);
470- with_lock (txBucket.Lock ) {
471- auto txIt = txBucket.Txs .find (txId);
472- if (txIt == txBucket.Txs .end ()) {
473- return ;
474- }
475-
476- auto taskIt = txIt->second .Tasks .find (taskId);
477- if (taskIt == txIt->second .Tasks .end ()) {
478- return ;
479- }
480-
481- if (taskIt->second .ExternalDataQueryMemory <= resources.Memory ) {
482- releaseMemory = taskIt->second .ExternalDataQueryMemory ;
483- if (txIt->second .Tasks .size () == 1 ) {
484- txBucket.Txs .erase (txId);
485- } else {
486- txIt->second .Tasks .erase (taskIt);
487- txIt->second .TxExternalDataQueryMemory -= releaseMemory;
488- }
489- } else {
490- releaseMemory = resources.Memory ;
491- taskIt->second .ExternalDataQueryMemory -= resources.Memory ;
492- }
493- } // with_lock (txBucket.Lock)
494-
495- with_lock (Lock) {
496- Y_DEBUG_ABORT_UNLESS (ExternalDataQueryMemory >= releaseMemory);
497- ExternalDataQueryMemory -= releaseMemory;
498- } // with_lock (Lock)
499-
500- Counters->RmExternalMemory ->Sub (releaseMemory);
501- Y_DEBUG_ABORT_UNLESS (Counters->RmExternalMemory ->Val () >= 0 );
502-
503- FireResourcesPublishing ();
504- }
505-
506- void NotifyExternalResourcesFreed (ui64 txId, ui64 taskId) override {
507- LOG_AS_D (" TxId: " << txId << " , taskId: " << taskId << " . External free." );
508-
509- ui64 releaseMemory = 0 ;
510-
511- auto & txBucket = TxBucket (txId);
512- with_lock (txBucket.Lock ) {
513- auto txIt = txBucket.Txs .find (txId);
514- if (txIt == txBucket.Txs .end ()) {
515- return ;
516- }
517-
518- auto taskIt = txIt->second .Tasks .find (taskId);
519- if (taskIt == txIt->second .Tasks .end ()) {
520- return ;
521- }
522-
523- releaseMemory = taskIt->second .ExternalDataQueryMemory ;
524-
525- if (txIt->second .Tasks .size () == 1 ) {
526- txBucket.Txs .erase (txId);
527- } else {
528- txIt->second .Tasks .erase (taskIt);
529- txIt->second .TxExternalDataQueryMemory -= releaseMemory;
530- }
531- } // with_lock (txBucket.Lock)
532-
533- with_lock (Lock) {
534- Y_DEBUG_ABORT_UNLESS (ExternalDataQueryMemory >= releaseMemory);
535- ExternalDataQueryMemory -= releaseMemory;
536- } // with_lock (Lock)
537-
538- Counters->RmExternalMemory ->Sub (releaseMemory);
539- Y_DEBUG_ABORT_UNLESS (Counters->RmExternalMemory ->Val () >= 0 );
540474
541475 FireResourcesPublishing ();
542476 }
@@ -629,7 +563,7 @@ class TKqpResourceManager : public IKqpResourceManager {
629563 std::atomic<i32 > ExecutionUnitsResource;
630564 std::atomic<i32 > ExecutionUnitsLimit;
631565 TLimitedResource<ui64> ScanQueryMemoryResource;
632- ui64 ExternalDataQueryMemory = 0 ;
566+ std::atomic< i64 > ExternalDataQueryMemory = 0 ;
633567
634568 // current state
635569 std::array<TTxStatesBucket, BucketsCount> Buckets;
@@ -957,7 +891,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
957891 str << " State storage key: " << WbState.Tenant << Endl;
958892 with_lock (ResourceManager->Lock ) {
959893 str << " ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource .ToString () << Endl;
960- str << " External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory << Endl;
894+ str << " External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory . load () << Endl;
961895 str << " ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource .load () << Endl;
962896 }
963897 str << " Last resource broker task id: " << ResourceManager->LastResourceBrokerTaskId .load () << Endl;
0 commit comments