Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1030,15 +1030,12 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
}

std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
// Special case for infinity
if (lhs.Ranges->GetRightBorder().first->GetCells().empty() || rhs.Ranges->GetRightBorder().first->GetCells().empty()) {
YQL_ENSURE(!lhs.Ranges->GetRightBorder().first->GetCells().empty() || !rhs.Ranges->GetRightBorder().first->GetCells().empty());
return rhs.Ranges->GetRightBorder().first->GetCells().empty();
}
return CompareTypedCellVectors(
lhs.Ranges->GetRightBorder().first->GetCells().data(),
rhs.Ranges->GetRightBorder().first->GetCells().data(),
keyTypes.data(), keyTypes.size()) < 0;
return CompareBorders<false, false>(
lhs.Ranges->GetRightBorder().first->GetCells(),
rhs.Ranges->GetRightBorder().first->GetCells(),
lhs.Ranges->GetRightBorder().second,
rhs.Ranges->GetRightBorder().second,
keyTypes) < 0;
});

// One shard (ranges set) can be assigned only to one task. Otherwise, we can break some optimizations like removing unnecessary shuffle.
Expand Down
92 changes: 92 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4225,6 +4225,98 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[1u;["test1"];10];[2u;["test2"];11];[3u;["test3"];12];[4u;#;13];[101u;["test1"];10];[102u;["test2"];11];[103u;["test3"];12];[104u;#;13]])", FormatResultSetYson(result.GetResultSet(1)));
}
}

Y_UNIT_TEST(ReadManyRanges) {
NKikimrConfig::TAppConfig appConfig;
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/DataShard` (
Col1 String,
Col2 String,
Col3 String,
PRIMARY KEY (Col1, Col2)
)
WITH (
STORE = ROW,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("k"), ("p"), ("q"), ("x"))
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();

{
auto prepareResult = client.ExecuteQuery(R"(
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("y", "1") , ("y", "2"), ("d", "1"), ("b", "1"), ("k", "1"), ("q", "1"), ("p", "1");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard` WHERE Col1 IN ("d", "b", "k", "q", "p") OR (Col1 = "y" AND Col2 = "2");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[6u]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(ReadManyShardsRange) {
NKikimrConfig::TAppConfig appConfig;
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/DataShard` (
Col1 String,
Col2 String,
Col3 String,
PRIMARY KEY (Col1, Col2)
)
WITH (
STORE = ROW,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
PARTITION_AT_KEYS = (("a", "0"), ("b", "b"), ("c", "d"))
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();

{
auto prepareResult = client.ExecuteQuery(R"(
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("d", "d");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT COUNT(*) FROM `/Root/DataShard` WHERE "a" <= Col1 AND Col1 <= "c";
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
}
}
}

} // namespace NKqp
Expand Down
Loading