@@ -60,15 +60,8 @@ NKqpNode::TState& GetStateBucketByTx(std::shared_ptr<TBucketArray> buckets, ui64
6060}
6161
6262void FinishKqpTask (ui64 txId, ui64 taskId, bool success, NKqpNode::TState& bucket, std::shared_ptr<NRm::IKqpResourceManager> ResourceManager) {
63- auto ctx = bucket.RemoveTask (txId, taskId, success);
64- if (ctx) {
65- ResourceManager->FreeExecutionUnits (1 );
66- if (ctx->ComputeActorsNumber == 0 ) {
67- ResourceManager->FreeResources (txId);
68- } else {
69- ResourceManager->FreeResources (txId, taskId);
70- }
71- }
63+ bucket.RemoveTask (txId, taskId, success);
64+ ResourceManager->FreeResources (txId, taskId);
7265}
7366
7467struct TMemoryQuotaManager : public NYql ::NDq::TGuaranteeQuotaManager {
@@ -86,7 +79,8 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
8679 , Buckets(std::move(buckets))
8780 , TxId(txId)
8881 , TaskId(taskId)
89- , InstantAlloc(instantAlloc) {
82+ , InstantAlloc(instantAlloc)
83+ {
9084 }
9185
9286 ~TMemoryQuotaManager () override {
@@ -100,8 +94,10 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
10094 return false ;
10195 }
10296
103- if (!ResourceManager->AllocateResources (TxId, TaskId,
104- NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize})) {
97+ auto result = ResourceManager->AllocateResources (TxId, TaskId,
98+ NRm::TKqpResourcesRequest{.MemoryPool = MemoryPool, .Memory = extraSize});
99+
100+ if (!result) {
105101 LOG_W (" Can not allocate memory. TxId: " << TxId << " , taskId: " << TaskId << " , memory: +" << extraSize);
106102 return false ;
107103 }
@@ -292,8 +288,14 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
292288 LOG_D (" TxId: " << txId << " , new compute tasks request from " << requester
293289 << " with " << msg.GetTasks ().size () << " tasks: " << TasksIdsStr (msg.GetTasks ()));
294290
295- NKqpNode::TTasksRequest request;
296- request.Executer = ActorIdFromProto (msg.GetExecuterActorId ());
291+ auto now = TAppData::TimeProvider->Now ();
292+ NKqpNode::TTasksRequest request (txId, ev->Sender , now);
293+ auto & msgRtSettings = msg.GetRuntimeSettings ();
294+ if (msgRtSettings.GetTimeoutMs () > 0 ) {
295+ // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
296+ auto timeout = TDuration::MilliSeconds (msgRtSettings.GetTimeoutMs ());
297+ request.Deadline = now + timeout + /* gap */ TDuration::Seconds (5 );
298+ }
297299
298300 auto & bucket = GetStateBucketByTx (Buckets, txId);
299301
@@ -311,16 +313,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
311313 memoryPool = NRm::EKqpMemoryPool::Unspecified;
312314 }
313315
314- size_t executionUnits = msg.GetTasks ().size ();
315- if (!ResourceManager ()->AllocateExecutionUnits (executionUnits)) {
316- Counters->RmNotEnoughComputeActors ->Inc ();
317- TStringBuilder error;
318- error << " TxId: " << txId << " , NodeId: " << SelfId ().NodeId () << " , not enough compute actors, requested " << msg.GetTasks ().size ();
319- LOG_N (error);
320- ReplyError (txId, request.Executer , msg, NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_EXECUTION_UNITS, error);
321- return ;
322- }
323-
324316 ui32 requestChannels = 0 ;
325317 for (auto & dqTask : *msg.MutableTasks ()) {
326318 auto estimation = EstimateTaskResources (dqTask, Config, msg.GetTasks ().size ());
@@ -348,20 +340,20 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
348340 for (auto & task : request.InFlyTasks ) {
349341 NRm::TKqpResourcesRequest resourcesRequest;
350342 resourcesRequest.MemoryPool = memoryPool;
343+ resourcesRequest.ExecutionUnits = 1 ;
351344
352345 // !!!!!!!!!!!!!!!!!!!!!
353346 // we have to allocate memory instead of reserve only. currently, this memory will not be used for request processing.
354347 resourcesRequest.Memory = Min<double >(task.second .Memory , 1 << 19 ) /* 512kb limit for check that memory exists for processing with minimal requirements */ ;
355348
356- NRm::TKqpNotEnoughResources resourcesResponse;
357- if (!ResourceManager ()->AllocateResources (txId, task.first , resourcesRequest, &resourcesResponse)) {
349+ auto result = ResourceManager ()->AllocateResources (txId, task.first , resourcesRequest);
350+
351+ if (!result) {
358352 for (ui64 taskId : allocatedTasks) {
359353 ResourceManager ()->FreeResources (txId, taskId);
360354 }
361355
362- ResourceManager ()->FreeExecutionUnits (executionUnits);
363-
364- ReplyError (txId, request.Executer , msg, resourcesResponse.GetStatus (), resourcesResponse.GetFailReason ());
356+ ReplyError (txId, request.Executer , msg, result.GetStatus (), result.GetFailReason ());
365357 return ;
366358 }
367359
@@ -377,14 +369,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
377369 memoryLimits.MkqlHeavyProgramMemoryLimit = Config.GetMkqlHeavyProgramMemoryLimit ();
378370
379371 NYql::NDq::TComputeRuntimeSettings runtimeSettingsBase;
380- auto & msgRtSettings = msg.GetRuntimeSettings ();
381- if (msgRtSettings.GetTimeoutMs () > 0 ) {
382- // compute actor should not arm timer since in case of timeout it will receive TEvAbortExecution from Executer
383- auto timeout = TDuration::MilliSeconds (msgRtSettings.GetTimeoutMs ());
384- request.Deadline = TAppData::TimeProvider->Now () + timeout + /* gap */ TDuration::Seconds (5 );
385- bucket.InsertExpiringRequest (request.Deadline , txId, requester);
386- }
387-
388372 runtimeSettingsBase.ExtraMemoryAllocationPool = memoryPool;
389373 runtimeSettingsBase.FailOnUndelivery = msgRtSettings.GetExecType () != NYql::NDqProto::TComputeRuntimeSettings::SCAN;
390374
@@ -502,7 +486,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
502486
503487 Counters->NodeServiceProcessTime ->Collect (NHPTimer::GetTimePassed (&workHandlerStart) * SecToUsec);
504488
505- bucket.NewRequest (txId, requester, std::move (request), memoryPool );
489+ bucket.NewRequest (std::move (request));
506490 }
507491
508492 // used only for unit tests
@@ -522,36 +506,30 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
522506 Counters->NodeServiceProcessCancelTime ->Collect (timer.Passed () * SecToUsec);
523507 }
524508
525- void TerminateTx (ui64 txId, const TString& reason) {
509+ void TerminateTx (ui64 txId, const TString& reason, NYql::NDqProto::StatusIds_StatusCode status = NYql::NDqProto::StatusIds::UNSPECIFIED ) {
526510 auto & bucket = GetStateBucketByTx (Buckets, txId);
527511 auto tasksToAbort = bucket.GetTasksByTxId (txId);
528512
529513 if (!tasksToAbort.empty ()) {
514+ TStringBuilder finalReason;
515+ finalReason << " node service cancelled the task, because it " << reason
516+ << " , NodeId: " << SelfId ().NodeId ()
517+ << " , TxId: " << txId;
518+
519+ LOG_E (finalReason);
530520 for (const auto & [taskId, computeActorId]: tasksToAbort) {
531- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::UNSPECIFIED,
532- reason);
533- Send (computeActorId, abortEv.Release ());
521+ auto abortEv = std::make_unique<TEvKqp::TEvAbortExecution>(status, reason);
522+ Send (computeActorId, abortEv.release ());
534523 }
535524 }
536525 }
537526
538527 void HandleWork (TEvents::TEvWakeup::TPtr& ev) {
539528 Schedule (TDuration::Seconds (1 ), ev->Release ().Release ());
540- std::vector<ui64> txIdsToFree;
541529 for (auto & bucket : *Buckets) {
542530 auto expiredRequests = bucket.ClearExpiredRequests ();
543531 for (auto & cxt : expiredRequests) {
544- LOG_D (" txId: " << cxt.RequestId .TxId << " , requester: " << cxt.RequestId .Requester
545- << " , execution timeout, request: " << cxt.Exists );
546- if (!cxt.Exists ) {
547- // it is ok since in most cases requests is finished by exlicit TEvAbortExecution from their Executer
548- LOG_I (" txId: " << cxt.RequestId .TxId << " , requester: " << cxt.RequestId .Requester
549- << " , unknown request" );
550- continue ;
551- }
552- // don't send to executer and compute actors, they will be destroyed by TEvAbortExecution in that order:
553- // KqpProxy -> SessionActor -> Executer -> ComputeActor
554- ResourceManager ()->FreeResources (cxt.RequestId .TxId );
532+ TerminateTx (cxt.TxId , " reached execution deadline" , NYql::NDqProto::StatusIds::TIMEOUT);
555533 }
556534 }
557535 }
@@ -619,9 +597,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
619597 switch (ev->Get ()->SourceType ) {
620598 case TEvKqpNode::TEvStartKqpTasksResponse::EventType: {
621599 ui64 txId = ev->Cookie ;
622- LOG_E ( " TxId: " << txId << " , executer lost: " << ( int ) ev-> Get ()-> Reason ) ;
623-
624- TerminateTx (txId, " executer lost " );
600+ TStringBuilder reason ;
601+ reason << " executer lost: " << ( int ) ev-> Get ()-> Reason ;
602+ TerminateTx (txId, reason, NYql::NDqProto::StatusIds::ABORTED );
625603 break ;
626604 }
627605
0 commit comments