@@ -177,10 +177,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
177177 }
178178
179179 void ReportEventElapsedTime () {
180- if (Stats) {
181- ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
182- Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros) ;
183- }
180+ YQL_ENSURE (Stats);
181+
182+ ui64 elapsedMicros = TlsActivationContext-> GetCurrentEventTicksAsSeconds () * 1'000'000 ;
183+ Stats-> ExecuterCpuTime += TDuration::MicroSeconds (elapsedMicros);
184184 }
185185
186186protected:
@@ -330,11 +330,10 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
330330 }
331331
332332 YQL_ENSURE (channel.DstTask == 0 );
333+ YQL_ENSURE (Stats);
333334
334- if (Stats) {
335- Stats->ResultBytes += batch.Size ();
336- Stats->ResultRows += batch.RowCount ();
337- }
335+ Stats->ResultBytes += batch.Size ();
336+ Stats->ResultRows += batch.RowCount ();
338337
339338 LOG_T (" Got result, channelId: " << channel.Id << " , shardId: " << task.Meta .ShardId
340339 << " , inputIndex: " << channel.DstInputIndex << " , from: " << ev->Sender
@@ -391,7 +390,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
391390 << " , state: " << NYql::NDqProto::EComputeState_Name ((NYql::NDqProto::EComputeState) state.GetState ())
392391 << " , stats: " << state.GetStats ());
393392
394- if (Stats && state.HasStats () && Request.ProgressStatsPeriod ) {
393+ YQL_ENSURE (Stats);
394+
395+ if (state.HasStats () && Request.ProgressStatsPeriod ) {
395396 Stats->UpdateTaskStats (taskId, state.GetStats ());
396397 auto now = TInstant::Now ();
397398 if (LastProgressStats + Request.ProgressStatsPeriod <= now) {
@@ -401,7 +402,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
401402 for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
402403 const auto & tx = Request.Transactions [txId].Body ;
403404 auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), execStats);
404- execStats.AddTxPlansWithStats (planWithStats) ;
405+ (* execStats.MutableTxPlansWithStats ())[txId] = planWithStats ;
405406 }
406407 this ->Send (Target, progress.Release ());
407408 LastProgressStats = now;
@@ -418,13 +419,11 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
418419 if (Planner->CompletedCA (taskId, computeActor)) {
419420 ExtraData[computeActor].Swap (state.MutableExtraData ());
420421
421- if (Stats) {
422- Stats->AddComputeActorStats (
423- computeActor.NodeId (),
424- std::move (*state.MutableStats ()),
425- TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426- );
427- }
422+ Stats->AddComputeActorStats (
423+ computeActor.NodeId (),
424+ std::move (*state.MutableStats ()),
425+ TDuration::MilliSeconds (AggregationSettings.GetCollectLongTasksStatsTimeoutMs ())
426+ );
428427
429428 LastTaskId = taskId;
430429 LastComputeActorId = computeActor.ToString ();
@@ -512,9 +511,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
512511 auto now = TAppData::TimeProvider->Now ();
513512 StartResolveTime = now;
514513
515- if (Stats) {
516- Stats-> StartTs = now;
517- }
514+ YQL_ENSURE (Stats);
515+
516+ Stats-> StartTs = now;
518517 }
519518
520519 TMaybe<size_t > FindReadRangesSource (const NKqpProto::TKqpPhyStage& stage) {
@@ -1167,8 +1166,9 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11671166 : Nothing ();
11681167
11691168 YQL_ENSURE (!shardsResolved || nodeId);
1169+ YQL_ENSURE (Stats);
11701170
1171- if (shardId && Stats ) {
1171+ if (shardId) {
11721172 Stats->AffectedShards .insert (*shardId);
11731173 }
11741174
@@ -1236,11 +1236,13 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
12361236
12371237 if (partitions.size () > 0 && source.GetSequentialInFlightShards () > 0 && partitions.size () > source.GetSequentialInFlightShards ()) {
12381238 auto [startShard, shardInfo] = MakeVirtualTablePartition (source, stageInfo, HolderFactory (), TypeEnv ());
1239- if (Stats) {
1240- for (auto & [shardId, _] : partitions) {
1241- Stats->AffectedShards .insert (shardId);
1242- }
1239+
1240+ YQL_ENSURE (Stats);
1241+
1242+ for (auto & [shardId, _] : partitions) {
1243+ Stats->AffectedShards .insert (shardId);
12431244 }
1245+
12441246 if (shardInfo.KeyReadRanges ) {
12451247 addPartiton (startShard, {}, shardInfo, source.GetSequentialInFlightShards ());
12461248 fillRangesForTasks ();
@@ -1507,6 +1509,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15071509 THashMap<ui64, ui64> assignedShardsCount;
15081510 auto & stage = stageInfo.Meta .GetStage (stageInfo.Id );
15091511
1512+ YQL_ENSURE (Stats);
1513+
15101514 const auto & tableInfo = stageInfo.Meta .TableConstInfo ;
15111515 const auto & keyTypes = tableInfo->KeyColumnTypes ;
15121516 ui32 metaId = 0 ;
@@ -1535,7 +1539,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
15351539 nodeShards[nodeId].emplace_back (TShardInfoWithId (i.first , std::move (i.second )));
15361540 }
15371541
1538- if (Stats && CollectProfileStats (Request.StatsMode )) {
1542+ if (CollectProfileStats (Request.StatsMode )) {
15391543 for (auto && i : nodeShards) {
15401544 Stats->AddNodeShardsCount (stageInfo.Id .StageId , i.first , i.second .size ());
15411545 }
@@ -1720,7 +1724,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17201724 ExecuterSpan.EndError (TStringBuilder () << NYql::NDqProto::StatusIds_StatusCode_Name (status));
17211725 }
17221726
1723- FillResponseStats (Ydb::StatusIds::TIMEOUT);
1727+ ResponseEv-> Record . MutableResponse ()-> SetStatus (Ydb::StatusIds::TIMEOUT);
17241728
17251729 // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
17261730 if (abortSender != Target) {
@@ -1730,34 +1734,31 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17301734
17311735 LOG_E (" Sending timeout response to: " << Target);
17321736
1733- Request.Transactions .crop (0 );
17341737 this ->Shutdown ();
17351738 }
17361739
1737- void FillResponseStats (Ydb::StatusIds::StatusCode status ) {
1740+ void FillResponseStats () {
17381741 auto & response = *ResponseEv->Record .MutableResponse ();
17391742
1740- response. SetStatus (status );
1743+ YQL_ENSURE (Stats );
17411744
1742- if (Stats) {
1743- ReportEventElapsedTime ();
1745+ ReportEventElapsedTime ();
17441746
1745- Stats->FinishTs = TInstant::Now ();
1746- Stats->Finish ();
1747+ Stats->FinishTs = TInstant::Now ();
1748+ Stats->Finish ();
17471749
1748- if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1749- for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1750- const auto & tx = Request.Transactions [txId].Body ;
1751- auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1752- response.MutableResult ()->MutableStats ()->AddTxPlansWithStats (planWithStats);
1753- }
1750+ if (Stats->CollectStatsByLongTasks || CollectFullStats (Request.StatsMode )) {
1751+ for (ui32 txId = 0 ; txId < Request.Transactions .size (); ++txId) {
1752+ const auto & tx = Request.Transactions [txId].Body ;
1753+ auto planWithStats = AddExecStatsToTxPlan (tx->GetPlan (), response.GetResult ().GetStats ());
1754+ (*response.MutableResult ()->MutableStats ()->MutableTxPlansWithStats ())[txId] = planWithStats;
17541755 }
1756+ }
17551757
1756- if (Stats->CollectStatsByLongTasks ) {
1757- const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1758- if (!txPlansWithStats.empty ()) {
1759- LOG_N (" Full stats: " << txPlansWithStats);
1760- }
1758+ if (Stats->CollectStatsByLongTasks ) {
1759+ const auto & txPlansWithStats = response.GetResult ().GetStats ().GetTxPlansWithStats ();
1760+ if (!txPlansWithStats.empty ()) {
1761+ LOG_N (" Full stats: " << response.GetResult ().GetStats ());
17611762 }
17621763 }
17631764 }
@@ -1775,8 +1776,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17751776 AlreadyReplied = true ;
17761777 auto & response = *ResponseEv->Record .MutableResponse ();
17771778
1778- FillResponseStats (status);
1779-
1779+ response.SetStatus (status);
17801780 response.MutableIssues ()->Swap (issues);
17811781
17821782 LOG_T (" ReplyErrorAndDie. Response: " << response.DebugString ()
@@ -1795,7 +1795,6 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
17951795 ExecuterSpan.EndError (response.DebugString ());
17961796 ExecuterStateSpan.EndError (response.DebugString ());
17971797
1798- Request.Transactions .crop (0 );
17991798 this ->Shutdown ();
18001799 }
18011800
@@ -1872,9 +1871,8 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
18721871
18731872 void PassAway () override {
18741873 YQL_ENSURE (AlreadyReplied && ResponseEv);
1875-
1876- // Actualize stats with the last stats from terminated CAs, but keep the status.
1877- FillResponseStats (ResponseEv->Record .GetResponse ().GetStatus ());
1874+ FillResponseStats ();
1875+ Request.Transactions .crop (0 );
18781876 this ->Send (Target, ResponseEv.release ());
18791877
18801878 for (auto channelPair: ResultChannelProxies) {
0 commit comments