Skip to content

Commit 6c891d7

Browse files
authored
Add slot names generation to node broker (#2808)
1 parent 70c8cb4 commit 6c891d7

10 files changed

+505
-19
lines changed

ydb/core/mind/node_broker.cpp

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ void TNodeBroker::OnActivateExecutor(const TActorContext &ctx)
6464
MinDynamicId = Max(MaxStaticId + 1, (ui64)Min(appData->DynamicNameserviceConfig->MinDynamicNodeId, TActorId::MaxNodeId));
6565
MaxDynamicId = Max(MinDynamicId, (ui64)Min(appData->DynamicNameserviceConfig->MaxDynamicNodeId, TActorId::MaxNodeId));
6666

67+
EnableSlotNameGeneration = appData->FeatureFlags.GetEnableSlotNameGeneration();
68+
6769
ClearState();
6870

6971
ProcessTx(CreateTxInitScheme(), ctx);
@@ -125,11 +127,38 @@ bool TNodeBroker::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev,
125127
<< " Location: " << node.Location.ToString() << Endl
126128
<< " Lease: " << node.Lease << Endl
127129
<< " Expire: " << node.ExpirationString() << Endl
128-
<< " AuthorizedByCertificate: " << (node.AuthorizedByCertificate ? "true" : "false") << Endl;
130+
<< " AuthorizedByCertificate: " << (node.AuthorizedByCertificate ? "true" : "false") << Endl
131+
<< " ServicedSubDomain: " << node.ServicedSubDomain << Endl
132+
<< " SlotIndex: " << node.SlotIndex << Endl;
129133
}
130134
str << Endl;
131135

132136
str << "Free Node IDs count: " << FreeIds.Count() << Endl;
137+
138+
str << Endl;
139+
str << "Slot Indexes Pools usage: " << Endl;
140+
size_t totalSize = 0;
141+
size_t totalCapacity = 0;
142+
for (const auto &[subdomainKey, slotIndexesPool] : SlotIndexesPools) {
143+
const size_t size = slotIndexesPool.Size();
144+
totalSize += size;
145+
const size_t capacity = slotIndexesPool.Capacity();
146+
totalCapacity += capacity;
147+
const double usagePercent = floor(size * 100.0 / capacity);
148+
str << " " << subdomainKey
149+
<< " = " << usagePercent << "% (" << size << " of " << capacity << ")"
150+
<< Endl;
151+
}
152+
str << Endl;
153+
154+
if (totalCapacity > 0) {
155+
const double totalUsagePercent = floor(totalSize * 100.0 / totalCapacity);
156+
str << " Total"
157+
<< " = " << totalUsagePercent << "% (" << totalSize << " of " << totalCapacity << ")"
158+
<< Endl;
159+
} else {
160+
str << " No Slot Indexes Pools" << Endl;
161+
}
133162
}
134163
}
135164

@@ -159,11 +188,15 @@ void TNodeBroker::ClearState()
159188
Hosts.clear();
160189

161190
RecomputeFreeIds();
191+
RecomputeSlotIndexesPools();
162192
}
163193

164194
void TNodeBroker::AddNode(const TNodeInfo &info)
165195
{
166196
FreeIds.Reset(info.NodeId);
197+
if (info.SlotIndex.has_value()) {
198+
SlotIndexesPools[info.ServicedSubDomain].Acquire(info.SlotIndex.value());
199+
}
167200

168201
if (info.Expire > Epoch.Start) {
169202
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
@@ -215,6 +248,24 @@ void TNodeBroker::RecomputeFreeIds()
215248
}
216249
}
217250

251+
void TNodeBroker::RecomputeSlotIndexesPools()
252+
{
253+
for (auto &[_, slotIndexesPool] : SlotIndexesPools) {
254+
slotIndexesPool.ReleaseAll();
255+
}
256+
257+
for (const auto &[_, node] : Nodes) {
258+
if (node.SlotIndex.has_value()) {
259+
SlotIndexesPools[node.ServicedSubDomain].Acquire(node.SlotIndex.value());
260+
}
261+
}
262+
for (const auto &[_, node] : ExpiredNodes) {
263+
if (node.SlotIndex.has_value()) {
264+
SlotIndexesPools[node.ServicedSubDomain].Acquire(node.SlotIndex.value());
265+
}
266+
}
267+
}
268+
218269
bool TNodeBroker::IsBannedId(ui32 id) const
219270
{
220271
for (auto &pr : BannedIds)
@@ -288,6 +339,10 @@ void TNodeBroker::FillNodeInfo(const TNodeInfo &node,
288339
info.SetAddress(node.Address);
289340
info.SetExpire(node.Expire.GetValue());
290341
node.Location.Serialize(info.MutableLocation(), false);
342+
if (EnableSlotNameGeneration && node.SlotIndex.has_value()) {
343+
const TString slotName = TStringBuilder() << "slot-" << node.SlotIndex;
344+
info.SetSlotName(slotName);
345+
}
291346
}
292347

293348
void TNodeBroker::ComputeNextEpochDiff(TStateDiff &diff)
@@ -328,9 +383,13 @@ void TNodeBroker::ApplyStateDiff(const TStateDiff &diff)
328383
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
329384
"Remove node " << it->second.IdString());
330385

