@@ -17,30 +17,39 @@ namespace NKikimr::NStorage {
1717 if (RootState == ERootState::INITIAL && hasQuorum) { // becoming root node
1818 Y_ABORT_UNLESS (!Scepter);
1919 Scepter = std::make_shared<TScepter>();
20-
21- auto makeAllBoundNodes = [&] {
22- TStringStream s;
23- const char *sep = " {" ;
24- for (const auto & [nodeId, _] : AllBoundNodes) {
25- s << std::exchange (sep, " " ) << nodeId;
26- }
27- s << ' }' ;
28- return s.Str ();
29- };
30- STLOG (PRI_DEBUG, BS_NODE, NWDC19, " Starting config collection" , (Scepter, Scepter->Id ),
31- (AllBoundNodes, makeAllBoundNodes ()));
32- RootState = ERootState::IN_PROGRESS;
33- TEvScatter task;
34- task.SetTaskId (RandomNumber<ui64>());
35- task.MutableCollectConfigs ();
36- IssueScatterTask (TActorId (), std::move (task));
20+ BecomeRoot ();
3721 } else if (Scepter && !hasQuorum) { // unbecoming root node -- lost quorum
3822 SwitchToError (" quorum lost" );
3923 }
4024 }
4125
26+ void TDistributedConfigKeeper::BecomeRoot () {
27+ auto makeAllBoundNodes = [&] {
28+ TStringStream s;
29+ const char *sep = " {" ;
30+ for (const auto & [nodeId, _] : AllBoundNodes) {
31+ s << std::exchange (sep, " " ) << nodeId;
32+ }
33+ s << ' }' ;
34+ return s.Str ();
35+ };
36+ STLOG (PRI_DEBUG, BS_NODE, NWDC19, " Starting config collection" , (Scepter, Scepter->Id ),
37+ (AllBoundNodes, makeAllBoundNodes ()));
38+ RootState = ERootState::IN_PROGRESS;
39+ TEvScatter task;
40+ task.SetTaskId (RandomNumber<ui64>());
41+ task.MutableCollectConfigs ();
42+ IssueScatterTask (TActorId (), std::move (task));
43+ }
44+
45+ void TDistributedConfigKeeper::UnbecomeRoot () {
46+ }
47+
4248 void TDistributedConfigKeeper::SwitchToError (const TString& reason) {
4349 STLOG (PRI_ERROR, BS_NODE, NWDC38, " SwitchToError" , (RootState, RootState), (Reason, reason));
50+ if (Scepter) {
51+ UnbecomeRoot ();
52+ }
4453 Scepter.reset ();
4554 RootState = ERootState::ERROR_TIMEOUT;
4655 ErrorReason = reason;
@@ -498,66 +507,93 @@ namespace NKikimr::NStorage {
498507 void TDistributedConfigKeeper::Perform (TEvGather::TCollectConfigs *response,
499508 const TEvScatter::TCollectConfigs& /* request*/ , TScatterTask& task) {
500509 THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TNode*> baseConfigs;
510+ THashSet<TNodeIdentifier> nodesAlreadyReplied{SelfNode};
501511
502- auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item) {
503- const auto & config = item.GetBaseConfig ();
504- auto & ptr = baseConfigs[config];
505- if (!ptr) {
506- ptr = response->AddNodes ();
507- ptr->MutableBaseConfig ()->CopyFrom (config);
508- }
509- ptr->MutableNodeIds ()->MergeFrom (item.GetNodeIds ());
510- };
511-
512- auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto & set) {
513- const auto & config = item.GetConfig ();
514- auto & ptr = set[config];
515- if (!ptr) {
516- ptr = (response->*addFunc)();
517- ptr->MutableConfig ()->CopyFrom (config);
518- }
519- ptr->MutableDisks ()->MergeFrom (item.GetDisks ());
520- };
521-
522- TEvGather::TCollectConfigs::TNode s;
523- SelfNode.Serialize (s.AddNodeIds ());
524- auto *cfg = s.MutableBaseConfig ();
525- cfg->CopyFrom (BaseConfig);
526- addBaseConfig (s);
512+ auto *ptr = response->AddNodes ();
513+ ptr->MutableBaseConfig ()->CopyFrom (BaseConfig);
514+ SelfNode.Serialize (ptr->AddNodeIds ());
515+ baseConfigs.emplace (BaseConfig, ptr);
527516
528517 THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> committedConfigs;
529518 THashMap<TStorageConfigMeta, TEvGather::TCollectConfigs::TPersistentConfig*> proposedConfigs;
530-
531519 for (auto & item : *response->MutableCommittedConfigs ()) {
532520 committedConfigs[item.GetConfig ()] = &item;
533521 }
534522 for (auto & item : *response->MutableProposedConfigs ()) {
535523 proposedConfigs[item.GetConfig ()] = &item;
536524 }
537525
526+ auto addBaseConfig = [&](const TEvGather::TCollectConfigs::TNode& item, auto *nodesToIgnore) {
527+ const auto & config = item.GetBaseConfig ();
528+ auto & ptr = baseConfigs[config];
529+ for (const auto & nodeId : item.GetNodeIds ()) {
530+ TNodeIdentifier n (nodeId);
531+ if (const auto [_, inserted] = nodesAlreadyReplied.emplace (n); inserted) {
532+ if (!ptr) {
533+ ptr = response->AddNodes ();
534+ ptr->MutableBaseConfig ()->CopyFrom (config);
535+ }
536+ ptr->AddNodeIds ()->CopyFrom (nodeId);
537+ } else {
538+ nodesToIgnore->insert (std::move (n));
539+ }
540+ }
541+ };
542+
543+ auto addPerDiskConfig = [&](const TEvGather::TCollectConfigs::TPersistentConfig& item, auto addFunc, auto & set,
544+ const auto & nodesToIgnore) {
545+ const auto & config = item.GetConfig ();
546+ auto & ptr = set[config];
547+ for (const auto & disk : item.GetDisks ()) {
548+ if (!nodesToIgnore.contains (TNodeIdentifier (disk.GetNodeId ()))) {
549+ if (!ptr) {
550+ ptr = (response->*addFunc)();
551+ ptr->MutableConfig ()->CopyFrom (config);
552+ }
553+ ptr->AddDisks ()->CopyFrom (disk);
554+ }
555+ }
556+ };
557+
538558 for (const auto & reply : task.CollectedResponses ) {
539559 if (reply.HasCollectConfigs ()) {
540560 const auto & cc = reply.GetCollectConfigs ();
561+
562+ THashSet<TNodeIdentifier> nodesToIgnore;
541563 for (const auto & item : cc.GetNodes ()) {
542- addBaseConfig (item);
564+ addBaseConfig (item, &nodesToIgnore );
543565 }
544566 for (const auto & item : cc.GetCommittedConfigs ()) {
545- addPerDiskConfig (item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs);
567+ addPerDiskConfig (item, &TEvGather::TCollectConfigs::AddCommittedConfigs, committedConfigs, nodesToIgnore );
546568 }
547569 for (const auto & item : cc.GetProposedConfigs ()) {
548- addPerDiskConfig (item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs);
570+ addPerDiskConfig (item, &TEvGather::TCollectConfigs::AddProposedConfigs, proposedConfigs, nodesToIgnore);
571+ }
572+ for (const auto & item : cc.GetNoMetadata ()) {
573+ if (!nodesToIgnore.contains (TNodeIdentifier (item.GetNodeId ()))) {
574+ response->AddNoMetadata ()->CopyFrom (item);
575+ }
576+ }
577+ for (const auto & item : cc.GetErrors ()) {
578+ if (!nodesToIgnore.contains (TNodeIdentifier (item.GetNodeId ()))) {
579+ response->AddErrors ()->CopyFrom (item);
580+ }
549581 }
550- response->MutableNoMetadata ()->MergeFrom (cc.GetNoMetadata ());
551- response->MutableErrors ()->MergeFrom (cc.GetErrors ());
552582 }
553583 }
554584 }
555585
556586 void TDistributedConfigKeeper::Perform (TEvGather::TProposeStorageConfig *response,
557587 const TEvScatter::TProposeStorageConfig& /* request*/ , TScatterTask& task) {
588+ THashSet<TNodeIdentifier> nodesAlreadyReplied;
558589 for (const auto & reply : task.CollectedResponses ) {
559590 if (reply.HasProposeStorageConfig ()) {
560- response->MutableStatus ()->MergeFrom (reply.GetProposeStorageConfig ().GetStatus ());
591+ const auto & config = reply.GetProposeStorageConfig ();
592+ for (const auto & status : config.GetStatus ()) {
593+ if (const auto [_, inserted] = nodesAlreadyReplied.insert (status.GetNodeId ()); inserted) {
594+ response->AddStatus ()->CopyFrom (status);
595+ }
596+ }
561597 }
562598 }
563599 }
0 commit comments