Skip to content

Commit 9019b62

Browse files
Merge 36dbfe5 into 965aa2d
2 parents 965aa2d + 36dbfe5 commit 9019b62

File tree

5 files changed

+50
-2
lines changed

5 files changed

+50
-2
lines changed

ydb/core/kqp/opt/kqp_query_plan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1835,7 +1835,7 @@ class TxPlanSerializer {
18351835
op.Properties["Reverse"] = true;
18361836
}
18371837

1838-
if (settings.SequentialInFlight) {
1838+
if (settings.SequentialInFlight || SerializerCtx.Config->UseSequentialReadsEnabled()) {
18391839
op.Properties["Scan"] = "Sequential";
18401840
} else {
18411841
op.Properties["Scan"] = "Parallel";

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
9595
REGISTER_SETTING(*this, MaxDPccpDPTableSize);
9696

9797
REGISTER_SETTING(*this, MaxTasksPerStage);
98+
REGISTER_SETTING(*this, UseSequentialReads);
9899

99100
/* Runtime */
100101
REGISTER_SETTING(*this, ScanQuery);
@@ -164,6 +165,10 @@ NDq::EHashJoinMode TKikimrSettings::GetHashJoinMode() const {
164165
return maybeHashJoinMode ? *maybeHashJoinMode : NDq::EHashJoinMode::Off;
165166
}
166167

168+
bool TKikimrSettings::UseSequentialReadsEnabled() const {
169+
return GetOptionalFlagValue(UseSequentialReads.Get()) == EOptionalFlag::Enabled;
170+
}
171+
167172
TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
168173
return std::make_shared<const TKikimrSettings>(*this);
169174
}

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ struct TKikimrSettings {
7272

7373

7474
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
75+
NCommon::TConfSetting<bool, false> UseSequentialReads;
7576

7677
/* Runtime */
7778
NCommon::TConfSetting<bool, true> ScanQuery;
@@ -82,6 +83,7 @@ struct TKikimrSettings {
8283
bool SystemColumnsEnabled() const;
8384
bool SpillingEnabled() const;
8485
bool DisableLlvmForUdfStages() const;
86+
bool UseSequentialReadsEnabled() const;
8587

8688
bool HasOptDisableTopSort() const;
8789
bool HasOptDisableSqlInToJoin() const;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
10341034
readProto.SetSorted(readSettings.Sorted);
10351035
YQL_ENSURE(readSettings.SkipNullKeys.empty());
10361036

1037-
if (readSettings.SequentialInFlight) {
1037+
if (Config->UseSequentialReadsEnabled()) {
1038+
readProto.SetSequentialInFlightShards(1);
1039+
} else if (readSettings.SequentialInFlight) {
10381040
readProto.SetSequentialInFlightShards(*readSettings.SequentialInFlight);
10391041
}
10401042

ydb/core/kqp/ut/opt/kqp_ne_ut.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4322,7 +4322,46 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
43224322
}
43234323
}
43244324

4325+
Y_UNIT_TEST_TWIN(SequentialReadsPragma, Enabled) {
4326+
TKikimrRunner kikimr;
4327+
auto db = kikimr.GetTableClient();
4328+
auto session = db.CreateSession().GetValueSync().GetSession();
4329+
4330+
NYdb::NTable::TExecDataQuerySettings querySettings;
4331+
querySettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
4332+
4333+
TString query = R"(
4334+
SELECT Key, Data FROM `/Root/EightShard`
4335+
WHERE Text = "Value1"
4336+
ORDER BY Key
4337+
LIMIT 1;
4338+
)";
4339+
4340+
if (Enabled) {
4341+
TString pragma = TString(R"(
4342+
PRAGMA ydb.UseSequentialReads = "true";
4343+
)");
43254344

4345+
query = pragma + query;
4346+
}
4347+
4348+
auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), querySettings).GetValueSync();
4349+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4350+
CompareYson(R"([[[101u];[1]]])", FormatResultSetYson(result.GetResultSet(0)));
4351+
4352+
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
4353+
for (const auto& phase : stats.query_phases()) {
4354+
for (const auto& access : phase.table_access()) {
4355+
if (access.name() == "/Root/EightShard") {
4356+
if (Enabled) {
4357+
UNIT_ASSERT_LT(access.partitions_count(), 8);
4358+
} else {
4359+
UNIT_ASSERT_EQUAL(access.partitions_count(), 8);
4360+
}
4361+
}
4362+
}
4363+
}
4364+
}
43264365

43274366
}
43284367

0 commit comments

Comments
 (0)