Skip to content

Support PG types in schemeshard #11023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace NKikimr {
namespace NSchemeShard {

static bool IsIntegerType(NScheme::TTypeInfo type) {
// TODO: support pg types
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generic not-only-integer realization is enough. Tested in SplitShardsWhithPgKey test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its enough to work but not enough to work well.
Below in the same file IsIntegerType() is used to choose between fast and slow branches for split prefix search. And failing to support integer pg types as integer forces ChooseSplitKeyByHistogram() to choose the slow pass.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, support for this optimization requires the ability to increment a value of integer pg type which is not currently supported. I'll do it in separate issue: #11168

switch (type.GetTypeId()) {
case NScheme::NTypeIds::Bool:

Expand Down Expand Up @@ -90,6 +89,7 @@ TSerializedCellVec ChooseSplitKeyByHistogram(const NKikimrTableStats::THistogram
splitKey[i] = keyMed.GetCells()[i];
} else {
// med == lo and med != hi, so we want to find a value that is > med and <= hi
// TODO: support this optimization for integer pg types
if (IsIntegerType(columnType) && !keyMed.GetCells()[i].IsNull()) {
// For integer types we can add 1 to med
ui64 val = 0;
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_info_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2295,15 +2295,16 @@ bool TTopicInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, T
KeySchema.reserve(tabletConfig.PartitionKeySchemaSize());

for (const auto& component : tabletConfig.GetPartitionKeySchema()) {
// TODO: support pg types
auto typeId = component.GetTypeId();
if (!NScheme::NTypeIds::IsYqlType(typeId)) {
if (!NScheme::NTypeIds::IsYqlType(typeId) && typeId != NScheme::NTypeIds::Pg) {
error = TStringBuilder() << "TypeId is not supported"
<< ": typeId# " << typeId
<< ", component# " << component.GetName();
return false;
}
KeySchema.push_back(NScheme::TTypeInfo(typeId));
auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId,
component.HasTypeInfo() ? &component.GetTypeInfo() : nullptr);
KeySchema.push_back(typeInfoMod.TypeInfo);
}

return true;
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6590,6 +6590,24 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/PQGroup3", true), {
NLs::CheckPartCount("PQGroup3", 2, 1, 2, 2),
});

// pg type
TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "PQGroup4"
TotalGroupCount: 2
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
PartitionKeySchema { Name: "key1" TypeId: 0x3000 TypeInfo { PgTypeId: 23 } }
}
PartitionBoundaries {
Tuple { Optional { Text: "1000" } }
}
)");
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/PQGroup4", true), {
NLs::CheckPartCount("PQGroup4", 2, 1, 2, 2),
});
}

Y_UNIT_TEST(AlterPersQueueGroupWithKeySchema) {
Expand Down
74 changes: 74 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,80 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
}
}

Y_UNIT_TEST(SplitShardsWhithPgKey) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
opts.EnableTablePgTypes(true);

TTestEnv env(runtime, opts);

ui64 txId = 100;

NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
NDataShard::gDbStatsDataSizeResolution = 10;
NDataShard::gDbStatsRowCountResolution = 10;

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_CRIT);

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "pgint8"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
UniformPartitionsCount: 1
)");
env.TestWaitNotification(runtime, txId);

TString valueString = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
for (ui64 key = 0; key < 1000; ++key) {
auto pgKey = NPg::PgNativeBinaryFromNativeText(ToString(key * 1'000'000), NPg::TypeDescFromPgTypeName("pgint8")).Str;
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell(pgKey)}, {TCell(valueString)});
}

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionCount(1)});

TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
PartitionConfig {
PartitioningPolicy {
MinPartitionsCount: 100
MaxPartitionsCount: 100
SizeToSplit: 1
}
}
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

bool itIsEnough = false;

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});

if (itIsEnough) {
return;
}
}
}

Y_UNIT_TEST(Merge1KShards) {
TTestBasicRuntime runtime;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_split_merge/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ IF (NOT WITH_VALGRIND)
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/testlib/default
ydb/core/testlib/pg
ydb/core/tx
ydb/core/tx/schemeshard/ut_helpers
ydb/library/yql/public/udf/service/exception_policy
Expand Down
Loading