Skip to content

Commit bb4992d

Browse files
authored
Decompressed bytes in stats (#5853)
1 parent 648068e commit bb4992d

File tree

16 files changed: +64 additions, -8 deletions (lines changed)

ydb/core/fq/libs/compute/common/ut/utils_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,7 @@ Y_UNIT_TEST_SUITE(StatsFormat) {
6363

6464
Y_UNIT_TEST(AggregateStat) {
6565
auto res = NFq::AggregateStats(NResource::Find("plan.json"));
66-
UNIT_ASSERT_VALUES_EQUAL(res.size(), 13);
66+
UNIT_ASSERT_VALUES_EQUAL(res.size(), 14);
6767
UNIT_ASSERT_VALUES_EQUAL(res["IngressBytes"], 6333256);
6868
UNIT_ASSERT_VALUES_EQUAL(res["EgressBytes"], 0);
6969
UNIT_ASSERT_VALUES_EQUAL(res["InputBytes"], 1044);
@@ -77,5 +77,6 @@ Y_UNIT_TEST_SUITE(StatsFormat) {
7777
UNIT_ASSERT_VALUES_EQUAL(res["Operator.Limit"], 2);
7878
UNIT_ASSERT_VALUES_EQUAL(res["Format.parquet"], 1);
7979
UNIT_ASSERT_VALUES_EQUAL(res["Operator.s3"], 1);
80+
UNIT_ASSERT_VALUES_EQUAL(res["IngressDecompressedBytes"], 0);
8081
}
8182
}

ydb/core/fq/libs/compute/common/utils.cpp

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -70,6 +70,7 @@ struct TTotalStatistics {
7070
TAggregate ResultBytes;
7171
TAggregate ResultRows;
7272
TAggregate IngressBytes;
73+
TAggregate IngressDecompressedBytes;
7374
TAggregate IngressRows;
7475
TAggregate EgressBytes;
7576
TAggregate EgressRows;
@@ -288,6 +289,8 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T
288289
totals.ResultRows.Add(*sum);
289290
} else if (name == "IngressBytes") {
290291
totals.IngressBytes.Add(*sum);
292+
} else if (name == "IngressDecompressedBytes") {
293+
totals.IngressDecompressedBytes.Add(*sum);
291294
} else if (name == "IngressRows") {
292295
totals.IngressRows.Add(*sum);
293296
} else if (name == "EgressBytes") {
@@ -457,6 +460,7 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
457460
totals.ResultBytes.Write(writer, "ResultBytes");
458461
totals.ResultRows.Write(writer, "ResultRows");
459462
totals.IngressBytes.Write(writer, "IngressBytes");
463+
totals.IngressDecompressedBytes.Write(writer, "IngressDecompressedBytes");
460464
totals.IngressRows.Write(writer, "IngressRows");
461465
totals.EgressBytes.Write(writer, "EgressBytes");
462466
totals.EgressRows.Write(writer, "EgressRows");
@@ -504,6 +508,11 @@ struct TStatsAggregator {
504508
Aggregates[source + ".Bytes"] += ingress->GetIntegerSafe();
505509
success = true;
506510
}
511+
if (auto ingress = node.GetValueByPath("Ingress.DecompressedBytes.Sum")) {
512+
auto source = name.substr(prefix.size());
513+
Aggregates[source + ".DecompressedBytes"] += ingress->GetIntegerSafe();
514+
success = true;
515+
}
507516
if (auto ingress = node.GetValueByPath("Ingress.Rows.Sum")) {
508517
auto source = name.substr(prefix.size());
509518
Aggregates[source + ".Rows"] += ingress->GetIntegerSafe();
@@ -519,6 +528,7 @@ struct TStatsAggregator {
519528

520529
THashMap<TString, i64> Aggregates{std::pair<TString, i64>
521530
{"IngressBytes", 0},
531+
{"IngressDecompressedBytes", 0},
522532
{"EgressBytes", 0},
523533
{"IngressRows", 0},
524534
{"EgressRows", 0},
@@ -959,6 +969,7 @@ TString GetPrettyStatistics(const TString& statistics) {
959969
RemapNode(writer, p.second, "TaskRunner.Stage=Total.Tasks", "Tasks");
960970
RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs");
961971
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressBytes", "IngressBytes");
972+
RemapNode(writer, p.second, "TaskRunner.Stage=Total.DecompressedBytes", "DecompressedBytes");
962973
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressRows", "IngressRows");
963974
RemapNode(writer, p.second, "TaskRunner.Stage=Total.InputBytes", "InputBytes");
964975
RemapNode(writer, p.second, "TaskRunner.Stage=Total.InputRows", "InputRows");
@@ -979,6 +990,7 @@ TString GetPrettyStatistics(const TString& statistics) {
979990
RemapNode(writer, p.second, "Tasks", "Tasks");
980991
RemapNode(writer, p.second, "CpuTimeUs", "CpuTimeUs");
981992
RemapNode(writer, p.second, "IngressBytes", "IngressBytes");
993+
RemapNode(writer, p.second, "IngressDecompressedBytes", "IngressDecompressedBytes");
982994
RemapNode(writer, p.second, "IngressRows", "IngressRows");
983995
RemapNode(writer, p.second, "InputBytes", "InputBytes");
984996
RemapNode(writer, p.second, "InputRows", "InputRows");

ydb/core/kqp/executer_actor/kqp_executer_stats.cpp

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@ using namespace NYql::NDq;
88

99
void TAsyncStats::Resize(ui32 taskCount) {
1010
Bytes.resize(taskCount);
11+
DecompressedBytes.resize(taskCount);
1112
Rows.resize(taskCount);
1213
Chunks.resize(taskCount);
1314
Splits.resize(taskCount);
@@ -49,6 +50,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
4950
ResultBytes.resize(taskCount);
5051
IngressRows.resize(taskCount);
5152
IngressBytes.resize(taskCount);
53+
IngressDecompressedBytes.resize(taskCount);
5254
EgressRows.resize(taskCount);
5355
EgressBytes.resize(taskCount);
5456

@@ -74,6 +76,7 @@ void SetNonZero(ui64& target, ui64 source) {
7476

7577
void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
7678
SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
79+
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
7780
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
7881
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
7982
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());
@@ -117,6 +120,7 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
117120
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
118121
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
119122
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
123+
SetNonZero(IngressDecompressedBytes[index], taskStats.GetIngressDecompressedBytes());
120124
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
121125
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());
122126

@@ -208,6 +212,7 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {
208212

209213
struct TAsyncGroupStat {
210214
ui64 Bytes = 0;
215+
ui64 DecompressedBytes = 0;
211216
ui64 Rows = 0;
212217
ui64 Chunks = 0;
213218
ui64 Splits = 0;
@@ -222,6 +227,7 @@ struct TAsyncGroupStat {
222227

223228
void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
224229
UpdateAggr(asyncAggr.MutableBytes(), asyncStat.GetBytes());
230+
UpdateAggr(asyncAggr.MutableDecompressedBytes(), asyncStat.GetDecompressedBytes());
225231
UpdateAggr(asyncAggr.MutableRows(), asyncStat.GetRows());
226232
UpdateAggr(asyncAggr.MutableChunks(), asyncStat.GetChunks());
227233
UpdateAggr(asyncAggr.MutableSplits(), asyncStat.GetSplits());
@@ -355,6 +361,7 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
355361
UpdateAggr(stageStats->MutableResultBytes(), task.GetResultBytes());
356362
UpdateAggr(stageStats->MutableIngressRows(), task.GetIngressRows());
357363
UpdateAggr(stageStats->MutableIngressBytes(), task.GetIngressBytes());
364+
UpdateAggr(stageStats->MutableIngressDecompressedBytes(), task.GetIngressDecompressedBytes());
358365
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
359366
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());
360367

@@ -729,6 +736,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
729736
ExportAggStats(p.second.ResultBytes, *stageStats.MutableResultBytes());
730737
ExportAggStats(p.second.IngressRows, *stageStats.MutableIngressRows());
731738
ExportAggStats(p.second.IngressBytes, *stageStats.MutableIngressBytes());
739+
ExportAggStats(p.second.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
732740
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
733741
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());
734742

ydb/core/kqp/executer_actor/kqp_executer_stats.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
1717
struct TAsyncStats {
1818
// Data
1919
std::vector<ui64> Bytes;
20+
std::vector<ui64> DecompressedBytes;
2021
std::vector<ui64> Rows;
2122
std::vector<ui64> Chunks;
2223
std::vector<ui64> Splits;
@@ -83,6 +84,7 @@ struct TStageExecutionStats {
8384
std::vector<ui64> ResultBytes;
8485
std::vector<ui64> IngressRows;
8586
std::vector<ui64> IngressBytes;
87+
std::vector<ui64> IngressDecompressedBytes;
8688
std::vector<ui64> EgressRows;
8789
std::vector<ui64> EgressBytes;
8890

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2369,6 +2369,9 @@ void FillAsyncAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqAsyncSt
23692369
if (asyncAggr.HasBytes()) {
23702370
FillAggrStat(node, asyncAggr.GetBytes(), "Bytes");
23712371
}
2372+
if (asyncAggr.HasDecompressedBytes()) {
2373+
FillAggrStat(node, asyncAggr.GetDecompressedBytes(), "DecompressedBytes");
2374+
}
23722375
if (asyncAggr.HasRows()) {
23732376
FillAggrStat(node, asyncAggr.GetRows(), "Rows");
23742377
}
@@ -2472,6 +2475,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
24722475
SetNonZero(node, "ResultBytes", taskStats.GetResultBytes());
24732476
SetNonZero(node, "IngressRows", taskStats.GetIngressRows());
24742477
SetNonZero(node, "IngressBytes", taskStats.GetIngressBytes());
2478+
SetNonZero(node, "IngressDecompressedBytes", taskStats.GetIngressDecompressedBytes());
24752479
SetNonZero(node, "EgressRows", taskStats.GetEgressRows());
24762480
SetNonZero(node, "EgressBytes", taskStats.GetEgressBytes());
24772481

@@ -2568,6 +2572,9 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
25682572
if ((*stat)->HasIngressBytes()) {
25692573
FillAggrStat(stats, (*stat)->GetIngressBytes(), "IngressBytes");
25702574
}
2575+
if ((*stat)->HasIngressDecompressedBytes()) {
2576+
FillAggrStat(stats, (*stat)->GetIngressDecompressedBytes(), "IngressDecompressedBytes");
2577+
}
25712578
if ((*stat)->HasEgressRows()) {
25722579
FillAggrStat(stats, (*stat)->GetEgressRows(), "EgressRows");
25732580
}

ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -257,6 +257,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
257257
html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
258258
html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
259259
html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
260+
html << "DqInputChannel.PushStats.DecompressedBytes: " << pushStats.DecompressedBytes << "<br />";
260261
html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
261262
html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
262263
html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
@@ -268,6 +269,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
268269

269270
const auto& popStats = info.Channel->GetPopStats();
270271
html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
272+
html << "DqInputChannel.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />";
271273
html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
272274
html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
273275
html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "<br />";

ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1641,6 +1641,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16411641

16421642
ui64 ingressBytes = 0;
16431643
ui64 ingressRows = 0;
1644+
ui64 ingressDecompressedBytes = 0;
16441645
auto startTimeMs = protoTask->GetStartTimeMs();
16451646

16461647
if (RuntimeSettings.CollectFull()) {
@@ -1655,6 +1656,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16551656
ingressBytes += ingressStats.Bytes;
16561657
// ingress rows are usually not reported, so we count rows in task runner input
16571658
ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows;
1659+
ingressDecompressedBytes += ingressStats.DecompressedBytes;
16581660
if (ingressStats.FirstMessageTs) {
16591661
auto firstMessageMs = ingressStats.FirstMessageTs.MilliSeconds();
16601662
if (!startTimeMs || startTimeMs > firstMessageMs) {
@@ -1670,6 +1672,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16701672
ingressBytes += ingressStats.Bytes;
16711673
// ingress rows are usually not reported, so we count rows in task runner input
16721674
ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows;
1675+
ingressDecompressedBytes += ingressStats.DecompressedBytes;
16731676
}
16741677
}
16751678

@@ -1679,6 +1682,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
16791682
protoTask->SetStartTimeMs(startTimeMs);
16801683
protoTask->SetIngressBytes(ingressBytes);
16811684
protoTask->SetIngressRows(ingressRows);
1685+
protoTask->SetIngressDecompressedBytes(ingressDecompressedBytes);
16821686

16831687
ui64 egressBytes = 0;
16841688
ui64 egressRows = 0;

ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@ namespace NDq {
99
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats) {
1010
if (stats.CollectBasic()) {
1111
proto.SetBytes(stats.Bytes);
12+
proto.SetDecompressedBytes(stats.DecompressedBytes);
1213
proto.SetRows(stats.Rows);
1314
proto.SetChunks(stats.Chunks);
1415
proto.SetSplits(stats.Splits);

ydb/library/yql/dq/actors/protos/dq_stats.proto

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ enum EDqStatsMode {
1919
message TDqAsyncBufferStats {
2020
// Data
2121
uint64 Bytes = 1; // physical bytes
22+
uint64 DecompressedBytes = 11; // decompressed bytes
2223
uint64 Rows = 2; // logical rows (if applicable)
2324
uint64 Chunks = 3; // chunk is group of sent/received bytes in single batch
2425
uint64 Splits = 4; // logical partitioning (if applicable)
@@ -182,6 +183,7 @@ message TDqTaskStats {
182183
uint64 ResultBytes = 21;
183184

184185
uint64 IngressBytes = 16;
186+
uint64 IngressDecompressedBytes = 22;
185187
uint64 IngressRows = 17;
186188
uint64 EgressBytes = 18;
187189
uint64 EgressRows = 19;
@@ -258,6 +260,7 @@ message TDqStatsMinMax {
258260
message TDqAsyncStatsAggr {
259261
// Data
260262
TDqStatsAggr Bytes = 1;
263+
TDqStatsAggr DecompressedBytes = 12;
261264
TDqStatsAggr Rows = 2;
262265
TDqStatsAggr Chunks = 3;
263266
TDqStatsAggr Splits = 4;
@@ -312,6 +315,7 @@ message TDqStageStats {
312315
TDqStatsAggr ResultBytes = 27;
313316

314317
TDqStatsAggr IngressBytes = 28;
318+
TDqStatsAggr IngressDecompressedBytes = 37;
315319
TDqStatsAggr IngressRows = 29;
316320
TDqStatsAggr EgressBytes = 30;
317321
TDqStatsAggr EgressRows = 31;

ydb/library/yql/dq/runtime/dq_async_stats.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,7 @@ struct TDqAsyncStats {
5555

5656
// basic stats
5757
ui64 Bytes = 0;
58+
ui64 DecompressedBytes = 0;
5859
ui64 Rows = 0;
5960
ui64 Chunks = 0;
6061
ui64 Splits = 0;
@@ -69,6 +70,7 @@ struct TDqAsyncStats {
6970

7071
void MergeData(const TDqAsyncStats& other) {
7172
Bytes += other.Bytes;
73+
DecompressedBytes += other.DecompressedBytes;
7274
Rows += other.Rows;
7375
Chunks += other.Chunks;
7476
Splits += other.Splits;

0 commit comments

Comments
 (0)