Skip to content

Commit 97a64df

Browse files
committed
feature(kqp): enable stream lookup for data query
1 parent f4755b6 commit 97a64df

17 files changed

+128
-51
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

+10-3
Original file line numberDiff line numberDiff line change
@@ -1650,11 +1650,18 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16501650
}
16511651
}
16521652

1653-
bool HassDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
1653+
bool HasDmlOperationOnOlap(NKqpProto::TKqpPhyTx_EType queryType, const NKqpProto::TKqpPhyStage& stage) {
16541654
if (queryType == NKqpProto::TKqpPhyTx::TYPE_DATA) {
16551655
return true;
16561656
}
1657-
for (const auto &tableOp : stage.GetTableOps()) {
1657+
1658+
for (const auto& input : stage.GetInputs()) {
1659+
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
1660+
return true;
1661+
}
1662+
}
1663+
1664+
for (const auto& tableOp : stage.GetTableOps()) {
16581665
if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
16591666
return true;
16601667
}
@@ -1691,7 +1698,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
16911698
}
16921699
}
16931700

1694-
if (stageInfo.Meta.IsOlap() && HassDmlOperationOnOlap(tx.Body->GetType(), stage)) {
1701+
if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) {
16951702
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
16961703
LOG_E(error);
16971704
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
112112
meta.TableId = MakeTableId(input.GetStreamLookup().GetTable());
113113
meta.TablePath = input.GetStreamLookup().GetTable().GetPath();
114114
meta.TableConstInfo = tx.Body->GetTableConstInfoById()->Map.at(meta.TableId);
115+
YQL_ENSURE(meta.TableConstInfo);
116+
meta.TableKind = meta.TableConstInfo->TableKind;
115117
}
116118

117119
if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kSequencer) {

ydb/core/kqp/ut/cost/kqp_cost_ut.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@ namespace NKqp {
1111
using namespace NYdb;
1212
using namespace NYdb::NTable;
1313

14-
static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead) {
14+
static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) {
1515
auto app = NKikimrConfig::TAppConfig();
1616
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead);
1717
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead);
18+
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
1819
return app;
1920
}
2021

@@ -43,7 +44,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
4344
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
4445
}
4546
Y_UNIT_TEST_TWIN(PointLookup, SourceRead) {
46-
TKikimrRunner kikimr(GetAppConfig(SourceRead));
47+
TKikimrRunner kikimr(GetAppConfig(SourceRead, false));
4748
auto db = kikimr.GetTableClient();
4849
auto session = db.CreateSession().GetValueSync().GetSession();
4950

ydb/core/kqp/ut/effects/kqp_inplace_update_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ Y_UNIT_TEST_TWIN(BigRow, EnableInplaceUpdate) {
401401
// source read use iterator interface, that doesn't use datashard transactions
402402
NKikimrConfig::TAppConfig appConfig;
403403
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
404+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
404405

405406
auto settings = TKikimrSettings()
406407
.SetAppConfig(appConfig)

ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp

+4-2
Original file line numberDiff line numberDiff line change
@@ -4030,9 +4030,9 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
40304030

40314031
auto& stats = NYdb::TProtoAccessor::GetProto(*result2.GetStats());
40324032

4033-
int readPhase = 1;
4033+
int readPhase = 0;
40344034
if (serverSettings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
4035-
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
4035+
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
40364036

40374037
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 2);
40384038
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys");
@@ -4042,6 +4042,8 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
40424042
} else {
40434043
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
40444044

4045+
readPhase++;
4046+
40454047
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access().size(), 1);
40464048
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).name(), "/Root/SecondaryComplexKeys/Index/indexImplTable");
40474049
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(readPhase).table_access(0).reads().rows(), 1);

ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
9090
void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
9191
NKikimrConfig::TAppConfig appConfig;
9292
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
93+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
9394

9495
auto settings = TKikimrSettings().SetAppConfig(appConfig);
9596
TKikimrRunner kikimr(settings);

