Skip to content

Commit ba5f107

Browse files
committed
schemeboard: pass describe-result as an opaque payload
Change type of `{TEvUpdate,TEvNotify}.DescribeSchemeResult` from transparent `TEvDescribeSchemeResult` to opaque `bytes` and support that throughout Populator, Replica, Subscriber actors. A properly typed TEvDescribeSchemeResult induces additional overhead to automatically serialize and deserialize this message when transferring over the wire. This performance cost is usually either negligible or imperceptible. But in specific situations, particularly when rapidly updating partitioning information for tables with a huge number of shards, this overhead could lead to significant issues. Schemeboard replicas could get overloaded and become unresponsive to further requests. This is problematic, especially considering the schemeboard subsystem's critical role in servicing all databases within a cluster, making it a SPOF. The core realization is that the schemeboard components do not require the full content of a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of fields (path, path-id, version and info about subdomain/database) is required for processing. And a whole TEvDescribeSchemeResult could be passed through as an opaque payload. Type change from TEvDescribeSchemeResult to bytes without changing field number is a safe move. Actual value of the field remains unchanged at the wire protocol level. Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message and proceed with deserialization as usual. And newer implementations will recognize the data as a binary blob and will deserialize it explicitly only when necessary. KIKIMR-14948
1 parent 5ba8840 commit ba5f107

20 files changed

+579
-307
lines changed

ydb/core/protos/flat_tx_scheme.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ message TEvDescribeSchemeResult {
112112
optional string LastExistedPrefixPath = 7;
113113
optional fixed64 LastExistedPrefixPathId = 8;
114114
optional NKikimrSchemeOp.TPathDescription LastExistedPrefixDescription = 9;
115+
115116
optional fixed64 PathOwnerId = 10;
116117
}
117118

ydb/core/protos/scheme_board.proto

Lines changed: 63 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import "ydb/core/protos/flat_tx_scheme.proto";
1+
import "ydb/core/scheme/protos/pathid.proto";
22

33
package NKikimrSchemeBoard;
44
option java_package = "ru.yandex.kikimr.proto";
@@ -13,22 +13,68 @@ message TEvHandshake {
1313
optional uint64 Generation = 2;
1414
}
1515

16-
// here and below
17-
// Owner is the tablet id of schemeshard witch holds the records
18-
// LocalPathId is a second part of TPathId
19-
// PathOwnerId is a first part of TPathId
16+
// Here and below.
17+
// Owner is the tablet id of schemeshard which holds the records.
18+
// (PathOwnerId, LocalPathId) constitute TPathId of the object.
2019

