Skip to content

Commit 36dbfe5

Browse files
committed
feat(kqp): add UseSequentialReads pragma
1 parent 55a88ca commit 36dbfe5

File tree

5 files changed

+50
-2
lines changed

5 files changed

+50
-2
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1789,7 +1789,7 @@ class TxPlanSerializer {
17891789
op.Properties["Reverse"] = true;
17901790
}
17911791

1792-
if (settings.SequentialInFlight) {
1792+
if (settings.SequentialInFlight || SerializerCtx.Config->UseSequentialReadsEnabled()) {
17931793
op.Properties["Scan"] = "Sequential";
17941794
} else {
17951795
op.Properties["Scan"] = "Parallel";

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
9595
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9696

9797
REGISTER_SETTING(*this, MaxTasksPerStage);
98+
REGISTER_SETTING(*this, UseSequentialReads);
9899

99100
/* Runtime */
100101
REGISTER_SETTING(*this, ScanQuery);
@@ -164,6 +165,10 @@ NDq::EHashJoinMode TKikimrSettings::GetHashJoinMode() const {
164165
return maybeHashJoinMode ? *maybeHashJoinMode : NDq::EHashJoinMode::Off;
165166
}
166167

168+
bool TKikimrSettings::UseSequentialReadsEnabled() const {
169+
return GetOptionalFlagValue(UseSequentialReads.Get()) == EOptionalFlag::Enabled;
170+
}
171+
167172
TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
168173
return std::make_shared<const TKikimrSettings>(*this);
169174
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ struct TKikimrSettings {
7272

7373

7474
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
75+
NCommon::TConfSetting<bool, false> UseSequentialReads;
7576

7677
/* Runtime */
7778
NCommon::TConfSetting<bool, true> ScanQuery;
@@ -82,6 +83,7 @@ struct TKikimrSettings {
8283
bool SystemColumnsEnabled() const;
8384
bool SpillingEnabled() const;
8485
bool DisableLlvmForUdfStages() const;
86+
bool UseSequentialReadsEnabled() const;
8587

8688
bool HasOptDisableTopSort() const;
8789
bool HasOptDisableSqlInToJoin() const;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1009,7 +1009,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10091009
readProto.SetSorted(readSettings.Sorted);
10101010
YQL_ENSURE(readSettings.SkipNullKeys.empty());
10111011

1012-
if (readSettings.SequentialInFlight) {
1012+
if (Config->UseSequentialReadsEnabled()) {
1013+
readProto.SetSequentialInFlightShards(1);
1014+
} else if (readSettings.SequentialInFlight) {
10131015
readProto.SetSequentialInFlightShards(*readSettings.SequentialInFlight);
10141016
}
10151017

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4235,7 +4235,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
42354235
}
42364236
}
42374237

4238+
Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) {
4239+
TKikimrRunner kikimr;
4240+
auto db = kikimr.GetTableClient();
4241+
auto session = db.CreateSession().GetValueSync().GetSession();
4242+
4243+
NYdb::NTable::TExecDataQuerySettings querySettings;
4244+
querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
4245+
4246+
TString query = R"(
4247+
SELECT Key, Data FROM `/Root/EightShard`
4248+
WHERE Text = "Value1"
4249+
ORDER BY Key
4250+
LIMIT 1;
4251+
)";
4252+
4253+
if (Enabled) {
4254+
TString pragma = TString(R"(
4255+
PRAGMA ydb.UseSequentialReads = "true";
4256+
)");
42384257

4258+
query = pragma + query;
4259+
}
4260+
4261+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync();
4262+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4263+
CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
4264+
4265+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
4266+
for (const auto& phase : stats.query_phases()) {
4267+
for (const auto& access : phase.table_access()) {
4268+
if (access.name() == "/Root/EightShard") {
4269+
if (Enabled) {
4270+
UNIT_ASSERT_LT(access.partitions_count(), 8);
4271+
} else {
4272+
UNIT_ASSERT_EQUAL(access.partitions_count(), 8);
4273+
}
4274+
}
4275+
}
4276+
}
4277+
}
42394278

42404279
}
42414280

0 commit comments

Comments
 (0)