ydb/core/kqp/ut/opt/kqp_extract_predicate_unpack_ut.cpp

+10-3
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,14 @@ void Test(const TString& query, const TString& answer, THashSet<TString> allowSc
158158
}
159159
}
160160

161-
void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1) {
162-
TKikimrRunner kikimr;
161+
void TestRange(const TString& query, const TString& answer, ui64 rowsRead, int stagesCount = 1, bool streamLookup = true) {
162+
NKikimrConfig::TAppConfig appConfig;
163+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
164+
165+
auto settings = TKikimrSettings()
166+
.SetAppConfig(appConfig);
167+
168+
TKikimrRunner kikimr(settings);
163169
auto db = kikimr.GetTableClient();
164170
auto session = db.CreateSession().GetValueSync().GetSession();
165171

@@ -204,7 +210,8 @@ Y_UNIT_TEST(OverflowLookup) {
204210
)",
205211
R"([])",
206212
0,
207-
2);
213+
2,
214+
false);
208215

209216
TestRange(
210217
R"(

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,12 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
205205

206206
auto explainResult = session.ExplainDataQuery(query).GetValueSync();
207207
UNIT_ASSERT_VALUES_EQUAL_C(explainResult.GetStatus(), EStatus::SUCCESS, explainResult.GetIssues().ToString());
208-
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
208+
209+
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
210+
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpCnStreamLookup"), explainResult.GetAst());
211+
} else {
212+
UNIT_ASSERT_C(explainResult.GetAst().Contains("KqpLookupTable"), explainResult.GetAst());
213+
}
209214

210215
auto params = kikimr.GetTableClient().GetParamsBuilder()
211216
.AddParam("$group").OptionalUint32(1).Build()
@@ -1224,7 +1229,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
12241229
size_t phase = 0;
12251230
if (stats.query_phases().size() == 2) {
12261231
phase = 1;
1227-
} else if (stats.query_phases().size() == 0) {
1232+
} else if (stats.query_phases().size() == 1) {
12281233
phase = 0;
12291234
} else {
12301235
UNIT_ASSERT(false);

ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
278278

279279
Y_UNIT_TEST_QUAD(IndexLookupJoin, EnableStreamLookup, QueryService) {
280280
NKikimrConfig::TAppConfig appConfig;
281+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
281282
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
282283
auto settings = TKikimrSettings()
283284
.SetAppConfig(appConfig);

ydb/core/kqp/ut/query/kqp_explain_ut.cpp

+18-3
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,13 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
497497
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
498498
node = FindPlanNodeByKv(plan, "Name", "TableFullScan");
499499
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
500-
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
500+
501+
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
502+
node = FindPlanNodeByKv(plan, "Node Type", "TableLookup");
503+
} else {
504+
node = FindPlanNodeByKv(plan, "Name", "TablePointLookup");
505+
}
506+
501507
UNIT_ASSERT_EQUAL(node.GetMapSafe().at("Table").GetStringSafe(), "KeyValue");
502508
}
503509

@@ -533,7 +539,12 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
533539
UNIT_ASSERT_VALUES_EQUAL(rangeScansCount, 1);
534540

535541
ui32 lookupsCount = 0;
536-
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
542+
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
543+
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TableLookup");
544+
} else {
545+
lookupsCount = CountPlanNodesByKv(plan, "Node Type", "TablePointLookup-ConstantExpr");
546+
}
547+
537548
UNIT_ASSERT_VALUES_EQUAL(lookupsCount, 1);
538549

539550
/* check tables section */
@@ -899,7 +910,11 @@ Y_UNIT_TEST_SUITE(KqpExplain) {
899910
}
900911

