@@ -12,12 +12,8 @@ ui64 NonZeroMin(ui64 a, ui64 b) {
1212 return (b == 0 ) ? a : ((a == 0 || a > b) ? b : a);
1313}
1414
15- void TTimeSeriesStats::ExportAggStats (NYql::NDqProto::TDqStatsAggr& stats) {
16- NKikimr::NKqp::ExportAggStats (Values, stats);
17- }
18-
1915void TTimeSeriesStats::ExportAggStats (ui64 baseTimeMs, NYql::NDqProto::TDqStatsAggr& stats) {
20- ExportAggStats (stats);
16+ NKikimr::NKqp:: ExportAggStats (Values, stats);
2117 ExportHistory (baseTimeMs, stats);
2218}
2319
@@ -32,20 +28,16 @@ void TTimeSeriesStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqStatsAg
3228 }
3329}
3430
35- void TTimeSeriesStats::Resize (ui32 count ) {
36- Values.resize (count );
31+ void TTimeSeriesStats::Resize (ui32 taskCount ) {
32+ Values.resize (taskCount );
3733}
3834
39- void TTimeSeriesStats::SetNonZero (ui32 index , ui64 value) {
35+ void TTimeSeriesStats::SetNonZero (ui32 taskIndex , ui64 value) {
4036 if (value) {
4137 Sum += value;
42- Sum -= Values[index ];
43- Values[index ] = value;
38+ Sum -= Values[taskIndex ];
39+ Values[taskIndex ] = value;
4440 }
45- AppendHistory ();
46- }
47-
48- void TTimeSeriesStats::AppendHistory () {
4941 if (HistorySampleCount) {
5042 auto nowMs = Now ().MilliSeconds ();
5143
@@ -105,62 +97,6 @@ void TTimeSeriesStats::Pack() {
10597 }
10698}
10799
108- void TPartitionedStats::ResizeByTasks (ui32 taskCount) {
109- for (auto & p : Parts) {
110- p.resize (taskCount);
111- }
112- }
113-
114- void TPartitionedStats::ResizeByParts (ui32 partCount, ui32 taskCount) {
115- auto oldPartCount = Parts.size ();
116- Parts.resize (partCount);
117- for (auto i = oldPartCount; i < partCount; i++) {
118- Parts[i].resize (taskCount);
119- }
120- Resize (partCount);
121- }
122-
123- void TPartitionedStats::SetNonZero (ui32 taskIndex, ui32 partIndex, ui64 value, bool recordTimeSeries) {
124- auto & part = Parts[partIndex];
125- auto delta = value - part[taskIndex];
126- part[taskIndex] = value;
127- Values[partIndex] += delta;
128- Sum += delta;
129- if (recordTimeSeries) {
130- AppendHistory ();
131- }
132- }
133-
134- void TTimeMultiSeriesStats::SetNonZero (TPartitionedStats& stats, ui32 taskIndex, const TString& key, ui64 value, bool recordTimeSeries) {
135- auto [it, inserted] = Indices.try_emplace (key);
136- if (inserted) {
137- it->second = Indices.size () - 1 ;
138- if (PartCount < Indices.size ()) {
139- PartCount += 4 ;
140- }
141- }
142- if (stats.Parts .size () < PartCount) {
143- stats.ResizeByParts (PartCount, TaskCount);
144- }
145- stats.SetNonZero (taskIndex, it->second , value, recordTimeSeries);
146- }
147-
148- void TExternalStats::Resize (ui32 taskCount) {
149- ExternalRows.ResizeByTasks (taskCount);
150- ExternalBytes.ResizeByTasks (taskCount);
151- TaskCount = taskCount;
152- }
153-
154- void TExternalStats::SetHistorySampleCount (ui32 historySampleCount) {
155- ExternalBytes.HistorySampleCount = historySampleCount;
156- }
157-
158- void TExternalStats::ExportHistory (ui64 baseTimeMs, NDqProto::TDqExternalAggrStats& stats) {
159- if (stats.HasExternalBytes ()) {
160- ExternalBytes.ExportHistory (baseTimeMs, *stats.MutableExternalBytes ());
161- }
162- }
163-
164100void TAsyncStats::Resize (ui32 taskCount) {
165101 Bytes.Resize (taskCount);
166102 DecompressedBytes.resize (taskCount);
@@ -191,25 +127,20 @@ void TAsyncStats::ExportHistory(ui64 baseTimeMs, NYql::NDqProto::TDqAsyncStatsAg
191127}
192128
193129void TAsyncBufferStats::Resize (ui32 taskCount) {
194- External.Resize (taskCount);
195130 Ingress.Resize (taskCount);
196131 Push.Resize (taskCount);
197132 Pop.Resize (taskCount);
198133 Egress.Resize (taskCount);
199134}
200135
201136void TAsyncBufferStats::SetHistorySampleCount (ui32 historySampleCount) {
202- External.SetHistorySampleCount (historySampleCount);
203137 Ingress.SetHistorySampleCount (historySampleCount);
204138 Push.SetHistorySampleCount (historySampleCount);
205139 Pop.SetHistorySampleCount (historySampleCount);
206140 Egress.SetHistorySampleCount (historySampleCount);
207141}
208142
209143void TAsyncBufferStats::ExportHistory (ui64 baseTimeMs, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
210- if (stats.HasExternal ()) {
211- External.ExportHistory (baseTimeMs, *stats.MutableExternal ());
212- }
213144 if (stats.HasIngress ()) {
214145 Ingress.ExportHistory (baseTimeMs, *stats.MutableIngress ());
215146 }
@@ -472,17 +403,6 @@ ui64 TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
472403 baseTimeMs = NonZeroMin (baseTimeMs, UpdateAsyncStats (index, asyncBufferStats.Ingress , sourceStat.GetIngress ()));
473404 baseTimeMs = NonZeroMin (baseTimeMs, UpdateAsyncStats (index, asyncBufferStats.Push , sourceStat.GetPush ()));
474405 baseTimeMs = NonZeroMin (baseTimeMs, UpdateAsyncStats (index, asyncBufferStats.Pop , sourceStat.GetPop ()));
475- for (auto & partitionStat : sourceStat.GetExternalPartitions ()) {
476- auto key = partitionStat.GetPartitionId ();
477- asyncBufferStats.External .SetNonZero (asyncBufferStats.External .ExternalRows ,
478- index, key, partitionStat.GetExternalRows (), false );
479- asyncBufferStats.External .SetNonZero (asyncBufferStats.External .ExternalBytes ,
480- index, key, partitionStat.GetExternalBytes (), true );
481- asyncBufferStats.External .SetNonZero (asyncBufferStats.External .FirstMessageMs ,
482- index, key, partitionStat.GetFirstMessageMs (), false );
483- asyncBufferStats.External .SetNonZero (asyncBufferStats.External .LastMessageMs ,
484- index, key, partitionStat.GetLastMessageMs (), false );
485- }
486406 }
487407 }
488408
@@ -1154,8 +1074,6 @@ void TQueryExecutionStats::UpdateTaskStats(ui64 taskId, const NYql::NDqProto::TD
11541074 BaseTimeMs = NonZeroMin (BaseTimeMs, it->second .UpdateStats (taskStats, state, stats.GetMaxMemoryUsage (), stats.GetDurationUs ()));
11551075}
11561076
1157- // SIMD-friendly aggregations are below. Compiler is able to vectorize sum/count, but needs help with min/max
1158-
11591077void ExportAggStats (std::vector<ui64>& data, NYql::NDqProto::TDqStatsMinMax& stats) {
11601078
11611079 Y_DEBUG_ABORT_UNLESS ((data.size () & 3 ) == 0 );
@@ -1297,12 +1215,6 @@ void TQueryExecutionStats::ExportAggAsyncStats(TAsyncStats& data, NYql::NDqProto
12971215}
12981216
12991217void TQueryExecutionStats::ExportAggAsyncBufferStats (TAsyncBufferStats& data, NYql::NDqProto::TDqAsyncBufferStatsAggr& stats) {
1300- auto & external = *stats.MutableExternal ();
1301- data.External .ExternalRows .ExportAggStats (*external.MutableExternalRows ());
1302- data.External .ExternalBytes .ExportAggStats (BaseTimeMs, *external.MutableExternalBytes ());
1303- ExportOffsetAggStats (data.External .FirstMessageMs .Values , *external.MutableFirstMessageMs (), BaseTimeMs);
1304- ExportOffsetAggStats (data.External .LastMessageMs .Values , *external.MutableLastMessageMs (), BaseTimeMs);
1305- external.SetPartitionCount (data.External .Indices .size ());
13061218 ExportAggAsyncStats (data.Ingress , *stats.MutableIngress ());
13071219 ExportAggAsyncStats (data.Push , *stats.MutablePush ());
13081220 ExportAggAsyncStats (data.Pop , *stats.MutablePop ());
0 commit comments