Skip to content

Commit 06b7c08

Browse files
authored
fix various problems in stream lookup (#12218)
1 parent 76b2fb9 commit 06b7c08

File tree

3 files changed

+100
-10
lines changed

3 files changed

+100
-10
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
268268
RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite(),
269269
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
270270
}
271+
} catch (const NKikimr::TMemoryLimitExceededException& e) {
272+
RuntimeError("Memory limit exceeded at stream lookup", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
271273
} catch (const yexception& e) {
272274
RuntimeError(e.what(), NYql::NDqProto::StatusIds::INTERNAL_ERROR);
273275
}

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ struct THashableKey {
8282
struct TKeyHash {
8383
using is_transparent = void;
8484

85-
bool operator()(TConstArrayRef<TCell> key) const {
85+
size_t operator()(TConstArrayRef<TCell> key) const {
8686
return absl::Hash<THashableKey>()(THashableKey{ key });
8787
}
8888
};
@@ -364,14 +364,16 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
364364
}
365365
}
366366

367-
if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) {
368-
row.DeleteUnreferenced();
367+
if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) {
369368
sizeLimitExceeded = true;
369+
}
370+
371+
if (resultStats.ResultRowsCount && sizeLimitExceeded) {
372+
row.DeleteUnreferenced();
370373
break;
371374
}
372375

373376
batch.push_back(std::move(row));
374-
375377
storageRowSize = std::max(storageRowSize, (i64)8);
376378

377379
resultStats.ReadRowsCount += 1;
@@ -680,7 +682,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
680682

681683
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
682684
const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
683-
// result can contain fewer columns because of system columns
685+
// result can contain fewer columns because of system columns
684686
YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
685687

686688
std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
@@ -805,7 +807,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
805807
for (; result.FirstUnprocessedRow < result.Rows.size(); ++result.FirstUnprocessedRow) {
806808
auto& row = result.Rows[result.FirstUnprocessedRow];
807809

808-
if (resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
810+
if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
809811
sizeLimitExceeded = true;
810812
break;
811813
}

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,7 +3830,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
38303830
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
38313831
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/TestTable1");
38323832
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 3);
3833-
3833+
38343834
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 2);
38353835
for (const auto& ta : stats.query_phases(1).table_access()) {
38363836
if (ta.name() == "/Root/TestTable2") {
@@ -4338,6 +4338,92 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
43384338
SelectFromAsyncIndexedTable();
43394339
}
43404340

4341+
Y_UNIT_TEST(SelectFromIndexesAndFreeSpaceLogicDoesntTimeout) {
4342+
auto setting = NKikimrKqp::TKqpSetting();
4343+
setting.SetName("_KqpYqlSyntaxVersion");
4344+
setting.SetValue("1");
4345+
auto serverSettings = TKikimrSettings()
4346+
.SetKqpSettings({setting});
4347+
4348+
NKikimrConfig::TAppConfig appConfig;
4349+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true);
4350+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(true);
4351+
// setting channel buffer size so small to make sure that we will be able to transfer at least
4352+
// one row in stream lookup.
4353+
appConfig.MutableTableServiceConfig()->MutableResourceManager()->SetChannelBufferSize(1_KB);
4354+
// setting string a bit larger than size of the channel buffer.
4355+
const int payloadSize = 5000;
4356+
4357+
serverSettings.SetAppConfig(appConfig);
4358+
TKikimrRunner kikimr(serverSettings);
4359+
auto db = kikimr.GetTableClient();
4360+
auto session = db.CreateSession().GetValueSync().GetSession();
4361+
CreateSampleTablesWithIndex(session);
4362+
4363+
NYdb::NTable::TExecDataQuerySettings execSettings;
4364+
execSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
4365+
4366+
{
4367+
const TString query(Q_(R"(
4368+
DECLARE $Payload AS String;
4369+
REPLACE INTO `/Root/SecondaryComplexKeys` (Key, Fk1, Fk2, Value) VALUES
4370+
(1, 1, "Fk1", $Payload);
4371+
)"));
4372+
4373+
TString largeString(payloadSize, 'a');
4374+
4375+
auto params = TParamsBuilder()
4376+
.AddParam("$Payload")
4377+
.String(largeString)
4378+
.Build()
4379+
.Build();
4380+
4381+
auto result = session.ExecuteDataQuery(
4382+
query,
4383+
TTxControl::BeginTx().CommitTx(),
4384+
params,
4385+
execSettings).ExtractValueSync();
4386+
4387+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4388+
}
4389+
4390+
{
4391+
const TString query1(Q_(R"(
4392+
SELECT *
4393+
FROM `/Root/SecondaryComplexKeys` VIEW Index
4394+
WHERE Fk1 = 1
4395+
LIMIT 10;
4396+
)"));
4397+
4398+
auto result2 = session.ExecuteDataQuery(
4399+
query1,
4400+
TTxControl::BeginTx().CommitTx(),
4401+
execSettings).ExtractValueSync();
4402+
4403+
UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString());
4404+
// UNIT_ASSERT(result2.GetIssues().Empty());
4405+
}
4406+
4407+
{
4408+
const TString query1(Q_(R"(
4409+
SELECT q.Value as V1, t.Value as V2
4410+
FROM `/Root/SecondaryComplexKeys` VIEW Index as t
4411+
LEFT JOIN `/Root/SecondaryComplexKeys` as q
4412+
ON q.Key = t.Key
4413+
WHERE t.Key = 1
4414+
LIMIT 10;
4415+
)"));
4416+
4417+
auto result2 = session.ExecuteDataQuery(
4418+
query1,
4419+
TTxControl::BeginTx().CommitTx(),
4420+
execSettings).ExtractValueSync();
4421+
4422+
UNIT_ASSERT_C(result2.IsSuccess(), result2.GetIssues().ToString());
4423+
// UNIT_ASSERT(result2.GetIssues().Empty());
4424+
}
4425+
}
4426+
43414427
Y_UNIT_TEST(InnerJoinWithNonIndexWherePredicate) {
43424428
auto setting = NKikimrKqp::TKqpSetting();
43434429
setting.SetName("_KqpYqlSyntaxVersion");
@@ -5344,7 +5430,7 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
53445430
auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
53455431
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
53465432
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
5347-
}
5433+
}
53485434

53495435
{
53505436
const auto& yson = ReadTablePartToYson(session, "/Root/table");
@@ -5360,9 +5446,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
53605446
auto result = session.ExecuteDataQuery(selectSql, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync();
53615447
UNIT_ASSERT_C(result.GetIssues().Empty(), result.GetIssues().ToString());
53625448
UNIT_ASSERT(result.IsSuccess());
5363-
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])");
5449+
UNIT_ASSERT_VALUES_EQUAL(NYdb::FormatResultSetYson(result.GetResultSet(0)), R"([[[100u];[101u]];[[200u];[201u]]])");
53645450
}
5365-
}
5451+
}
53665452
}
53675453

53685454
}

0 commit comments

Comments
 (0)