901912
Y_UNIT_TEST(MultiJoinCteLinks) {
902-
TKikimrRunner kikimr;
913+
NKikimrConfig::TAppConfig appConfig;
914+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
915+
auto settings = TKikimrSettings()
916+
.SetAppConfig(appConfig);
917+
TKikimrRunner kikimr{settings};
903918
auto db = kikimr.GetTableClient();
904919
auto session = db.CreateSession().GetValueSync().GetSession();
905920

ydb/core/kqp/ut/query/kqp_query_ut.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
351351
Y_UNIT_TEST(QueryTimeoutImmediate) {
352352
NKikimrConfig::TAppConfig appConfig;
353353
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
354+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
354355
auto settings = TKikimrSettings()
355356
.SetAppConfig(appConfig);
356357
TKikimrRunner kikimr{settings};
@@ -490,6 +491,7 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
490491

491492
Y_UNIT_TEST(QueryCancelImmediate) {
492493
NKikimrConfig::TAppConfig appConfig;
494+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
493495
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
494496
auto settings = TKikimrSettings()
495497
.SetAppConfig(appConfig);

ydb/core/kqp/ut/query/kqp_stats_ut.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ TCollectedStreamResult JoinStatsBasic(
102102
std::function<Iterator(TKikimrRunner&, ECollectQueryStatsMode, const TString&)> getIter) {
103103
NKikimrConfig::TAppConfig appConfig;
104104
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQueryStreamLookup(false);
105+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
105106
appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
106107
auto settings = TKikimrSettings()
107108
.SetAppConfig(appConfig);

ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) {
5050
if (result.GetStatus() == EStatus::SUCCESS)
5151
continue;
5252

53-
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQuerySourceRead() && false) {
53+
if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
5454
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::DEFAULT_ERROR,
5555
[](const NYql::TIssue& issue){
5656
return issue.GetMessage().Contains("has no snapshot at");

ydb/core/protos/table_service_config.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ message TTableServiceConfig {
224224
optional uint64 SessionIdleDurationSeconds = 28 [default = 600];
225225
optional TAggregationConfig AggregationConfig = 29;
226226
optional bool EnableKqpScanQueryStreamLookup = 30 [default = true];
227-
optional bool EnableKqpDataQueryStreamLookup = 31 [default = false];
227+
optional bool EnableKqpDataQueryStreamLookup = 31 [default = true];
228228
optional TExecuterRetriesConfig ExecuterRetriesConfig = 32;
229229
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
230230
optional bool EnablePublishKqpProxyByRM = 34 [default = true];

ydb/core/tx/datashard/datashard_ut_order.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -1526,6 +1526,7 @@ Y_UNIT_TEST(TestMvccReadDoesntBlockWrites) {
15261526
TPortManager pm;
15271527
NKikimrConfig::TAppConfig app;
15281528
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
1529+
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
15291530
TServerSettings serverSettings(pm.GetPort(2134));
15301531
serverSettings.SetEnableMvccSnapshotReads(false);
15311532
serverSettings.SetDomainName("Root")
@@ -1863,9 +1864,12 @@ Y_UNIT_TEST_TWIN(TestOutOfOrderNonConflictingWrites, StreamLookup) {
18631864

18641865
Y_UNIT_TEST(MvccTestOutOfOrderRestartLocksSingleWithoutBarrier) {
18651866
TPortManager pm;
1867+
NKikimrConfig::TAppConfig app;
1868+
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
18661869
TServerSettings serverSettings(pm.GetPort(2134));
18671870
serverSettings.SetEnableMvccSnapshotReads(false);
18681871
serverSettings.SetDomainName("Root")
1872+
.SetAppConfig(app)
18691873
.SetUseRealThreads(false);
18701874

18711875
Tests::TServer::TPtr server = new TServer(serverSettings);
@@ -3507,6 +3511,7 @@ Y_UNIT_TEST(MvccTestSnapshotRead) {
35073511
TPortManager pm;
35083512
NKikimrConfig::TAppConfig app;
35093513
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(false);
3514+
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
35103515
TServerSettings serverSettings(pm.GetPort(2134));
35113516
serverSettings.SetDomainName("Root")
35123517
.SetAppConfig(app)

0 commit comments

Comments
 (0)