331-
ExpiredNodes.erase(it);
332-
if (!IsBannedId(id) && id >= MinDynamicId && id <= MaxDynamicId)
386+
if (!IsBannedId(id) && id >= MinDynamicId && id <= MaxDynamicId) {
333387
FreeIds.Set(id);
388+
}
389+
if (it->second.SlotIndex.has_value()) {
390+
SlotIndexesPools[it->second.ServicedSubDomain].Release(it->second.SlotIndex.value());
391+
}
392+
ExpiredNodes.erase(it);
334393
}
335394

336395
LOG_DEBUG_S(TActorContext::AsActorContext(), NKikimrServices::NODE_BROKER,
@@ -437,7 +496,9 @@ void TNodeBroker::DbAddNode(const TNodeInfo &node,
437496
<< " dc=" << node.Location.GetDataCenterId()
438497
<< " location=" << node.Location.ToString()
439498
<< " lease=" << node.Lease
440-
<< " expire=" << node.ExpirationString());
499+
<< " expire=" << node.ExpirationString()
500+
<< " servicedsubdomain=" << node.ServicedSubDomain
501+
<< " slotindex= " << node.SlotIndex);
441502

442503
NIceDb::TNiceDb db(txc.DB);
443504
using T = Schema::Nodes;
@@ -448,7 +509,16 @@ void TNodeBroker::DbAddNode(const TNodeInfo &node,
448509
.Update<T::Address>(node.Address)
449510
.Update<T::Lease>(node.Lease)
450511
.Update<T::Expire>(node.Expire.GetValue())
451-
.Update<T::Location>(node.Location.GetSerializedLocation());
512+
.Update<T::Location>(node.Location.GetSerializedLocation())
513+
.Update<T::ServicedSubDomain>(node.ServicedSubDomain);
514+
515+
if (node.SlotIndex.has_value()) {
516+
db.Table<T>().Key(node.NodeId)
517+
.Update<T::SlotIndex>(node.SlotIndex.value());
518+
} else {
519+
db.Table<T>().Key(node.NodeId)
520+
.UpdateToNull<T::SlotIndex>();
521+
}
452522
}
453523

454524
void TNodeBroker::DbApplyStateDiff(const TStateDiff &diff,
@@ -590,7 +660,10 @@ bool TNodeBroker::DbLoadState(TTransactionContext &txc,
590660

591661
info.Lease = nodesRowset.GetValue<T::Lease>();
592662
info.Expire = expire;
593-
663+
info.ServicedSubDomain = TSubDomainKey(nodesRowset.GetValueOrDefault<T::ServicedSubDomain>());
664+
if (nodesRowset.HaveValue<T::SlotIndex>()) {
665+
info.SlotIndex = nodesRowset.GetValue<T::SlotIndex>();
666+
}
594667
AddNode(info);
595668

596669
LOG_DEBUG_S(ctx, NKikimrServices::NODE_BROKER,
@@ -776,6 +849,7 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
776849
TEvNodeBroker::TEvRegistrationRequest::TPtr Ev;
777850
TNodeBroker *Self;
778851
NActors::TScopeId ScopeId;
852+
TSubDomainKey ServicedSubDomain;
779853

780854
public:
781855
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -821,8 +895,9 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
821895
} else {
822896
ScopeId = {response.DomainInfo->DomainKey.OwnerId, response.DomainInfo->DomainKey.LocalPathId};
823897
}
898+
ServicedSubDomain = TSubDomainKey(response.DomainInfo->DomainKey.OwnerId, response.DomainInfo->DomainKey.LocalPathId);
824899
} else {
825-
LOG_WARN_S(ctx, NKikimrServices::NODE_BROKER, "Cannot resolve scope id"
900+
LOG_WARN_S(ctx, NKikimrServices::NODE_BROKER, "Cannot resolve tenant"
826901
<< ": request# " << Ev->Get()->Record.ShortDebugString()
827902
<< ", response# " << response.ToString(*AppData()->TypeRegistry));
828903
}
@@ -835,11 +910,12 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
835910
}
836911

837912
void Finish(const TActorContext& ctx) {
838-
LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, "Finished resolving scope id"
913+
LOG_TRACE_S(ctx, NKikimrServices::NODE_BROKER, "Finished resolving tenant"
839914
<< ": request# " << Ev->Get()->Record.ShortDebugString()
840-
<< ": scope id# " << ScopeIdToString(ScopeId));
915+
<< ": scope id# " << ScopeIdToString(ScopeId)
916+
<< ": serviced subdomain# " << ServicedSubDomain);
841917

842-
Self->ProcessTx(Self->CreateTxRegisterNode(Ev, ScopeId), ctx);
918+
Self->ProcessTx(Self->CreateTxRegisterNode(Ev, ScopeId, ServicedSubDomain), ctx);
843919
Die(ctx);
844920
}
845921

