Skip to content

Commit 672eb08

Browse files
authored
Merge acd8a4e into dffc12a
2 parents dffc12a + acd8a4e commit 672eb08

File tree

13 files changed

+138
-91
lines changed

13 files changed

+138
-91
lines changed

ydb/core/blobstorage/nodewarden/distconf.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ namespace NKikimr::NStorage {
1919
STLOG(PRI_DEBUG, BS_NODE, NWDC00, "Bootstrap");
2020

2121
auto ns = NNodeBroker::BuildNameserverTable(Cfg->NameserviceConfig);
22-
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>();
22+
auto nodes = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
2323
for (const auto& [nodeId, item] : ns->StaticNodeTable) {
24-
ev->Nodes.emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
24+
nodes->emplace_back(nodeId, item.Address, item.Host, item.ResolveHost, item.Port, item.Location);
2525
}
26+
auto ev = std::make_unique<TEvInterconnect::TEvNodesInfo>(nodes);
2627
Send(SelfId(), ev.release());
2728

2829
// and subscribe for the node list too

ydb/core/fq/libs/actors/nodes_manager.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,14 +318,14 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
318318
void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
319319
try {
320320
const auto& status = ev->Get()->Status.GetStatus();
321-
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo());
322321
if (!ev->Get()->Status.IsSuccess()) {
323322
ythrow yexception() << status << '\n' << ev->Get()->Status.GetIssues().ToString();
324323
}
325324
const auto& res = ev->Get()->Result;
326325

327-
auto& nodesInfo = nameServiceUpdateReq->Nodes;
328-
nodesInfo.reserve(res.nodes().size());
326+
auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
327+
nodesInfo->reserve(res.nodes().size());
328+
THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo(nodesInfo));
329329

330330
Peers.clear();
331331
std::set<ui32> nodeIds; // may be not unique
@@ -340,7 +340,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
340340
node.active_workers(), node.memory_limit(), node.memory_allocated(), node.data_center()});
341341

