Skip to content

Commit ee54ab3

Browse files
ulya-sidorinagridnevvvit
authored andcommitted
feat(kqp): add pragma for sequential reads (ydb-platform#11715)
1 parent 9dfa2f7 commit ee54ab3

File tree

4 files changed

+51
-0
lines changed

4 files changed

+51
-0
lines changed

ydb/core/kqp/opt/physical/kqp_opt_phy_source.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ TExprBase KqpRewriteReadTable(TExprBase node, TExprContext& ctx, const TKqpOptim
110110
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
111111
}
112112

113+
if (kqpCtx.Config->HasMaxSequentialReadsInFlight()) {
114+
settings.SequentialInFlight = *kqpCtx.Config->MaxSequentialReadsInFlight.Get();
115+
matched->Settings = settings.BuildNode(ctx, matched->Settings.Pos());
116+
}
117+
113118
TVector<TExprBase> inputs;
114119
TVector<TCoArgument> args;
115120
TNodeOnNodeOwnedMap argReplaces;

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
9797
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9898

9999
REGISTER_SETTING(*this, MaxTasksPerStage);
100+
REGISTER_SETTING(*this, MaxSequentialReadsInFlight);
100101

101102
/* Runtime */
102103
REGISTER_SETTING(*this, ScanQuery);
@@ -153,6 +154,10 @@ bool TKikimrSettings::HasOptUseFinalizeByKey() const {
153154
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
154155
}
155156

157+
bool TKikimrSettings::HasMaxSequentialReadsInFlight() const {
158+
return !MaxSequentialReadsInFlight.Get().Empty();
159+
}
160+
156161
EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
157162
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
158163
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ struct TKikimrSettings {
7373

7474

7575
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
76+
NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight;
7677

7778
/* Runtime */
7879
NCommon::TConfSetting<bool, true> ScanQuery;
@@ -89,6 +90,7 @@ struct TKikimrSettings {
8990
bool HasOptEnableOlapPushdown() const;
9091
bool HasOptEnableOlapProvideComputeSharding() const;
9192
bool HasOptUseFinalizeByKey() const;
93+
bool HasMaxSequentialReadsInFlight() const;
9294
bool OrderPreservingLookupJoinEnabled() const;
9395
EOptionalFlag GetOptPredicateExtract() const;
9496
EOptionalFlag GetUseLlvm() const;

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4365,7 +4365,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
43654365
}
43664366
}
43674367

4368+
Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) {
4369+
TKikimrRunner kikimr;
4370+
auto db = kikimr.GetTableClient();
4371+
auto session = db.CreateSession().GetValueSync().GetSession();
4372+
4373+
NYdb::NTable::TExecDataQuerySettings querySettings;
4374+
querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
4375+
4376+
TString query = R"(
4377+
SELECT Key, Data FROM `/Root/EightShard`
4378+
WHERE Text = "Value1"
4379+
ORDER BY Key
4380+
LIMIT 1;
4381+
)";
4382+
4383+
if (Enabled) {
4384+
TString pragma = TString(R"(
4385+
PRAGMA ydb.MaxSequentialReadsInFlight = "1";
4386+
)");
43684387

4388+
query = pragma + query;
4389+
}
4390+
4391+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync();
4392+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4393+
CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
4394+
4395+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
4396+
for (const auto& phase : stats.query_phases()) {
4397+
for (const auto& access : phase.table_access()) {
4398+
if (access.name() == "/Root/EightShard") {
4399+
if (Enabled) {
4400+
UNIT_ASSERT_LT(access.partitions_count(), 8);
4401+
} else {
4402+
UNIT_ASSERT_EQUAL(access.partitions_count(), 8);
4403+
}
4404+
}
4405+
}
4406+
}
4407+
}
43694408

43704409
}
43714410

0 commit comments

Comments
 (0)