Skip to content

Commit b001939

Browse files
committed
fix stream lookup bytes calculation
1 parent 398fb41 commit b001939

File tree

3 files changed

+212
-9
lines changed

3 files changed

+212
-9
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
6767
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
6868
}
6969

70-
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
70+
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
7171
if (last) {
7272
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
7373
for (auto& table : *stats->MutableTables()) {
@@ -81,9 +81,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
8181
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
8282
}
8383

84+
ui64 consumedRows = mstats ? mstats->Inputs[InputIndex]->RowsConsumed : ReadRowsCount;
85+
ui64 consumedBytes = mstats ? mstats->Inputs[InputIndex]->BytesConsumed : ReadBytesCount;
86+
8487
// TODO: use evread statistics after KIKIMR-16924
85-
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
86-
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
88+
tableStats->SetReadRows(tableStats->GetReadRows() + consumedRows);
89+
tableStats->SetReadBytes(tableStats->GetReadBytes() + consumedBytes);
8790
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
8891

8992
NKqpProto::TKqpTableExtraStats tableExtraStats;
@@ -148,7 +151,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
148151
};
149152

150153
struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
151-
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
154+
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
152155
: ReadId(readId)
153156
, LastSeqNo(lastSeqNo)
154157
, InstantStart(instantStart) {
@@ -259,7 +262,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
259262
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
260263
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
261264
if (ev->Get()->Request->ErrorCount > 0) {
262-
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
265+
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
263266
<< StreamLookupWorker->GetTablePath();
264267
LookupActorStateSpan.EndError(errorMsg);
265268

@@ -419,7 +422,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
419422
auto readIt = Reads.find(ev->Get()->ReadId);
420423
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
421424
auto& read = readIt->second;
422-
425+
423426
if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
424427
if (ev->Get()->InstantStart) {
425428
read.SetFinished();
@@ -566,7 +569,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
566569
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
567570

568571
Counters->IteratorsShardResolve->Inc();
569-
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
572+
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
570573
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);
571574

572575
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,13 +349,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
349349
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
350350

351351
i64 rowSize = 0;
352+
i64 storageRowSize = 0;
352353
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
353354
const auto& column = Columns[colIndex];
354355
if (IsSystemColumn(column.Name)) {
355356
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
356357
rowSize += sizeof(NUdf::TUnboxedValue);
357358
} else {
358359
YQL_ENSURE(resultColIndex < resultRow.size());
360+
storageRowSize += resultRow[resultColIndex].Size();
359361
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
360362
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
361363
++resultColIndex;
@@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
370372

371373
batch.push_back(std::move(row));
372374

375+
storageRowSize = std::max(storageRowSize, (i64)8);
376+
373377
resultStats.ReadRowsCount += 1;
374-
resultStats.ReadBytesCount += rowSize;
378+
resultStats.ReadBytesCount += storageRowSize;
375379
resultStats.ResultRowsCount += 1;
376-
resultStats.ResultBytesCount += rowSize;
380+
resultStats.ResultBytesCount += storageRowSize;
377381
}
378382

379383
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {

ydb/core/kqp/ut/cost/kqp_cost_ut.cpp

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,202 @@ Y_UNIT_TEST_SUITE(KqpCost) {
4343
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
4444
}
4545

46+
// Verifies the per-table read statistics (rows and bytes) reported for a
// point lookup through a global secondary index that selects a String column.
// The test runs twice: with StreamLookup disabled and enabled.
Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast when the fixture cannot be created: otherwise the problem
    // only surfaces later as a confusing statistics mismatch.
    const auto createTableResult = session.ExecuteSchemeQuery(R"(
        --!syntax_v1
        CREATE TABLE `/Root/SecondaryKeys` (
            Key Int32,
            Fk Int32,
            Value String,
            ValueInt Int32,
            PRIMARY KEY (Key),
            INDEX Index GLOBAL ON (Fk)
        );
    )").GetValueSync();
    UNIT_ASSERT_C(createTableResult.IsSuccess(), createTableResult.GetIssues().ToString());

    const auto fillTableResult = session.ExecuteDataQuery(R"(
        REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
            (1, 1, "Payload1", 100),
            (2, 2, "Payload2", 200),
            (5, 5, "Payload5", 500),
            (NULL, 6, "Payload6", 600),
            (7, NULL, "Payload7", 700),
            (NULL, NULL, "Payload8", 800);
    )", TTxControl::BeginTx().CommitTx()).GetValueSync();
    UNIT_ASSERT_C(fillTableResult.IsSuccess(), fillTableResult.GetIssues().ToString());

    auto query = Q_(R"(
        SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [["Payload1"]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table path -> (rows read, bytes read), summed over all query phases.
    // The counters are kept as ui64 so that accumulation does not narrow them.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            auto& [rows, bytes] = readsByTable[tableAccess.name()];
            rows += tableAccess.reads().rows();
            bytes += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify diagnosing a failed assertion.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    // Exactly one row is expected to be read from the main table ...
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    // ... and exactly one row from the index implementation table.
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}
110+
111+
// Verifies that a secondary-index point lookup selecting only a 4-byte Int32
// column is still reported as 8 bytes read per row.
// The test runs twice: with StreamLookup disabled and enabled.
Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast when the fixture cannot be created: otherwise the problem
    // only surfaces later as a confusing statistics mismatch.
    const auto createTableResult = session.ExecuteSchemeQuery(R"(
        --!syntax_v1
        CREATE TABLE `/Root/SecondaryKeys` (
            Key Int32,
            Fk Int32,
            Value String,
            ValueInt Int32,
            PRIMARY KEY (Key),
            INDEX Index GLOBAL ON (Fk)
        );
    )").GetValueSync();
    UNIT_ASSERT_C(createTableResult.IsSuccess(), createTableResult.GetIssues().ToString());

    const auto fillTableResult = session.ExecuteDataQuery(R"(
        REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
            (1, 1, "Payload1", 100),
            (2, 2, "Payload2", 200),
            (5, 5, "Payload5", 500),
            (NULL, 6, "Payload6", 600),
            (7, NULL, "Payload7", 700),
            (NULL, NULL, "Payload8", 800);
    )", TTxControl::BeginTx().CommitTx()).GetValueSync();
    UNIT_ASSERT_C(fillTableResult.IsSuccess(), fillTableResult.GetIssues().ToString());

    auto query = Q_(R"(
        SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [[100]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table path -> (rows read, bytes read), summed over all query phases.
    // The counters are kept as ui64 so that accumulation does not narrow them.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            auto& [rows, bytes] = readsByTable[tableAccess.name()];
            rows += tableAccess.reads().rows();
            bytes += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify diagnosing a failed assertion.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    // Reporting 4 bytes here would be wrong: datashards account for
    // 8 bytes per row in storage, even though the selected Int32 is 4 bytes.
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}
176+
177+
// Verifies the per-table read statistics for a secondary-index range query
// combined with a filter on the main-table column and LIMIT 1.
// The test runs twice: with StreamLookup disabled and enabled.
Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast when the fixture cannot be created: otherwise the problem
    // only surfaces later as a confusing statistics mismatch.
    const auto createTableResult = session.ExecuteSchemeQuery(R"(
        --!syntax_v1
        CREATE TABLE `/Root/SecondaryKeys` (
            Key Int32,
            Fk Int32,
            Value String,
            ValueInt Int32,
            PRIMARY KEY (Key),
            INDEX Index GLOBAL ON (Fk)
        );
    )").GetValueSync();
    UNIT_ASSERT_C(createTableResult.IsSuccess(), createTableResult.GetIssues().ToString());

    const auto fillTableResult = session.ExecuteDataQuery(R"(
        REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
            (1, 1, "Payload1", 100),
            (2, 2, "Payload2", 200),
            (5, 5, "Payload5", 500),
            (NULL, 6, "Payload6", 600),
            (7, NULL, "Payload7", 700),
            (NULL, NULL, "Payload8", 800);
    )", TTxControl::BeginTx().CommitTx()).GetValueSync();
    UNIT_ASSERT_C(fillTableResult.IsSuccess(), fillTableResult.GetIssues().ToString());

    auto query = Q_(R"(
        SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk >= 1 and Fk <= 2 AND StartsWith(Value, "Payload") LIMIT 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [["Payload1"]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table path -> (rows read, bytes read), summed over all query phases.
    // The counters are kept as ui64 so that accumulation does not narrow them.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            auto& [rows, bytes] = readsByTable[tableAccess.name()];
            rows += tableAccess.reads().rows();
            bytes += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify diagnosing a failed assertion.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    // Two index entries match the Fk range, yet only one main-table row is
    // expected to be reported. NOTE(review): presumably LIMIT 1 stops
    // consumption after the first looked-up row; confirm against the
    // stream lookup actor's statistics.
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 2);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 16);
}
241+
46242
Y_UNIT_TEST(PointLookup) {
47243
TKikimrRunner kikimr(GetAppConfig(false, false));
48244
auto db = kikimr.GetTableClient();

0 commit comments

Comments
 (0)