Skip to content

Commit 0e102dd

Browse files
committed
Fix reads from many shards (ydb-platform#11569)
1 parent 288f740 commit 0e102dd

File tree

2 files changed

+175
-9
lines changed

2 files changed

+175
-9
lines changed

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,15 +1030,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10301030
}
10311031

10321032
std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
1033-
// Special case for infinity
1034-
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
1035-
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
1036-
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
1037-
}
1038-
return CompareTypedCellVectors(
1039-
lhs.Ranges->GetRightBorder().first->GetCells().data(),
1040-
rhs.Ranges->GetRightBorder().first->GetCells().data(),
1041-
keyTypes.data(), keyTypes.size()) < 0;
1033+
return CompareBorders<false, false>(
1034+
lhs.Ranges->GetRightBorder().first->GetCells(),
1035+
rhs.Ranges->GetRightBorder().first->GetCells(),
1036+
lhs.Ranges->GetRightBorder().second,
1037+
rhs.Ranges->GetRightBorder().second,
1038+
keyTypes) < 0;
10421039
});
10431040

10441041
// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4225,6 +4225,175 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
42254225
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[101u;["test1"];10];[102u;["test2"];11];[103u;["test3"];12];[104u;#;13]])", FormatResultSetYson(result.GetResultSet(1)));
42264226
}
42274227
}
4228+
4229+
Y_UNIT_TEST(MixedReadQueryWithoutStreamLookup) {
4230+
NKikimrConfig::TAppConfig appConfig;
4231+
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true);
4232+
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(false);
4233+
appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false);
4234+
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(false);
4235+
4236+
auto settings = TKikimrSettings()
4237+
.SetAppConfig(appConfig)
4238+
.SetWithSampleTables(false);
4239+
4240+
TKikimrRunner kikimr(settings);
4241+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4242+
4243+
auto client = kikimr.GetQueryClient();
4244+
4245+
{
4246+
auto createTable = client.ExecuteQuery(R"sql(
4247+
CREATE TABLE `/Root/DataShard` (
4248+
Col1 Uint64 NOT NULL,
4249+
Col2 Int32 NOT NULL,
4250+
Col3 String,
4251+
PRIMARY KEY (Col1, Col2)
4252+
) WITH (STORE = ROW);
4253+
CREATE TABLE `/Root/ColumnShard` (
4254+
Col1 Uint64 NOT NULL,
4255+
Col2 Int32 NOT NULL,
4256+
Col3 String,
4257+
PRIMARY KEY (Col1, Col2)
4258+
) WITH (STORE = COLUMN);
4259+
)sql", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync();
4260+
UNIT_ASSERT_C(createTable.IsSuccess(), createTable.GetIssues().ToString());
4261+
}
4262+
4263+
{
4264+
auto replaceValues = client.ExecuteQuery(R"sql(
4265+
REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
4266+
(1u, 1, "row"), (1u, 2, "row"), (1u, 3, "row"), (2u, 3, "row");
4267+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4268+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
4269+
}
4270+
{
4271+
auto replaceValues = client.ExecuteQuery(R"sql(
4272+
REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES
4273+
(1u, 1, "row"), (1u, 2, "row"), (1u, 3, "row"), (2u, 3, "row");
4274+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4275+
UNIT_ASSERT_C(replaceValues.IsSuccess(), replaceValues.GetIssues().ToString());
4276+
}
4277+
4278+
{
4279+
auto it = client.StreamExecuteQuery(R"sql(
4280+
SELECT Col3 FROM `/Root/DataShard` WHERE Col1 = 1u
4281+
UNION ALL
4282+
SELECT Col3 FROM `/Root/ColumnShard` WHERE Col1 = 1u;
4283+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4284+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
4285+
TString output = StreamResultToYson(it);
4286+
CompareYson(
4287+
output,
4288+
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
4289+
}
4290+
4291+
{
4292+
auto it = client.StreamExecuteQuery(R"sql(
4293+
SELECT r.Col3
4294+
FROM `/Root/DataShard` AS r
4295+
JOIN `/Root/ColumnShard` AS c
4296+
ON r.Col1 = c.Col1;
4297+
)sql", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4298+
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
4299+
TString output = StreamResultToYson(it);
4300+
CompareYson(
4301+
output,
4302+
R"([[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]];[["row"]]])");
4303+
}
4304+
}
4305+
4306+
Y_UNIT_TEST(ReadManyRanges) {
4307+
NKikimrConfig::TAppConfig appConfig;
4308+
auto settings = TKikimrSettings()
4309+
.SetAppConfig(appConfig)
4310+
.SetWithSampleTables(false);
4311+
4312+
TKikimrRunner kikimr(settings);
4313+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4314+
4315+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4316+
4317+
const TString query = R"(
4318+
CREATE TABLE `/Root/DataShard` (
4319+
Col1 String,
4320+
Col2 String,
4321+
Col3 String,
4322+
PRIMARY KEY (Col1, Col2)
4323+
)
4324+
WITH (
4325+
STORE = ROW,
4326+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4327+
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("k"), ("p"), ("q"), ("x"))
4328+
);
4329+
)";
4330+
4331+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4332+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4333+
4334+
auto client = kikimr.GetQueryClient();
4335+
4336+
{
4337+
auto prepareResult = client.ExecuteQuery(R"(
4338+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("y", "1") , ("y", "2"), ("d", "1"), ("b", "1"), ("k", "1"), ("q", "1"), ("p", "1");
4339+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4340+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4341+
}
4342+
4343+
{
4344+
auto result = client.ExecuteQuery(R"(
4345+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col1 IN ("d", "b", "k", "q", "p") OR (Col1 = "y" AND Col2 = "2");
4346+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4347+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4348+
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(0)));
4349+
}
4350+
}
4351+
4352+
Y_UNIT_TEST(ReadManyShardsRange) {
4353+
NKikimrConfig::TAppConfig appConfig;
4354+
auto settings = TKikimrSettings()
4355+
.SetAppConfig(appConfig)
4356+
.SetWithSampleTables(false);
4357+
4358+
TKikimrRunner kikimr(settings);
4359+
Tests::NCommon::TLoggerInit(kikimr).Initialize();
4360+
4361+
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
4362+
4363+
const TString query = R"(
4364+
CREATE TABLE `/Root/DataShard` (
4365+
Col1 String,
4366+
Col2 String,
4367+
Col3 String,
4368+
PRIMARY KEY (Col1, Col2)
4369+
)
4370+
WITH (
4371+
STORE = ROW,
4372+
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
4373+
PARTITION_AT_KEYS = (("a", "0"), ("b", "b"), ("c", "d"))
4374+
);
4375+
)";
4376+
4377+
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
4378+
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
4379+
4380+
auto client = kikimr.GetQueryClient();
4381+
4382+
{
4383+
auto prepareResult = client.ExecuteQuery(R"(
4384+
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("d", "d");
4385+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4386+
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
4387+
}
4388+
4389+
{
4390+
auto result = client.ExecuteQuery(R"(
4391+
SELECT COUNT(*) FROM `/Root/DataShard` WHERE "a" <= Col1 AND Col1 <= "c";
4392+
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
4393+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
4394+
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
4395+
}
4396+
}
42284397
}
42294398

42304399
} // namespace NKqp

0 commit comments

Comments
 (0)