20+
// TEvUpdate.DescribeSchemeResultSerialized is a NKikimrScheme.TEvDescribeSchemeResult
21+
// in the form of opaque payload.
22+
// Originally, that field existed as a properly typed TEvDescribeSchemeResult message.
23+
// However, that induces additional overhead to serialize and deserialize this message
24+
// when transferring over the wire.
25+
// This performance cost is usually either negligible or imperceptible.
26+
// But in specific situations, particularly when rapidly updating partitioning information
27+
// for tables with a huge number of shards, this overhead could lead to significant issues.
28+
// Schemeboard replicas could get overloaded and become unresponsive to further requests.
29+
// This is problematic, especially considering the schemeboard subsystem's critical role
30+
// in servicing all databases within a cluster, making it a Single Point of Failure (SPOF).
31+
//
32+
// The core realization is that the schemeboard components do not require the full content of
33+
// a TEvDescribeSchemeResult message to operate efficiently. Instead, only a limited set of
34+
// fields (path, path-id, version and info about subdomain/database) is required for processing.
35+
// And a whole TEvDescribeSchemeResult could be passed through as an opaque payload.
36+
//
37+
// Type change from TEvDescribeSchemeResult to bytes without changing field number
38+
// is a safe move. Actual value of the field remains unchanged at the wire protocol level.
39+
// Thus, older implementations will interpret the payload as a TEvDescribeSchemeResult message
40+
// and proceed with deserialization as usual. And newer implementations will recognize the data
41+
// as a binary blob and will deserialize it explicitly only when necessary.
42+
//
43+
// - Path
44+
// - PathOwnerId, LocalPathId
45+
// - PathDirEntryPathVersion
46+
// - PathSubdomainPathId
47+
// - PathAbandonedTenantsSchemeShards
48+
// are taken from the original TEvDescribeSchemeResult (one way or another).
49+
//
2150
message TEvUpdate {
2251
optional uint64 Owner = 1;
2352
optional uint64 Generation = 2;
2453
optional TLocalPathIdRange DeletedLocalPathIds = 3;
25-
optional string Path = 4;
26-
optional uint64 LocalPathId = 5;
54+
55+
optional string Path = 4; // extracted from DescribeSchemeResult.Path
56+
optional uint64 LocalPathId = 5; // extracted from DescribeSchemeResult.PathId
57+
2758
optional bool IsDeletion = 6 [default = false];
28-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 7;
59+
60+
optional bytes DescribeSchemeResultSerialized = 7;
61+
2962
optional bool NeedAck = 8 [default = false];
30-
optional uint64 PathOwnerId = 9;
63+
64+
optional uint64 PathOwnerId = 9; // extracted from DescribeSchemeResult.PathOwnerId, DescribeSchemeResult.PathDescription.Self.SchemeshardId in order of presence
65+
3166
optional TLocalPathIdRange MigratedLocalPathIds = 10;
67+
68+
// Explicit values extracted from DescribeSchemeResultSerialized
69+
70+
// DescribeSchemeResult.PathDescription.Self.PathVersion
71+
optional uint64 PathDirEntryPathVersion = 11;
72+
73+
// DescribeSchemeResult.PathDescription.DomainDescription.DomainKey
74+
optional NKikimrProto.TPathID PathSubdomainPathId = 13;
75+
76+
// DescribeSchemeResult.PathDescription.AbandonedTenantsSchemeShards
77+
repeated uint64 PathAbandonedTenantsSchemeShards = 14;
3278
}
3379

3480
message TEvUpdateAck {
@@ -65,16 +111,22 @@ message TEvUnsubscribe {
65111
optional uint64 LocalPathId = 3;
66112
}
67113

114+
// See comments for TEvUpdate.
68115
message TEvNotify {
69116
optional string Path = 1;
70117
// and/or
71118
optional uint64 PathOwnerId = 2;
72119
optional uint64 LocalPathId = 3;
73120
// common fields
74121
optional bool IsDeletion = 4 [default = false];
75-
optional NKikimrScheme.TEvDescribeSchemeResult DescribeSchemeResult = 5;
76-
optional uint64 Version = 6;
122+
123+
optional bytes DescribeSchemeResultSerialized = 5;
124+
125+
optional uint64 Version = 6; // same as TEvUpdate.PathDirEntryPathVersion
77126
optional bool Strong = 7 [default = false];
127+
128+
optional NKikimrProto.TPathID PathSubdomainPathId = 8;
129+
repeated uint64 PathAbandonedTenantsSchemeShards = 9;
78130
}
79131

80132
message TEvNotifyAck {

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
789789
static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
790790
return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
791791
}
792-
};
792+
};
793793

794794
class TTestTopicEnv: public TTestEnv<TTestTopicEnv, NYdb::NTopic::TTopicClient> {
795795
public:
@@ -798,7 +798,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
798798
static THolder<NYdb::NTopic::TTopicClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
799799
return MakeHolder<NYdb::NTopic::TTopicClient>(driver, NYdb::NTopic::TTopicClientSettings().Database(database));
800800
}
801-
};
801+
};
802802