342342
if (node.interconnect_port()) {
343-
nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{
343+
nodesInfo->emplace_back(TEvInterconnect::TNodeInfo{
344344
node.node_id(),
345345
node.node_address(),
346346
node.hostname(), // host
@@ -356,8 +356,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
356356
ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size());
357357
ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc();
358358

359-
LOG_T("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver");
360-
if (!nodesInfo.empty()) {
359+
LOG_T("Send NodeInfo with size: " << nodesInfo->size() << " to DynamicNameserver");
360+
if (!nodesInfo->empty()) {
361361
Send(GetNameserviceActorId(), nameServiceUpdateReq.Release());
362362
}
363363
} catch (yexception &e) {

ydb/core/health_check/health_check_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ Y_UNIT_TEST_SUITE(THealthCheckTest) {
338338

339339
void SetLongHostValue(TEvInterconnect::TEvNodesInfo::TPtr* ev) {
340340
TString host(1000000, 'a');
341-
auto& pbRecord = (*ev)->Get()->Nodes;
341+
auto& pbRecord = const_cast<TVector<TEvInterconnect::TNodeInfo>&>((*ev)->Get()->Nodes);
342342
for (auto itIssue = pbRecord.begin(); itIssue != pbRecord.end(); ++itIssue) {
343343
itIssue->Host = host;
344344
}

ydb/core/mind/bscontroller/impl.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,7 +1416,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
14161416
TNodeId NodeId;
14171417
TNodeLocation Location;
14181418

1419-
THostRecord(TEvInterconnect::TNodeInfo&& nodeInfo)
1419+
THostRecord(TEvInterconnect::TNodeInfo nodeInfo)
14201420
: NodeId(nodeInfo.NodeId)
14211421
, Location(std::move(nodeInfo.Location))
14221422
{}
@@ -1432,11 +1432,13 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
14321432
THashMap<TNodeId, THostId> NodeIdToHostId;
14331433

14341434
public:
1435+
THostRecordMapImpl() = default;
1436+
14351437
THostRecordMapImpl(TEvInterconnect::TEvNodesInfo *msg) {
1436-
for (TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
1438+
for (const TEvInterconnect::TNodeInfo& nodeInfo : msg->Nodes) {
14371439
const THostId hostId(nodeInfo.Host, nodeInfo.Port);
14381440
NodeIdToHostId.emplace(nodeInfo.NodeId, hostId);
1439-
HostIdToRecord.emplace(hostId, std::move(nodeInfo));
1441+
HostIdToRecord.emplace(hostId, nodeInfo);
14401442
}
14411443
}
14421444

@@ -1824,8 +1826,7 @@ class TBlobStorageController : public TActor<TBlobStorageController>, public TTa
18241826

18251827
// For test purposes, required for self heal actor
18261828
void CreateEmptyHostRecordsMap() {
1827-
TEvInterconnect::TEvNodesInfo nodes;
1828-
HostRecords = std::make_shared<THostRecordMapImpl>(&nodes);
1829+
HostRecords = std::make_shared<THostRecordMapImpl>();
18291830
}
18301831

18311832
ui64 NextConfigTxSeqNo = 1;

ydb/core/mind/dynamic_nameserver.cpp

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,18 @@ void TDynamicNodeResolverBase::Handle(TEvNodeBroker::TEvResolvedNode::TPtr &ev,
6767

6868
if (rec.GetStatus().GetCode() != NKikimrNodeBroker::TStatus::OK) {
6969
// Reset proxy if node expired.
70-
if (exists)
70+
if (exists) {
7171
ResetInterconnectProxyConfig(NodeId, ctx);
72+
*NeedUpdateListNodesCache = true; // node was erased
73+
}
7274
ReplyWithErrorAndDie(ctx);
7375
return;
7476
}
7577

7678
TDynamicConfig::TDynamicNodeInfo node(rec.GetNode());
79+
if (!exists || !oldNode.EqualExceptExpire(node)) {
80+
*NeedUpdateListNodesCache = true;
81+
}
7782

7883
// If ID is re-used by another node then proxy has to be reset.
7984
if (exists && !oldNode.EqualExceptExpire(node))
@@ -145,6 +150,7 @@ void TDynamicNameserver::Handle(TEvNodeWardenStorageConfig::TPtr ev) {
145150
auto newStaticConfig = BuildNameserverTable(config);
146151
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
147152
StaticConfig = std::move(newStaticConfig);
153+
*NeedUpdateListNodesCache = true;
148154
for (const auto& subscriber : StaticNodeChangeSubscribers) {
149155
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
150156
}
@@ -220,31 +226,38 @@ void TDynamicNameserver::ResolveDynamicNode(ui32 nodeId,
220226
reply->NodeId = nodeId;
221227
ctx.Send(ev->Sender, reply);
222228
} else {
223-
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain], ev, deadline));
229+
ctx.RegisterWithSameMailbox(new TDynamicNodeResolver(SelfId(), nodeId, DynamicConfigs[domain],
230+
NeedUpdateListNodesCache, ev, deadline));
224231
}
225232
}
226233

227234
void TDynamicNameserver::SendNodesList(const TActorContext &ctx)
228235
{
229-
auto now = ctx.Now();
230-
for (auto &sender : ListNodesQueue) {
231-
THolder<TEvInterconnect::TEvNodesInfo> reply(new TEvInterconnect::TEvNodesInfo);
236+
if (*NeedUpdateListNodesCache) {
237+
auto newListNodesCache = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
238+
239+
auto now = ctx.Now();
232240
for (const auto &pr : StaticConfig->StaticNodeTable) {
233-
reply->Nodes.emplace_back(pr.first,
234-
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
235-
pr.second.Port, pr.second.Location, true);
241+
newListNodesCache->emplace_back(pr.first,
242+
pr.second.Address, pr.second.Host, pr.second.ResolveHost,
243+
pr.second.Port, pr.second.Location, true);
236244
}
237245

238246
for (auto &config : DynamicConfigs) {
239247
for (auto &pr : config->DynamicNodes) {
240248
if (pr.second.Expire > now)
241-
reply->Nodes.emplace_back(pr.first, pr.second.Address,
242-
pr.second.Host, pr.second.ResolveHost,
243-
pr.second.Port, pr.second.Location, false);
249+
newListNodesCache->emplace_back(pr.first, pr.second.Address,
250+
pr.second.Host, pr.second.ResolveHost,
251+
pr.second.Port, pr.second.Location, false);
244252
}
245253
}
246254

247-
ctx.Send(sender, reply.Release());
255+
ListNodesCache = newListNodesCache;
256+
*NeedUpdateListNodesCache = false;
257+
}
258+
259+
for (auto &sender : ListNodesQueue) {
260+
ctx.Send(sender, new TEvInterconnect::TEvNodesInfo(ListNodesCache));
248261
}
249262
ListNodesQueue.clear();
250263
}
@@ -300,15 +313,18 @@ void TDynamicNameserver::UpdateState(const NKikimrNodeBroker::TNodesInfo &rec,
300313
config->ExpiredNodes.emplace(node.GetNodeId(), info);
301314
}
302315

316+
*NeedUpdateListNodesCache = true;
303317
config->Epoch = rec.GetEpoch();
304318
ctx.Schedule(config->Epoch.End - ctx.Now(),
305319
new TEvPrivate::TEvUpdateEpoch(domain, config->Epoch.Id + 1));
306320
} else {
307321
// Note: this update may be optimized to only include new nodes
308322
for (auto &node : rec.GetNodes()) {
309323
auto nodeId = node.GetNodeId();
310-
if (!config->DynamicNodes.contains(nodeId))
324+
if (!config->DynamicNodes.contains(nodeId)) {
311325
config->DynamicNodes.emplace(nodeId, node);
326+
*NeedUpdateListNodesCache = true;
327+
}
312328
}
313329
config->Epoch = rec.GetEpoch();
314330
}
@@ -395,8 +411,8 @@ void TDynamicNameserver::Handle(TEvInterconnect::TEvGetNode::TPtr &ev, const TAc
395411
ctx.Send(ev->Sender, reply.Release());
396412
} else {
397413
const TInstant deadline = ev->Get()->Deadline;
398-
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain], ev.Release(),
399-
deadline));
414+
ctx.RegisterWithSameMailbox(new TDynamicNodeSearcher(SelfId(), nodeId, DynamicConfigs[domain],
415+
NeedUpdateListNodesCache, ev.Release(), deadline));
400416
}
401417
}
402418
}
@@ -455,6 +471,7 @@ void TDynamicNameserver::Handle(NConsole::TEvConsole::TEvConfigNotificationReque
455471
auto newStaticConfig = BuildNameserverTable(config.GetNameserviceConfig());
456472
if (StaticConfig->StaticNodeTable != newStaticConfig->StaticNodeTable) {
457473
StaticConfig = std::move(newStaticConfig);
474+
*NeedUpdateListNodesCache = true;
458475
for (const auto& subscriber : StaticNodeChangeSubscribers) {
459476
TActivationContext::Send(new IEventHandle(SelfId(), subscriber, new TEvInterconnect::TEvListNodes));
460477
}

ydb/core/mind/dynamic_nameserver_impl.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,12 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
7878
}
7979

8080
TDynamicNodeResolverBase(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
81+
std::shared_ptr<bool> needUpdateListNodesCache,
8182
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
8283
: Owner(owner)
8384
, NodeId(nodeId)
8485
, Config(config)
86+
, NeedUpdateListNodesCache(needUpdateListNodesCache)
8587
, OrigRequest(origRequest)
8688
, Deadline(deadline)
8789
{
@@ -118,6 +120,7 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
118120
TActorId Owner;
119121
ui32 NodeId;
120122
TDynamicConfigPtr Config;
123+
std::shared_ptr<bool> NeedUpdateListNodesCache;
121124
TAutoPtr<IEventHandle> OrigRequest;
122125
const TInstant Deadline;
123126

@@ -128,8 +131,9 @@ class TDynamicNodeResolverBase : public TActorBootstrapped<TDynamicNodeResolverB
128131
class TDynamicNodeResolver : public TDynamicNodeResolverBase {
129132
public:
130133
TDynamicNodeResolver(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
134+
std::shared_ptr<bool> needUpdateListNodesCache,
131135
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
132-
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
136+
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
133137
{
134138
}
135139

@@ -140,8 +144,9 @@ class TDynamicNodeResolver : public TDynamicNodeResolverBase {
140144
class TDynamicNodeSearcher : public TDynamicNodeResolverBase {
141145
public:
142146
TDynamicNodeSearcher(TActorId owner, ui32 nodeId, TDynamicConfigPtr config,
147+
std::shared_ptr<bool> needUpdateListNodesCache,
143148
TAutoPtr<IEventHandle> origRequest, TInstant deadline)
144-
: TDynamicNodeResolverBase(owner, nodeId, config, origRequest, deadline)
149+
: TDynamicNodeResolverBase(owner, nodeId, config, needUpdateListNodesCache, origRequest, deadline)
145150
{
146151
}
147152

@@ -180,6 +185,8 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
180185

181186
TDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup> &setup, ui32 resolvePoolId)
182187
: StaticConfig(setup)
188+
, ListNodesCache(nullptr)
189+
, NeedUpdateListNodesCache(std::make_shared<bool>(true))
183190
, ResolvePoolId(resolvePoolId)
184191
{
185192
Y_ABORT_UNLESS(StaticConfig->IsEntriesUnique());
@@ -258,6 +265,10 @@ class TDynamicNameserver : public TActorBootstrapped<TDynamicNameserver> {
258265
TIntrusivePtr<TTableNameserverSetup> StaticConfig;
259266
std::array<TDynamicConfigPtr, DOMAINS_COUNT> DynamicConfigs;
260267
TVector<TActorId> ListNodesQueue;
268+
269+
TIntrusiveVector<TEvInterconnect::TNodeInfo>::TPtr ListNodesCache;
270+
std::shared_ptr<bool> NeedUpdateListNodesCache;
271+
261272
std::array<TActorId, DOMAINS_COUNT> NodeBrokerPipes;
262273
// When ListNodes requests are sent to NodeBroker tablets this
263274
// bitmap indicates domains which didn't answer yet.

0 commit comments

Comments
 (0)