@@ -586,6 +586,36 @@ void TNodeBroker::PrepareEpochCache()
586586 TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
587587}
588588
589+ void TNodeBroker::PrepareUpdateNodesLog ()
590+ {
591+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
592+ " Preparing update nodes log for epoch #" << Committed.Epoch .ToString ()
593+ << " nodes=" << Committed.Nodes .size ()
594+ << " expired=" << Committed.ExpiredNodes .size ()
595+ << " removed=" << Committed.RemovedNodes .size ());
596+
597+ UpdateNodesLog.clear ();
598+ UpdateNodesLogVersions.clear ();
599+
600+ TVector<TVersionedNodeID> nodeIdsSortedByVersion;
601+ for (auto &entry : Committed.Nodes ) {
602+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
603+ }
604+ for (auto &entry : Committed.ExpiredNodes ) {
605+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
606+ }
607+ for (auto &entry : Committed.RemovedNodes ) {
608+ nodeIdsSortedByVersion.emplace_back (entry.second .NodeId , entry.second .Version );
609+ }
610+ std::sort (nodeIdsSortedByVersion.begin (), nodeIdsSortedByVersion.end (), TVersionedNodeID::TCmpByVersion ());
611+
612+ for (const auto &id : nodeIdsSortedByVersion) {
613+ const auto & node = *Committed.FindNode (id.NodeId );
614+ AddNodeToUpdateNodesLog (node);
615+ }
616+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
617+ }
618+
589619void TNodeBroker::AddNodeToEpochCache (const TNodeInfo &node)
590620{
591621 LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
@@ -615,13 +645,83 @@ void TNodeBroker::AddDeltaToEpochDeltasCache(const TString &delta, ui64 version)
615645 TabletCounters->Simple ()[COUNTER_EPOCH_DELTAS_SIZE_BYTES].Set (EpochDeltasCache.size ());
616646}
617647
648+ void TNodeBroker::AddNodeToUpdateNodesLog (const TNodeInfo &node)
649+ {
650+ LOG_DEBUG_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
651+ " Add node " << node.IdShortString () << " to update nodes log" );
652+
653+ NKikimrNodeBroker::TUpdateNodes updateNodes;
654+
655+ switch (node.State ) {
656+ case ENodeState::Active:
657+ FillNodeInfo (node, *updateNodes.AddUpdates ()->MutableNode ());
658+ break ;
659+ case ENodeState::Expired:
660+ updateNodes.AddUpdates ()->SetExpiredNode (node.NodeId );
661+ break ;
662+ case ENodeState::Removed:
663+ updateNodes.AddUpdates ()->SetRemovedNode (node.NodeId );
664+ break ;
665+ }
666+
667+ TString delta;
668+ Y_PROTOBUF_SUPPRESS_NODISCARD updateNodes.SerializeToString (&delta);
669+
670+ Y_ENSURE (UpdateNodesLogVersions.empty () || UpdateNodesLogVersions.back ().Version <= node.Version );
671+ if (!UpdateNodesLogVersions.empty () && UpdateNodesLogVersions.back ().Version == node.Version ) {
672+ UpdateNodesLog += delta;
673+ UpdateNodesLogVersions.back ().CacheEndOffset = UpdateNodesLog.size ();
674+ } else {
675+ UpdateNodesLog += delta;
676+ UpdateNodesLogVersions.emplace_back (node.Version , UpdateNodesLog.size ());
677+ }
678+ TabletCounters->Simple ()[COUNTER_UPDATE_NODES_LOG_SIZE_BYTES].Set (UpdateNodesLog.size ());
679+ }
680+
618681void TNodeBroker::SubscribeForConfigUpdates (const TActorContext &ctx)
619682{
620683 ui32 nodeBrokerItem = (ui32)NKikimrConsole::TConfigItem::NodeBrokerConfigItem;
621684 ui32 featureFlagsItem = (ui32)NKikimrConsole::TConfigItem::FeatureFlagsItem;
622685 NConsole::SubscribeViaConfigDispatcher (ctx, {nodeBrokerItem, featureFlagsItem}, ctx.SelfID );
623686}
624687
688+ void TNodeBroker::SendUpdateNodes (const TActorContext &ctx)
689+ {
690+ if (SentVersion >= Committed.Epoch .Version ) {
691+ return ;
692+ }
693+
694+ for (const auto & [_, subscriber] : ServerPipeToSubscriber) {
695+ SendUpdateNodes (subscriber, SentVersion, ctx);
696+ }
697+ SentVersion = Committed.Epoch .Version ;
698+ }
699+
700+ void TNodeBroker::SendUpdateNodes (TActorId subscriber, ui64 version, const TActorContext &ctx)
701+ {
702+ if (version >= Committed.Epoch .Version ) {
703+ return ;
704+ }
705+
706+ NKikimrNodeBroker::TUpdateNodes record;
707+ record.SetFromVersion (version);
708+ Committed.Epoch .Serialize (*record.MutableEpoch ());
709+ auto response = MakeHolder<TEvNodeBroker::TEvUpdateNodes>(record);
710+
711+ auto it = std::lower_bound (UpdateNodesLogVersions.begin (), UpdateNodesLogVersions.end (), version + 1 );
712+ if (it != UpdateNodesLogVersions.begin ()) {
713+ response->PreSerializedData = UpdateNodesLog.substr (std::prev (it)->CacheEndOffset );
714+ } else {
715+ response->PreSerializedData = UpdateNodesLog;
716+ }
717+
718+ TabletCounters->Percentile ()[COUNTER_UPDATE_NODES_BYTES].IncrementFor (response->GetCachedByteSize ());
719+ LOG_TRACE_S (TActorContext::AsActorContext (), NKikimrServices::NODE_BROKER,
720+ " Send TEvUpdateNodes v" << version << " -> v" << Committed.Epoch .Version
721+ << " to " << subscriber);
722+ ctx.Send (subscriber, response.Release ());
723+ }
724+
625725void TNodeBroker::TState::LoadConfigFromProto (const NKikimrNodeBroker::TConfig &config)
626726{
627727 Config = config;
@@ -1452,6 +1552,38 @@ void TNodeBroker::Handle(TEvNodeBroker::TEvSetConfigRequest::TPtr &ev,
14521552 Execute (CreateTxUpdateConfig (ev), ctx);
14531553}
14541554
1555+ void TNodeBroker::Handle (TEvNodeBroker::TEvSubscribeNodesRequest::TPtr &ev,
1556+ const TActorContext &ctx)
1557+ {
1558+ TabletCounters->Cumulative ()[COUNTER_SUBSCRIBE_NODES_REQUESTS].Increment (1 );
1559+
1560+ if (auto [_, inserted] = ServerPipeToSubscriber.emplace (ev->Recipient , ev->Sender ); inserted) {
1561+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
1562+ " New subscriber " << ev->Sender << " , server pipe id: " << ev->Recipient
1563+ << " , cached version: " << ev->Get ()->Record .GetCachedVersion ());
1564+ SendUpdateNodes (ev->Sender , ev->Get ()->Record .GetCachedVersion (), ctx);
1565+ }
1566+ }
1567+
1568+ void TNodeBroker::Handle (TEvNodeBroker::TEvSyncNodesRequest::TPtr &ev,
1569+ const TActorContext &ctx)
1570+ {
1571+ TabletCounters->Cumulative ()[COUNTER_SYNC_NODES_REQUESTS].Increment (1 );
1572+ SendUpdateNodes (ev->Sender , ev->Get ()->Record .GetCachedVersion (), ctx);
1573+ auto response = MakeHolder<TEvNodeBroker::TEvSyncNodesResponse>();
1574+ ctx.Send (ev->Sender , response.Release (), 0 , ev->Cookie );
1575+ }
1576+
1577+ void TNodeBroker::Handle (TEvTabletPipe::TEvServerDisconnected::TPtr &ev,
1578+ const TActorContext &ctx)
1579+ {
1580+ if (auto it = ServerPipeToSubscriber.find (ev->Get ()->ServerId ); it != ServerPipeToSubscriber.end ()) {
1581+ LOG_DEBUG_S (ctx, NKikimrServices::NODE_BROKER,
1582+ " Unsubscribed " << it->second << " , server pipe id: " << it->first );
1583+ ServerPipeToSubscriber.erase (it);
1584+ }
1585+ }
1586+
14551587void TNodeBroker::Handle (TEvPrivate::TEvUpdateEpoch::TPtr &ev,
14561588 const TActorContext &ctx)
14571589{
0 commit comments