803803
TShardedTableOptions SimpleTable() {
804804
return TShardedTableOptions()
@@ -1344,7 +1344,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13441344
(3, 30);
13451345
)", R"(
13461346
DELETE FROM `/Root/Table` WHERE key = 1;
1347-
)"}, {
1347+
)"}, {
13481348
R"({"update":{},"key":[1]})",
13491349
R"({"update":{},"key":[2]})",
13501350
R"({"update":{},"key":[3]})",
@@ -1360,7 +1360,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13601360
(3, 30);
13611361
)", R"(
13621362
DELETE FROM `/Root/Table` WHERE key = 1;
1363-
)"}, {
1363+
)"}, {
13641364
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
13651365
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
13661366
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1376,7 +1376,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13761376
(3, 30);
13771377
)", R"(
13781378
DELETE FROM `/Root/Table` WHERE key = 1;
1379-
)"}, {
1379+
)"}, {
13801380
R"({"update":{"value":10},"key":[1]})",
13811381
R"({"update":{"value":20},"key":[2]})",
13821382
R"({"update":{"value":30},"key":[3]})",
@@ -1397,7 +1397,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
13971397
(3, 300);
13981398
)", R"(
13991399
DELETE FROM `/Root/Table` WHERE key = 1;
1400-
)"}, {
1400+
)"}, {
14011401
R"({"update":{},"newImage":{"value":10},"key":[1]})",
14021402
R"({"update":{},"newImage":{"value":20},"key":[2]})",
14031403
R"({"update":{},"newImage":{"value":30},"key":[3]})",
@@ -1421,7 +1421,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14211421
(3, 300);
14221422
)", R"(
14231423
DELETE FROM `/Root/Table` WHERE key = 1;
1424-
)"}, {
1424+
)"}, {
14251425
{DebeziumBody("c", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
14261426
{DebeziumBody("c", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
14271427
{DebeziumBody("c", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1445,7 +1445,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14451445
(3, 300);
14461446
)", R"(
14471447
DELETE FROM `/Root/Table` WHERE key = 1;
1448-
)"}, {
1448+
)"}, {
14491449
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":1}})"}}},
14501450
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":2}})"}}},
14511451
{DebeziumBody("u", nullptr, nullptr), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1456,7 +1456,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14561456
});
14571457
}
14581458

1459-
Y_UNIT_TEST(NewImageLogDebezium) {
1459+
Y_UNIT_TEST(NewImageLogDebezium) {
14601460
TopicRunner::Read(SimpleTable(), NewImage(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {R"(
14611461
UPSERT INTO `/Root/Table` (key, value) VALUES
14621462
(1, 10),
@@ -1469,7 +1469,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14691469
(3, 300);
14701470
)", R"(
14711471
DELETE FROM `/Root/Table` WHERE key = 1;
1472-
)"}, {
1472+
)"}, {
14731473
{DebeziumBody("u", nullptr, R"({"key":1,"value":10})"), {{"__key", R"({"payload":{"key":1}})"}}},
14741474
{DebeziumBody("u", nullptr, R"({"key":2,"value":20})"), {{"__key", R"({"payload":{"key":2}})"}}},
14751475
{DebeziumBody("u", nullptr, R"({"key":3,"value":30})"), {{"__key", R"({"payload":{"key":3}})"}}},
@@ -1486,7 +1486,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
14861486
(1, 10),
14871487
(2, 20),
14881488
(3, 30);
1489-
)"}, {
1489+
)"}, {
14901490
R"({"update":{},"key":[1],"ts":"***"})",
14911491
R"({"update":{},"key":[2],"ts":"***"})",
14921492
R"({"update":{},"key":[3],"ts":"***"})",
@@ -1512,7 +1512,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
15121512
UPSERT INTO `/Root/Table` (__Hash, id_shard, id_sort, __RowData) VALUES (
15131513
1, "10", "100", JsonDocument('{"M":{"color":{"S":"pink"},"weight":{"N":"4.5"}}}')
15141514
);
1515-
)"}, {
1515+
)"}, {
15161516
WriteJson(NJson::TJsonMap({
15171517
{"awsRegion", ""},
15181518
{"dynamodb", NJson::TJsonMap({
@@ -1541,7 +1541,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
15411541
);
15421542
)", R"(
15431543
DELETE FROM `/Root/Table` WHERE __Hash = 1;
1544-
)"}, {
1544+
)"}, {
15451545
WriteJson(NJson::TJsonMap({
15461546
{"awsRegion", ""},
15471547
{"dynamodb", NJson::TJsonMap({
@@ -1639,7 +1639,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
16391639
(1, 0.0%s/0.0%s),
16401640
(2, 1.0%s/0.0%s),
16411641
(3, -1.0%s/0.0%s);
1642-
)", s, s, s, s, s, s)}, {
1642+
)", s, s, s, s, s, s)}, {
16431643
R"({"update":{"value":"nan"},"key":[1]})",
16441644
R"({"update":{"value":"inf"},"key":[2]})",
16451645
R"({"update":{"value":"-inf"},"key":[3]})",
@@ -1674,7 +1674,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
16741674
TopicRunner::Read(table, KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson), {Sprintf(R"(
16751675
UPSERT INTO `/Root/Table` (key, value) VALUES
16761676
("%s", 1);
1677-
)", key.c_str())}, {
1677+
)", key.c_str())}, {
16781678
{DebeziumBody("u", nullptr, nullptr), {{"__key", Sprintf(R"({"payload":{"key":"%s"}})", key.c_str())}}},
16791679
});
16801680
}
@@ -2043,7 +2043,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
20432043
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
20442044
UPSERT INTO `/Root/TableAux` (key, value)
20452045
VALUES (1, 10);
2046-
)");
2046+
)");
20472047

20482048
SetSplitMergePartCountLimit(&runtime, -1);
20492049
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
@@ -2292,7 +2292,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
22922292
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
22932293
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
22942294

2295-
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
2295+
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
22962296
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));
22972297