ydb/core/mind/node_broker__register_node.cpp

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ using namespace NKikimrNodeBroker;
1010

1111
class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
1212
public:
13-
TTxRegisterNode(TNodeBroker *self, TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId)
13+
TTxRegisterNode(TNodeBroker *self, TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
14+
const NActors::TScopeId& scopeId, const TSubDomainKey& servicedSubDomain)
1415
: TBase(self)
1516
, Event(ev)
1617
, ScopeId(scopeId)
18+
, ServicedSubDomain(servicedSubDomain)
1719
, NodeId(0)
1820
, ExtendLease(false)
1921
, FixNodeId(false)
@@ -60,6 +62,12 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
6062
ctx);
6163
}
6264

65+
if (Self->EnableSlotNameGeneration && rec.HasPath() && ServicedSubDomain == InvalidSubDomainKey) {
66+
return Error(TStatus::ERROR,
67+
TStringBuilder() << "Cannot resolve subdomain key for path " << rec.GetPath(),
68+
ctx);
69+
}
70+
6371
// Already registered?
6472
auto it = Self->Hosts.find(std::make_tuple(host, addr, port));
6573
if (it != Self->Hosts.end()) {
@@ -91,7 +99,21 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
9199
ExtendLease = true;
92100
}
93101
node.AuthorizedByCertificate = rec.GetAuthorizedByCertificate();
94-
102+
103+
if (Self->EnableSlotNameGeneration) {
104+
if (ServicedSubDomain != node.ServicedSubDomain) {
105+
if (node.SlotIndex.has_value()) {
106+
Self->SlotIndexesPools[node.ServicedSubDomain].Release(node.SlotIndex.value());
107+
}
108+
node.ServicedSubDomain = ServicedSubDomain;
109+
node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
110+
Self->DbAddNode(node, txc);
111+
} else if (!node.SlotIndex.has_value()) {
112+
node.SlotIndex = Self->SlotIndexesPools[node.ServicedSubDomain].AcquireLowestFreeIndex();
113+
Self->DbAddNode(node, txc);
114+
}
115+
}
116+
95117
Response->Record.MutableStatus()->SetCode(TStatus::OK);
96118
Self->FillNodeInfo(node, *Response->Record.MutableNode());
97119

@@ -109,6 +131,11 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
109131
Node->Lease = 1;
110132
Node->Expire = expire;
111133

134+
if (Self->EnableSlotNameGeneration) {
135+
Node->ServicedSubDomain = ServicedSubDomain;
136+
Node->SlotIndex = Self->SlotIndexesPools[Node->ServicedSubDomain].AcquireLowestFreeIndex();
137+
}
138+
112139
Response->Record.MutableStatus()->SetCode(TStatus::OK);
113140

114141
Self->DbAddNode(*Node, txc);
@@ -151,16 +178,19 @@ class TNodeBroker::TTxRegisterNode : public TTransactionBase<TNodeBroker> {
151178
private:
152179
TEvNodeBroker::TEvRegistrationRequest::TPtr Event;
153180
const NActors::TScopeId ScopeId;
181+
const TSubDomainKey ServicedSubDomain;
154182
TAutoPtr<TEvNodeBroker::TEvRegistrationResponse> Response;
155183
THolder<TNodeInfo> Node;
156184
ui32 NodeId;
157185
bool ExtendLease;
158186
bool FixNodeId;
159187
};
160188

