Skip to content

Support PG types in schemeshard (#11023) #14301

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ namespace NKikimr {
namespace NSchemeShard {

static bool IsIntegerType(NScheme::TTypeInfo type) {
// TODO: support pg types
switch (type.GetTypeId()) {
case NScheme::NTypeIds::Bool:

Expand Down Expand Up @@ -90,6 +89,7 @@ TSerializedCellVec ChooseSplitKeyByHistogram(const NKikimrTableStats::THistogram
splitKey[i] = keyMed.GetCells()[i];
} else {
// med == lo and med != hi, so we want to find a value that is > med and <= hi
// TODO: support this optimization for integer pg types
if (IsIntegerType(columnType) && !keyMed.GetCells()[i].IsNull()) {
// For integer types we can add 1 to med
ui64 val = 0;
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_info_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2263,15 +2263,16 @@ bool TTopicInfo::FillKeySchema(const NKikimrPQ::TPQTabletConfig& tabletConfig, T
KeySchema.reserve(tabletConfig.PartitionKeySchemaSize());

for (const auto& component : tabletConfig.GetPartitionKeySchema()) {
// TODO: support pg types
auto typeId = component.GetTypeId();
if (!NScheme::NTypeIds::IsYqlType(typeId)) {
if (!NScheme::NTypeIds::IsYqlType(typeId) && typeId != NScheme::NTypeIds::Pg) {
error = TStringBuilder() << "TypeId is not supported"
<< ": typeId# " << typeId
<< ", component# " << component.GetName();
return false;
}
KeySchema.push_back(NScheme::TTypeInfo(typeId));
auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId,
component.HasTypeInfo() ? &component.GetTypeInfo() : nullptr);
KeySchema.push_back(typeInfoMod.TypeInfo);
}

return true;
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6602,6 +6602,24 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
TestDescribeResult(DescribePath(runtime, "/MyRoot/PQGroup3", true), {
NLs::CheckPartCount("PQGroup3", 2, 1, 2, 2),
});

// pg type
TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
Name: "PQGroup4"
TotalGroupCount: 2
PartitionPerTablet: 1
PQTabletConfig {
PartitionConfig { LifetimeSeconds: 10 }
PartitionKeySchema { Name: "key1" TypeId: 0x3000 TypeInfo { PgTypeId: 23 } }
}
PartitionBoundaries {
Tuple { Optional { Text: "1000" } }
}
)");
env.TestWaitNotification(runtime, txId);
TestDescribeResult(DescribePath(runtime, "/MyRoot/PQGroup4", true), {
NLs::CheckPartCount("PQGroup4", 2, 1, 2, 2),
});
}

Y_UNIT_TEST(AlterPersQueueGroupWithKeySchema) {
Expand Down
74 changes: 74 additions & 0 deletions ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,80 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
}
}

Y_UNIT_TEST(SplitShardsWhithPgKey) {
TTestBasicRuntime runtime;

TTestEnvOptions opts;
opts.EnableBackgroundCompaction(false);
opts.EnableTablePgTypes(true);

TTestEnv env(runtime, opts);

ui64 txId = 100;

NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
NDataShard::gDbStatsDataSizeResolution = 10;
NDataShard::gDbStatsRowCountResolution = 10;

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_CRIT);

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "pgint8"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
UniformPartitionsCount: 1
)");
env.TestWaitNotification(runtime, txId);

TString valueString = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
for (ui64 key = 0; key < 1000; ++key) {
auto pgKey = NPg::PgNativeBinaryFromNativeText(ToString(key * 1'000'000), NPg::TypeDescFromPgTypeName("pgint8")).Str;
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell(pgKey)}, {TCell(valueString)});
}

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{NLs::PartitionCount(1)});

TestAlterTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
PartitionConfig {
PartitioningPolicy {
MinPartitionsCount: 100
MaxPartitionsCount: 100
SizeToSplit: 1
}
}
)");
env.TestWaitNotification(runtime, txId);

while (true) {
TVector<THolder<IEventHandle>> suppressed;
auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvDataShard::TEvGetTableStatsResult::EventType);

WaitForSuppressed(runtime, suppressed, 1, prevObserver);
for (auto &msg : suppressed) {
runtime.Send(msg.Release());
}
suppressed.clear();

bool itIsEnough = false;

NLs::TCheckFunc checkPartitionCount = [&] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
if (record.GetPathDescription().TablePartitionsSize() >= 10) {
itIsEnough = true;
}
};

TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
{checkPartitionCount});

if (itIsEnough) {
return;
}
}
}

Y_UNIT_TEST(Merge1KShards) {
TTestBasicRuntime runtime;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/ut_split_merge/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ IF (NOT WITH_VALGRIND)
library/cpp/getopt
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/testlib/default
ydb/core/testlib/pg
ydb/core/tx
ydb/core/tx/schemeshard/ut_helpers
ydb/library/yql/public/udf/service/exception_policy
Expand Down
Loading