@@ -408,86 +408,48 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
408408 }
409409 }
410410
411+ YQL_ENSURE (Planner);
412+ bool populateChannels = Planner->AcknowledgeCA (taskId, computeActor, &state);
413+
411414 switch (state.GetState ()) {
412415 case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: {
413416 YQL_ENSURE (false , " unexpected state from " << computeActor << " , task: " << taskId);
414417 return ;
415418 }
416419
417- case NYql::NDqProto::COMPUTE_STATE_FAILURE: {
418- ReplyErrorAndDie (NYql::NDq::DqStatusToYdbStatus (state.GetStatusCode ()), state.MutableIssues ());
419- return ;
420- }
421-
422420 case NYql::NDqProto::COMPUTE_STATE_EXECUTING: {
423- // initial TEvState event from Compute Actor
424- // there can be race with RM answer
425- if (Planner) {
426- if (Planner->GetPendingComputeTasks ().erase (taskId)) {
427- auto it = Planner->GetPendingComputeActors ().emplace (computeActor, TProgressStat ());
428- YQL_ENSURE (it.second );
429-
430- if (state.HasStats ()) {
431- it.first ->second .Set (state.GetStats ());
432- }
433-
434- auto & task = TasksGraph.GetTask (taskId);
435- task.ComputeActorId = computeActor;
436-
437- THashMap<TActorId, THashSet<ui64>> updates;
438- CollectTaskChannelsUpdates (task, updates);
439- PropagateChannelsUpdates (updates);
440- } else {
441- auto it = Planner->GetPendingComputeActors ().find (computeActor);
442- if (it != Planner->GetPendingComputeActors ().end ()) {
443- if (state.HasStats ()) {
444- it->second .Set (state.GetStats ());
445- }
446- }
447- }
421+ if (populateChannels) {
422+ auto & task = TasksGraph.GetTask (taskId);
423+ THashMap<TActorId, THashSet<ui64>> updates;
424+ CollectTaskChannelsUpdates (task, updates);
425+ PropagateChannelsUpdates (updates);
448426 }
449427 break ;
450428 }
451429
430+ case NYql::NDqProto::COMPUTE_STATE_FAILURE:
452431 case NYql::NDqProto::COMPUTE_STATE_FINISHED: {
432+ ExtraData[computeActor].Swap (state.MutableExtraData ());
453433 if (Stats) {
454434 Stats->AddComputeActorStats (
455435 computeActor.NodeId (),
456436 std::move (*state.MutableStats ()),
457437 TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
458438 );
459439 }
460- ExtraData[computeActor].Swap (state.MutableExtraData ());
461440
462441 LastTaskId = taskId;
463442 LastComputeActorId = computeActor.ToString ();
464-
465- if (Planner) {
466- auto it = Planner->GetPendingComputeActors ().find (computeActor);
467- if (it == Planner->GetPendingComputeActors ().end ()) {
468- LOG_W (" Got execution state for compute actor: " << computeActor
469- << " , task: " << taskId
470- << " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
471- << " , too early (waiting reply from RM)" );
472-
473- if (Planner && Planner->GetPendingComputeTasks ().erase (taskId)) {
474- LOG_E (" Got execution state for compute actor: " << computeActor
475- << " , for unknown task: " << state.GetTaskId ()
476- << " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ()));
477- return ;
478- }
479- } else {
480- if (state.HasStats ()) {
481- it->second .Set (state.GetStats ());
482- }
483- LastStats.emplace_back (std::move (it->second ));
484- Planner->GetPendingComputeActors ().erase (it);
485- YQL_ENSURE (Planner->GetPendingComputeTasks ().find (taskId) == Planner->GetPendingComputeTasks ().end ());
486- }
487- }
443+ YQL_ENSURE (Planner);
444+ Planner->CompletedCA (taskId, computeActor);
488445 }
489446 }
490447
448+ if (state.GetState () == NYql::NDqProto::COMPUTE_STATE_FAILURE) {
449+ ReplyErrorAndDie (NYql::NDq::DqStatusToYdbStatus (state.GetStatusCode ()), state.MutableIssues ());
450+ return ;
451+ }
452+
491453 static_cast <TDerived*>(this )->CheckExecutionComplete ();
492454 }
493455
@@ -683,20 +645,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
683645 auto taskId = startedTask.GetTaskId ();
684646 auto & task = TasksGraph.GetTask (taskId);
685647
686- task.ComputeActorId = ActorIdFromProto (startedTask.GetActorId ());
687-
688- LOG_D (" Executing task: " << taskId << " on compute actor: " << task.ComputeActorId );
689-
690- if (Planner) {
691- if (Planner->GetPendingComputeTasks ().erase (taskId) == 0 ) {
692- LOG_D (" Executing task: " << taskId << " , compute actor: " << task.ComputeActorId << " , already finished" );
693- } else {
694- auto result = Planner->GetPendingComputeActors ().emplace (std::make_pair (task.ComputeActorId , TProgressStat ()));
695- YQL_ENSURE (result.second );
696-
697- CollectTaskChannelsUpdates (task, channelsUpdates);
698- }
648+ TActorId computeActorId = ActorIdFromProto (startedTask.GetActorId ());
649+ LOG_D (" Executing task: " << taskId << " on compute actor: " << computeActorId);
650+ YQL_ENSURE (Planner);
651+ bool channelUpdates = Planner->AcknowledgeCA (taskId, computeActorId, nullptr );
652+ if (channelUpdates) {
653+ CollectTaskChannelsUpdates (task, channelsUpdates);
699654 }
655+
700656 }
701657
702658 PropagateChannelsUpdates (channelsUpdates);
@@ -789,16 +745,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
789745 LastResourceUsageUpdate = now;
790746
791747 TProgressStat::TEntry consumption;
792- if (Planner) {
793- for (const auto & p : Planner->GetPendingComputeActors ()) {
794- const auto & t = p.second .GetLastUsage ();
795- consumption += t;
796- }
797- }
798748
799- for (const auto & p : LastStats) {
800- const auto & t = p.GetLastUsage ();
801- consumption += t;
749+ if (Planner) {
750+ consumption += Planner->CalculateConsumptionUpdate ();
802751 }
803752
804753 auto ru = NRuCalc::CalcRequestUnit (consumption);
@@ -811,13 +760,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
811760 return ;
812761
813762 if (Planner) {
814- for (auto & p : Planner->GetPendingComputeActors ()) {
815- p.second .Update ();
816- }
817- }
818-
819- for (auto & p : LastStats) {
820- p.Update ();
763+ Planner->ShiftConsumption ();
821764 }
822765
823766 if (Request.RlPath ) {
@@ -1754,7 +1697,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17541697 ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
17551698 }
17561699
1757- static_cast <TDerived*>( this )-> FillResponseStats (Ydb::StatusIds::TIMEOUT);
1700+ FillResponseStats (Ydb::StatusIds::TIMEOUT);
17581701
17591702 // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17601703 if (abortSender != Target) {
@@ -1771,6 +1714,34 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17711714 this ->PassAway ();
17721715 }
17731716
1717+ void FillResponseStats (Ydb::StatusIds::StatusCode status) {
1718+ auto & response = *ResponseEv->Record .MutableResponse ();
1719+
1720+ response.SetStatus (status);
1721+
1722+ if (Stats) {
1723+ ReportEventElapsedTime ();
1724+
1725+ Stats->FinishTs = TInstant::Now ();
1726+ Stats->Finish ();
1727+
1728+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1729+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1730+ const auto & tx = Request.Transactions [txId].Body ;
1731+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1732+ response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1733+ }
1734+ }
1735+
1736+ if (Stats->CollectStatsByLongTasks ) {
1737+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1738+ if (!txPlansWithStats.empty ()) {
1739+ LOG_N (" Full stats: " << txPlansWithStats);
1740+ }
1741+ }
1742+ }
1743+ }
1744+
17741745 virtual void ReplyErrorAndDie (Ydb::StatusIds::StatusCode status,
17751746 google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
17761747 {
@@ -1790,7 +1761,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17901761 AlreadyReplied = true ;
17911762 auto & response = *ResponseEv->Record .MutableResponse ();
17921763
1793- response.SetStatus (status);
1764+ FillResponseStats (status);
1765+
17941766 response.MutableIssues ()->Swap (issues);
17951767
17961768 LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1968,8 +1940,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
19681940 TActorId KqpShardsResolverId;
19691941 THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
19701942
1971- TVector<TProgressStat> LastStats;
1972-
19731943 TInstant StartResolveTime;
19741944 TInstant LastResourceUsageUpdate;
19751945
0 commit comments