22982298
// execute on old partitions
@@ -2376,7 +2376,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
23762376

23772377
case TSchemeBoardEvents::EvUpdate:
23782378
if (auto* msg = ev->Get<TSchemeBoardEvents::TEvUpdate>()) {
2379-
const auto desc = msg->GetRecord().GetDescribeSchemeResult();
2379+
NKikimrScheme::TEvDescribeSchemeResult desc;
2380+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(desc, msg->GetRecord().GetDescribeSchemeResultSerialized()));
23802381
if (desc.GetPath() == "/Root/Table/Stream" && desc.GetPathDescription().GetSelf().GetCreateFinished()) {
23812382
delayed.emplace_back(ev.Release());
23822383
return TTestActorRuntime::EEventAction::DROP;
@@ -2446,7 +2447,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
24462447
ExecSQL(env.GetServer(), env.GetEdgeActor(), R"(
24472448
UPSERT INTO `/Root/Table` (key, value)
24482449
VALUES (1, 10);
2449-
)");
2450+
)");
24502451

24512452
SetSplitMergePartCountLimit(&runtime, -1);
24522453
const auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
@@ -3266,7 +3267,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
32663267
auto tabletIds = GetTableShards(env.GetServer(), env.GetEdgeActor(), "/Root/Table");
32673268
UNIT_ASSERT_VALUES_EQUAL(tabletIds.size(), 1);
32683269

3269-
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
3270+
WaitTxNotification(env.GetServer(), env.GetEdgeActor(),
32703271
AsyncSplitTable(env.GetServer(), env.GetEdgeActor(), "/Root/Table", tabletIds.at(0), 4));
32713272

32723273
// merge
@@ -3298,7 +3299,7 @@ template <>
32983299
void Out<std::pair<TString, TString>>(IOutputStream& output, const std::pair<TString, TString>& x) {
32993300
output << x.first << ":" << x.second;
33003301
}
3301-
3302+
33023303
void AppendToString(TString& dst, const std::pair<TString, TString>& x) {
33033304
TStringOutput output(dst);
33043305
output << x;

ydb/core/tx/scheme_board/cache_ut.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ class TCacheTest: public TTestWithSchemeshard {
3737
" Kind: \"pool-kind-1\" "
3838
"} "
3939
" Name: \"Root\" ");
40+
41+
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_REPLICA, NLog::PRI_DEBUG);
42+
// Context->SetLogPriority(NKikimrServices::SCHEME_BOARD_SUBSCRIBER, NLog::PRI_DEBUG);
43+
// Context->SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
44+
// Context->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_DEBUG);
4045
}
4146

4247
UNIT_TEST_SUITE(TCacheTest);

0 commit comments

Comments
 (0)