161-
ITransaction *TNodeBroker::CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId)
189+
ITransaction *TNodeBroker::CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
190+
const NActors::TScopeId& scopeId,
191+
const TSubDomainKey& servicedSubDomain)
162192
{
163-
return new TTxRegisterNode(this, ev, scopeId);
193+
return new TTxRegisterNode(this, ev, scopeId, servicedSubDomain);
164194
}
165195

166196
} // NNodeBroker

ydb/core/mind/node_broker__scheme.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "defs.h"
44

5+
#include <ydb/core/base/subdomain.h>
56
#include <ydb/core/scheme/scheme_types_defs.h>
67
#include <ydb/core/tablet_flat/flat_cxx_database.h>
78

@@ -22,9 +23,22 @@ struct Schema : NIceDb::Schema {
2223
struct Lease : Column<10, NScheme::NTypeIds::Uint32> {};
2324
struct Expire : Column<11, NScheme::NTypeIds::Uint64> {};
2425
struct Location : Column<12, NScheme::NTypeIds::String> {};
26+
struct ServicedSubDomain : Column<13, NScheme::NTypeIds::String> { using Type = NKikimrSubDomains::TDomainKey; };
27+
struct SlotIndex : Column<14, NScheme::NTypeIds::Uint32> {};
2528

2629
using TKey = TableKey<ID>;
27-
using TColumns = TableColumns<ID, Host, Port, ResolveHost, Address, Lease, Expire, Location>;
30+
using TColumns = TableColumns<
31+
ID,
32+
Host,
33+
Port,
34+
ResolveHost,
35+
Address,
36+
Lease,
37+
Expire,
38+
Location,
39+
ServicedSubDomain,
40+
SlotIndex
41+
>;
2842
};
2943

3044
struct Config : Table<2> {

ydb/core/mind/node_broker_impl.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#pragma once
22

33
#include "node_broker.h"
4+
#include "slot_indexes_pool.h"
45

56
#include <ydb/core/base/tablet_pipe.h>
7+
#include <ydb/core/base/subdomain.h>
68
#include <ydb/core/cms/console/console.h>
79
#include <ydb/core/cms/console/configs_dispatcher.h>
810
#include <ydb/core/cms/console/tx_processor.h>
@@ -114,6 +116,8 @@ class TNodeBroker : public TActor<TNodeBroker>
114116
ui32 Lease;
115117
TInstant Expire;
116118
bool AuthorizedByCertificate = false;
119+
std::optional<ui32> SlotIndex;
120+
TSubDomainKey ServicedSubDomain;
117121
};
118122

119123
// State changes to apply while moving to the next epoch.
@@ -134,7 +138,9 @@ class TNodeBroker : public TActor<TNodeBroker>
134138
ITransaction *CreateTxExtendLease(TEvNodeBroker::TEvExtendLeaseRequest::TPtr &ev);
135139
ITransaction *CreateTxInitScheme();
136140
ITransaction *CreateTxLoadState();
137-
ITransaction *CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev, const NActors::TScopeId& scopeId);
141+
ITransaction *CreateTxRegisterNode(TEvNodeBroker::TEvRegistrationRequest::TPtr &ev,
142+
const NActors::TScopeId& scopeId,
143+
const TSubDomainKey& servicedSubDomain);
138144
ITransaction *CreateTxUpdateConfig(TEvConsole::TEvConfigNotificationRequest::TPtr &ev);
139145
ITransaction *CreateTxUpdateConfig(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev);
140146
ITransaction *CreateTxUpdateConfigSubscription(TEvConsole::TEvReplaceConfigSubscriptionsResponse::TPtr &ev);
@@ -208,6 +214,7 @@ class TNodeBroker : public TActor<TNodeBroker>
208214
void ExtendLease(TNodeInfo &node);
209215
void FixNodeId(TNodeInfo &node);
210216
void RecomputeFreeIds();
217+
void RecomputeSlotIndexesPools();
211218
bool IsBannedId(ui32 id) const;
212219

213220
void AddDelayedListNodesRequest(ui64 epoch,
@@ -292,6 +299,9 @@ class TNodeBroker : public TActor<TNodeBroker>
292299
THashMap<std::tuple<TString, TString, ui16>, ui32> Hosts;
293300
// Bitmap with free Node IDs (with no lower 5 bits).
294301
TDynBitMap FreeIds;
302+
// Maps tenant to its slot indexes pool.
303+
std::unordered_map<TSubDomainKey, TSlotIndexesPool, THash<TSubDomainKey>> SlotIndexesPools;
304+
bool EnableSlotNameGeneration = false;
295305
// Epoch info.
296306
TEpochInfo Epoch;
297307
// Current config.

0 commit comments

Comments
 (0)