Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/compute/common/ut/utils_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Y_UNIT_TEST_SUITE(StatsFormat) {

Y_UNIT_TEST(AggregateStat) {
auto res = NFq::AggregateStats(NResource::Find("plan.json"));
UNIT_ASSERT_VALUES_EQUAL(res.size(), 13);
UNIT_ASSERT_VALUES_EQUAL(res.size(), 14);
UNIT_ASSERT_VALUES_EQUAL(res["IngressBytes"], 6333256);
UNIT_ASSERT_VALUES_EQUAL(res["EgressBytes"], 0);
UNIT_ASSERT_VALUES_EQUAL(res["InputBytes"], 1044);
Expand All @@ -77,5 +77,6 @@ Y_UNIT_TEST_SUITE(StatsFormat) {
UNIT_ASSERT_VALUES_EQUAL(res["Operator.Limit"], 2);
UNIT_ASSERT_VALUES_EQUAL(res["Format.parquet"], 1);
UNIT_ASSERT_VALUES_EQUAL(res["Operator.s3"], 1);
UNIT_ASSERT_VALUES_EQUAL(res["IngressDecompressedBytes"], 0);
}
}
12 changes: 12 additions & 0 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct TTotalStatistics {
TAggregate ResultBytes;
TAggregate ResultRows;
TAggregate IngressBytes;
TAggregate IngressDecompressedBytes;
TAggregate IngressRows;
TAggregate EgressBytes;
TAggregate EgressRows;
Expand Down Expand Up @@ -288,6 +289,8 @@ void WriteNamedNode(NYson::TYsonWriter& writer, NJson::TJsonValue& node, const T
totals.ResultRows.Add(*sum);
} else if (name == "IngressBytes") {
totals.IngressBytes.Add(*sum);
} else if (name == "IngressDecompressedBytes") {
totals.IngressDecompressedBytes.Add(*sum);
} else if (name == "IngressRows") {
totals.IngressRows.Add(*sum);
} else if (name == "EgressBytes") {
Expand Down Expand Up @@ -457,6 +460,7 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) {
totals.ResultBytes.Write(writer, "ResultBytes");
totals.ResultRows.Write(writer, "ResultRows");
totals.IngressBytes.Write(writer, "IngressBytes");
totals.IngressDecompressedBytes.Write(writer, "IngressDecompressedBytes");
totals.IngressRows.Write(writer, "IngressRows");
totals.EgressBytes.Write(writer, "EgressBytes");
totals.EgressRows.Write(writer, "EgressRows");
Expand Down Expand Up @@ -504,6 +508,11 @@ struct TStatsAggregator {
Aggregates[source + ".Bytes"] += ingress->GetIntegerSafe();
success = true;
}
if (auto ingress = node.GetValueByPath("Ingress.DecompressedBytes.Sum")) {
auto source = name.substr(prefix.size());
Aggregates[source + ".DecompressedBytes"] += ingress->GetIntegerSafe();
success = true;
}
if (auto ingress = node.GetValueByPath("Ingress.Rows.Sum")) {
auto source = name.substr(prefix.size());
Aggregates[source + ".Rows"] += ingress->GetIntegerSafe();
Expand All @@ -519,6 +528,7 @@ struct TStatsAggregator {

THashMap<TString, i64> Aggregates{std::pair<TString, i64>
{"IngressBytes", 0},
{"IngressDecompressedBytes", 0},
{"EgressBytes", 0},
{"IngressRows", 0},
{"EgressRows", 0},
Expand Down Expand Up @@ -959,6 +969,7 @@ TString GetPrettyStatistics(const TString& statistics) {
RemapNode(writer, p.second, "TaskRunner.Stage=Total.Tasks", "Tasks");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.CpuTimeUs", "CpuTimeUs");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressBytes", "IngressBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.DecompressedBytes", "DecompressedBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.IngressRows", "IngressRows");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.InputBytes", "InputBytes");
RemapNode(writer, p.second, "TaskRunner.Stage=Total.InputRows", "InputRows");
Expand All @@ -979,6 +990,7 @@ TString GetPrettyStatistics(const TString& statistics) {
RemapNode(writer, p.second, "Tasks", "Tasks");
RemapNode(writer, p.second, "CpuTimeUs", "CpuTimeUs");
RemapNode(writer, p.second, "IngressBytes", "IngressBytes");
RemapNode(writer, p.second, "IngressDecompressedBytes", "IngressDecompressedBytes");
RemapNode(writer, p.second, "IngressRows", "IngressRows");
RemapNode(writer, p.second, "InputBytes", "InputBytes");
RemapNode(writer, p.second, "InputRows", "InputRows");
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ using namespace NYql::NDq;

void TAsyncStats::Resize(ui32 taskCount) {
Bytes.resize(taskCount);
DecompressedBytes.resize(taskCount);
Rows.resize(taskCount);
Chunks.resize(taskCount);
Splits.resize(taskCount);
Expand Down Expand Up @@ -49,6 +50,7 @@ void TStageExecutionStats::Resize(ui32 taskCount) {
ResultBytes.resize(taskCount);
IngressRows.resize(taskCount);
IngressBytes.resize(taskCount);
IngressDecompressedBytes.resize(taskCount);
EgressRows.resize(taskCount);
EgressBytes.resize(taskCount);

Expand All @@ -74,6 +76,7 @@ void SetNonZero(ui64& target, ui64 source) {

void TStageExecutionStats::UpdateAsyncStats(i32 index, TAsyncStats& aggrAsyncStats, const NYql::NDqProto::TDqAsyncBufferStats& asyncStats) {
SetNonZero(aggrAsyncStats.Bytes[index], asyncStats.GetBytes());
SetNonZero(aggrAsyncStats.DecompressedBytes[index], asyncStats.GetDecompressedBytes());
SetNonZero(aggrAsyncStats.Rows[index], asyncStats.GetRows());
SetNonZero(aggrAsyncStats.Chunks[index], asyncStats.GetChunks());
SetNonZero(aggrAsyncStats.Splits[index], asyncStats.GetSplits());
Expand Down Expand Up @@ -117,6 +120,7 @@ void TStageExecutionStats::UpdateStats(const NYql::NDqProto::TDqTaskStats& taskS
SetNonZero(ResultBytes[index], taskStats.GetResultBytes());
SetNonZero(IngressRows[index], taskStats.GetIngressRows());
SetNonZero(IngressBytes[index], taskStats.GetIngressBytes());
SetNonZero(IngressDecompressedBytes[index], taskStats.GetIngressDecompressedBytes());
SetNonZero(EgressRows[index], taskStats.GetEgressRows());
SetNonZero(EgressBytes[index], taskStats.GetEgressBytes());

Expand Down Expand Up @@ -208,6 +212,7 @@ void UpdateAggr(NDqProto::TDqStatsAggr* aggr, ui64 value) noexcept {

struct TAsyncGroupStat {
ui64 Bytes = 0;
ui64 DecompressedBytes = 0;
ui64 Rows = 0;
ui64 Chunks = 0;
ui64 Splits = 0;
Expand All @@ -222,6 +227,7 @@ struct TAsyncGroupStat {

void UpdateAsyncAggr(NDqProto::TDqAsyncStatsAggr& asyncAggr, const NDqProto::TDqAsyncBufferStats& asyncStat) noexcept {
UpdateAggr(asyncAggr.MutableBytes(), asyncStat.GetBytes());
UpdateAggr(asyncAggr.MutableDecompressedBytes(), asyncStat.GetDecompressedBytes());
UpdateAggr(asyncAggr.MutableRows(), asyncStat.GetRows());
UpdateAggr(asyncAggr.MutableChunks(), asyncStat.GetChunks());
UpdateAggr(asyncAggr.MutableSplits(), asyncStat.GetSplits());
Expand Down Expand Up @@ -355,6 +361,7 @@ void TQueryExecutionStats::AddComputeActorFullStatsByTask(
UpdateAggr(stageStats->MutableResultBytes(), task.GetResultBytes());
UpdateAggr(stageStats->MutableIngressRows(), task.GetIngressRows());
UpdateAggr(stageStats->MutableIngressBytes(), task.GetIngressBytes());
UpdateAggr(stageStats->MutableIngressDecompressedBytes(), task.GetIngressDecompressedBytes());
UpdateAggr(stageStats->MutableEgressRows(), task.GetEgressRows());
UpdateAggr(stageStats->MutableEgressBytes(), task.GetEgressBytes());

Expand Down Expand Up @@ -729,6 +736,7 @@ void TQueryExecutionStats::ExportExecStats(NYql::NDqProto::TDqExecutionStats& st
ExportAggStats(p.second.ResultBytes, *stageStats.MutableResultBytes());
ExportAggStats(p.second.IngressRows, *stageStats.MutableIngressRows());
ExportAggStats(p.second.IngressBytes, *stageStats.MutableIngressBytes());
ExportAggStats(p.second.IngressDecompressedBytes, *stageStats.MutableIngressDecompressedBytes());
ExportAggStats(p.second.EgressRows, *stageStats.MutableEgressRows());
ExportAggStats(p.second.EgressBytes, *stageStats.MutableEgressBytes());

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_executer_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bool CollectProfileStats(Ydb::Table::QueryStatsCollection::Mode statsMode);
struct TAsyncStats {
// Data
std::vector<ui64> Bytes;
std::vector<ui64> DecompressedBytes;
std::vector<ui64> Rows;
std::vector<ui64> Chunks;
std::vector<ui64> Splits;
Expand Down Expand Up @@ -83,6 +84,7 @@ struct TStageExecutionStats {
std::vector<ui64> ResultBytes;
std::vector<ui64> IngressRows;
std::vector<ui64> IngressBytes;
std::vector<ui64> IngressDecompressedBytes;
std::vector<ui64> EgressRows;
std::vector<ui64> EgressBytes;

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2366,6 +2366,9 @@ void FillAsyncAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqAsyncSt
if (asyncAggr.HasBytes()) {
FillAggrStat(node, asyncAggr.GetBytes(), "Bytes");
}
if (asyncAggr.HasDecompressedBytes()) {
FillAggrStat(node, asyncAggr.GetDecompressedBytes(), "DecompressedBytes");
}
if (asyncAggr.HasRows()) {
FillAggrStat(node, asyncAggr.GetRows(), "Rows");
}
Expand Down Expand Up @@ -2469,6 +2472,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
SetNonZero(node, "ResultBytes", taskStats.GetResultBytes());
SetNonZero(node, "IngressRows", taskStats.GetIngressRows());
SetNonZero(node, "IngressBytes", taskStats.GetIngressBytes());
SetNonZero(node, "IngressDecompressedBytes", taskStats.GetIngressDecompressedBytes());
SetNonZero(node, "EgressRows", taskStats.GetEgressRows());
SetNonZero(node, "EgressBytes", taskStats.GetEgressBytes());

Expand Down Expand Up @@ -2565,6 +2569,9 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
if ((*stat)->HasIngressBytes()) {
FillAggrStat(stats, (*stat)->GetIngressBytes(), "IngressBytes");
}
if ((*stat)->HasIngressDecompressedBytes()) {
FillAggrStat(stats, (*stat)->GetIngressDecompressedBytes(), "IngressDecompressedBytes");
}
if ((*stat)->HasEgressRows()) {
FillAggrStat(stats, (*stat)->GetEgressRows(), "EgressRows");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
html << "DqInputChannel.PushStats.CurrentPauseTs: " << (pushStats.CurrentPauseTs ? pushStats.CurrentPauseTs->ToString() : TString{}) << "<br />";
html << "DqInputChannel.PushStats.MergeWaitPeriod: " << pushStats.MergeWaitPeriod << "<br />";
html << "DqInputChannel.PushStats.Bytes: " << pushStats.Bytes << "<br />";
html << "DqInputChannel.PushStats.DecompressedBytes: " << pushStats.DecompressedBytes << "<br />";
html << "DqInputChannel.PushStats.Rows: " << pushStats.Rows << "<br />";
html << "DqInputChannel.PushStats.Chunks: " << pushStats.Chunks << "<br />";
html << "DqInputChannel.PushStats.Splits: " << pushStats.Splits << "<br />";
Expand All @@ -268,6 +269,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC

const auto& popStats = info.Channel->GetPopStats();
html << "DqInputChannel.PopStats.Bytes: " << popStats.Bytes << "<br />";
html << "DqInputChannel.PopStats.DecompressedBytes: " << popStats.DecompressedBytes << "<br />";
html << "DqInputChannel.PopStats.Rows: " << popStats.Rows << "<br />";
html << "DqInputChannel.PopStats.Chunks: " << popStats.Chunks << "<br />";
html << "DqInputChannel.PopStats.Splits: " << popStats.Splits << "<br />";
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>

ui64 ingressBytes = 0;
ui64 ingressRows = 0;
ui64 ingressDecompressedBytes = 0;
auto startTimeMs = protoTask->GetStartTimeMs();

if (RuntimeSettings.CollectFull()) {
Expand All @@ -1655,6 +1656,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ingressBytes += ingressStats.Bytes;
// ingress rows are usually not reported, so we count rows in task runner input
ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows;
ingressDecompressedBytes += ingressStats.DecompressedBytes;
if (ingressStats.FirstMessageTs) {
auto firstMessageMs = ingressStats.FirstMessageTs.MilliSeconds();
if (!startTimeMs || startTimeMs > firstMessageMs) {
Expand All @@ -1670,6 +1672,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
ingressBytes += ingressStats.Bytes;
// ingress rows are usually not reported, so we count rows in task runner input
ingressRows += ingressStats.Rows ? ingressStats.Rows : taskStats->Sources.at(inputIndex)->GetPopStats().Rows;
ingressDecompressedBytes += ingressStats.DecompressedBytes;
}
}

Expand All @@ -1679,6 +1682,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped<TDerived>
protoTask->SetStartTimeMs(startTimeMs);
protoTask->SetIngressBytes(ingressBytes);
protoTask->SetIngressRows(ingressRows);
protoTask->SetIngressDecompressedBytes(ingressDecompressedBytes);

ui64 egressBytes = 0;
ui64 egressRows = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace NDq {
void FillAsyncStats(NDqProto::TDqAsyncBufferStats& proto, TDqAsyncStats stats) {
if (stats.CollectBasic()) {
proto.SetBytes(stats.Bytes);
proto.SetDecompressedBytes(stats.DecompressedBytes);
proto.SetRows(stats.Rows);
proto.SetChunks(stats.Chunks);
proto.SetSplits(stats.Splits);
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/dq/actors/protos/dq_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum EDqStatsMode {
message TDqAsyncBufferStats {
// Data
uint64 Bytes = 1; // physical bytes
uint64 DecompressedBytes = 11; // decompressed bytes
uint64 Rows = 2; // logical rows (if applicable)
uint64 Chunks = 3; // chunk is group of sent/received bytes in single batch
uint64 Splits = 4; // logical partitioning (if applicable)
Expand Down Expand Up @@ -182,6 +183,7 @@ message TDqTaskStats {
uint64 ResultBytes = 21;

uint64 IngressBytes = 16;
uint64 IngressDecompressedBytes = 22;
uint64 IngressRows = 17;
uint64 EgressBytes = 18;
uint64 EgressRows = 19;
Expand Down Expand Up @@ -258,6 +260,7 @@ message TDqStatsMinMax {
message TDqAsyncStatsAggr {
// Data
TDqStatsAggr Bytes = 1;
TDqStatsAggr DecompressedBytes = 12;
TDqStatsAggr Rows = 2;
TDqStatsAggr Chunks = 3;
TDqStatsAggr Splits = 4;
Expand Down Expand Up @@ -312,6 +315,7 @@ message TDqStageStats {
TDqStatsAggr ResultBytes = 27;

TDqStatsAggr IngressBytes = 28;
TDqStatsAggr IngressDecompressedBytes = 37;
TDqStatsAggr IngressRows = 29;
TDqStatsAggr EgressBytes = 30;
TDqStatsAggr EgressRows = 31;
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/yql/dq/runtime/dq_async_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct TDqAsyncStats {

// basic stats
ui64 Bytes = 0;
ui64 DecompressedBytes = 0;
ui64 Rows = 0;
ui64 Chunks = 0;
ui64 Splits = 0;
Expand All @@ -69,6 +70,7 @@ struct TDqAsyncStats {

void MergeData(const TDqAsyncStats& other) {
Bytes += other.Bytes;
DecompressedBytes += other.DecompressedBytes;
Rows += other.Rows;
Chunks += other.Chunks;
Splits += other.Splits;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/counters/task_counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ struct TTaskCounters : public TCounters {

void AddAsyncStats(const NDqProto::TDqAsyncBufferStats stats, const std::map<TString, TString>& l, const TString& p) {
if (auto v = stats.GetBytes(); v) SetCounter(GetCounterName("TaskRunner", l, p + "Bytes"), v);
if (auto v = stats.GetDecompressedBytes(); v) SetCounter(GetCounterName("TaskRunner", l, p + "DecompressedBytes"), v);
if (auto v = stats.GetRows(); v) SetCounter(GetCounterName("TaskRunner", l, p + "Rows"), v);
if (auto v = stats.GetChunks(); v) SetCounter(GetCounterName("TaskRunner", l, p + "Chunks"), v);
if (auto v = stats.GetSplits(); v) SetCounter(GetCounterName("TaskRunner", l, p + "Splits"), v);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ template<typename T>
void ToProto(T& proto, const NDq::TDqAsyncStats& stats)
{
proto.SetBytes(stats.Bytes);
proto.SetDecompressedBytes(stats.DecompressedBytes);
proto.SetRows(stats.Rows);
proto.SetChunks(stats.Chunks);
proto.SetSplits(stats.Splits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ class TPortoProcess: public TChildProcess
void LoadFromProto(TDqAsyncStats& stats, const NYql::NDqProto::TDqAsyncBufferStats& f)
{
stats.Bytes = f.GetBytes();
stats.DecompressedBytes = f.GetDecompressedBytes();
stats.Rows = f.GetRows();
stats.Chunks = f.GetChunks();
stats.Splits = f.GetSplits();
Expand Down
13 changes: 11 additions & 2 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
decompressorBuffer = MakeDecompressor(*buffer, ReadSpec->Compression);
YQL_ENSURE(decompressorBuffer, "Unsupported " << ReadSpec->Compression << " compression.");
buffer = decompressorBuffer.get();

}

auto stream = std::make_unique<NDB::InputStreamFromInputFormat>(
Expand All @@ -388,7 +389,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
while (NDB::Block batch = stream->read()) {
Paused = SourceContext->Add(batch.bytes(), SelfActorId);
const bool isCancelled = StopIfConsumedEnough(batch.rows());
Send(ParentActorId, new TEvS3Provider::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
Send(ParentActorId, new TEvS3Provider::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), ReadSpec->Compression ? TakeIngressDecompressedDelta(buffer->count()) : 0ULL));
if (Paused) {
CpuTime += GetCpuTimeDelta();
auto ev = WaitForSpecificEvent<TEvS3Provider::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
Expand Down Expand Up @@ -429,7 +430,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
while (NDB::Block batch = stream->read()) {
Paused = SourceContext->Add(batch.bytes(), SelfActorId);
const bool isCancelled = StopIfConsumedEnough(batch.rows());
Send(ParentActorId, new TEvS3Provider::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta()));
Send(ParentActorId, new TEvS3Provider::TEvNextBlock(batch, PathIndex, TakeIngressDelta(), TakeCpuTimeDelta(), ReadSpec->Compression ? TakeIngressDecompressedDelta(buffer->count()) : 0ULL));
if (Paused) {
CpuTime += GetCpuTimeDelta();
auto ev = WaitForSpecificEvent<TEvS3Provider::TEvContinue>(&TS3ReadCoroImpl::ProcessUnexpectedEvent);
Expand Down Expand Up @@ -1012,6 +1013,12 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
return currentIngressBytes;
}

ui64 TakeIngressDecompressedDelta(ui64 current) {
ui64 delta = current - TotalIngressDecompressedBytes;
TotalIngressDecompressedBytes = current;
return delta;
}

TDuration TakeCpuTimeDelta() {
auto currentCpuTime = CpuTime;
CpuTime = TDuration::Zero();
Expand Down Expand Up @@ -1156,6 +1163,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
std::size_t LastOffset = 0;
TString LastData;
ui64 IngressBytes = 0;
ui64 TotalIngressDecompressedBytes = 0;
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
Expand Down Expand Up @@ -1640,6 +1648,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
YQL_ENSURE(!ReadSpec->Arrow);
auto rows = next->Get()->Block.rows();
IngressStats.Bytes += next->Get()->IngressDelta;
IngressStats.DecompressedBytes += next->Get()->IngressDecompressedDelta;
IngressStats.Rows += rows;
IngressStats.Chunks++;
IngressStats.Resume();
Expand Down
Loading