Skip to content

Commit e5289eb

Browse files
authored
[24-3-hotfix] Fix reads from many shards #11569 (#11659)
1 parent c86dfaf commit e5289eb

File tree

2 files changed

+98
-9
lines changed

2 files changed

+98
-9
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,15 +1030,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10301030
}
10311031

10321032
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
1033-
// Special case for infinity
1034-
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
1035-
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
1036-
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
1037-
}
1038-
return CompareTypedCellVectors(
1039-
lhs.Ranges->GetRightBorder().first->GetCells().data(),
1040-
rhs.Ranges->GetRightBorder().first->GetCells().data(),
1041-
keyTypes.data(), keyTypes.size()) < 0;
1033+
return CompareBorders<false, false>(
1034+
lhs.Ranges->GetRightBorder().first->GetCells(),
1035+
rhs.Ranges->GetRightBorder().first->GetCells(),
1036+
lhs.Ranges->GetRightBorder().second,
1037+
rhs.Ranges->GetRightBorder().second,
1038+
keyTypes) < 0;
10421039
});
10431040

10441041
// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4225,6 +4225,98 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
42254225
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[101u;["test1"];10];[102u;["test2"];11];[103u;["test3"];12];[104u;#;13]])", FormatResultSetYson(result.GetResultSet(1)));
42264226
}
42274227
}
4228+
4229+
Y_UNIT_TEST(ReadManyRanges) {
4230+
NKikimrConfig::TAppConfig appConfig;
4231+
auto settings = TKikimrSettings()
4232+
.SetAppConfig(appConfig)
4233+
.SetWithSampleTables(false);
4234+
4235+
TKikimrRunner kikimr(settings);
4236+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4237+
4238+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4239+
4240+
const TString query = R"(
4241+
CREATE TABLE `/Root/DataShard` (
4242+
Col1 String,
4243+
Col2 String,
4244+
Col3 String,
4245+
PRIMARY KEY (Col1, Col2)
4246+
)
4247+
WITH (
4248+
STORE = ROW,
4249+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4250+
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("k"), ("p"), ("q"), ("x"))
4251+
);
4252+
)";
4253+
4254+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4255+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4256+
4257+
auto client = kikimr.GetQueryClient();
4258+
4259+
{
4260+
auto prepareResult = client.ExecuteQuery(R"(
4261+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("y", "1") , ("y", "2"), ("d", "1"), ("b", "1"), ("k", "1"), ("q", "1"), ("p", "1");
4262+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4263+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4264+
}
4265+
4266+
{
4267+
auto result = client.ExecuteQuery(R"(
4268+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col1 IN ("d", "b", "k", "q", "p") OR (Col1 = "y" AND Col2 = "2");
4269+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4270+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4271+
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(0)));
4272+
}
4273+
}
4274+
4275+
Y_UNIT_TEST(ReadManyShardsRange) {
4276+
NKikimrConfig::TAppConfig appConfig;
4277+
auto settings = TKikimrSettings()
4278+
.SetAppConfig(appConfig)
4279+
.SetWithSampleTables(false);
4280+
4281+
TKikimrRunner kikimr(settings);
4282+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4283+
4284+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4285+
4286+
const TString query = R"(
4287+
CREATE TABLE `/Root/DataShard` (
4288+
Col1 String,
4289+
Col2 String,
4290+
Col3 String,
4291+
PRIMARY KEY (Col1, Col2)
4292+
)
4293+
WITH (
4294+
STORE = ROW,
4295+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4296+
PARTITION_AT_KEYS = (("a", "0"), ("b", "b"), ("c", "d"))
4297+
);
4298+
)";
4299+
4300+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4301+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4302+
4303+
auto client = kikimr.GetQueryClient();
4304+
4305+
{
4306+
auto prepareResult = client.ExecuteQuery(R"(
4307+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("d", "d");
4308+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4309+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4310+
}
4311+
4312+
{
4313+
auto result = client.ExecuteQuery(R"(
4314+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE "a" <= Col1 AND Col1 <= "c";
4315+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4316+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4317+
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
4318+
}
4319+
}
42284320
}
42294321

42304322
} // namespace NKqp

0 commit comments

Comments
 (0)