@@ -495,6 +495,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
495495 for (auto * node : unimportantNodes) {
496496 node->Ping ();
497497 }
498+ ProcessNodePingQueue ();
498499 TVector<TTabletId> tabletsToReleaseFromParent;
499500 TSideEffects sideEffects;
500501 sideEffects.Reset (SelfId ());
@@ -685,11 +686,13 @@ void THive::Cleanup() {
685686
686687void THive::Handle (TEvLocal::TEvStatus::TPtr& ev) {
687688 BLOG_D (" Handle TEvLocal::TEvStatus for Node " << ev->Sender .NodeId () << " : " << ev->Get ()->Record .ShortDebugString ());
689+ RemoveFromPingInProgress (ev->Sender .NodeId ());
688690 Execute (CreateStatus (ev->Sender , ev->Get ()->Record ));
689691}
690692
691693void THive::Handle (TEvLocal::TEvSyncTablets::TPtr& ev) {
692694 BLOG_D (" THive::Handle::TEvSyncTablets" );
695+ RemoveFromPingInProgress (ev->Sender .NodeId ());
693696 Execute (CreateSyncTablets (ev->Sender , ev->Get ()->Record ));
694697}
695698
@@ -742,7 +745,10 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
742745void THive::Handle (TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
743746 TNodeId nodeId = ev->Get ()->NodeId ;
744747 BLOG_W (" Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
745- ConnectedNodes.erase (nodeId);
748+ RemoveFromPingInProgress (nodeId);
749+ if (ConnectedNodes.erase (nodeId)) {
750+ UpdateCounterNodesConnected (-1 );
751+ }
746752 Execute (CreateDisconnectNode (THolder<TEvInterconnect::TEvNodeDisconnected>(ev->Release ().Release ())));
747753}
748754
@@ -912,6 +918,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
912918 case TEvLocal::EvPing: {
913919 TNodeId nodeId = ev->Cookie ;
914920 TNodeInfo* node = FindNode (nodeId);
921+ NodePingsInProgress.erase (nodeId);
915922 if (node != nullptr && ev->Sender == node->Local ) {
916923 if (node->IsDisconnecting ()) {
917924 // ping continiousily until we fully disconnected from the node
@@ -920,6 +927,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
920927 KillNode (node->Id , node->Local );
921928 }
922929 }
930+ ProcessNodePingQueue ();
923931 break ;
924932 }
925933 };
@@ -1684,6 +1692,13 @@ void THive::UpdateCounterNodesConnected(i64 nodesConnectedDiff) {
16841692 }
16851693}
16861694
1695+ void THive::UpdateCounterPingQueueSize () {
1696+ if (TabletCounters != nullptr ) {
1697+ auto & counter = TabletCounters->Simple ()[NHive::COUNTER_PINGQUEUE_SIZE];
1698+ counter.Set (NodePingQueue.size ());
1699+ }
1700+ }
1701+
16871702void THive::RecordTabletMove (const TTabletMoveInfo& moveInfo) {
16881703 TabletMoveHistory.PushBack (moveInfo);
16891704 TabletCounters->Cumulative ()[NHive::COUNTER_TABLETS_MOVED].Increment (1 );
@@ -2648,6 +2663,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
26482663 Execute (CreateStartTablet (tabletId, local, cookie, external));
26492664}
26502665
2666+ void THive::QueuePing (const TActorId& local) {
2667+ NodePingQueue.push (local);
2668+ }
2669+
2670+ void THive::ProcessNodePingQueue () {
2671+ while (!NodePingQueue.empty () && NodePingsInProgress.size () < GetMaxPingsInFlight ()) {
2672+ TActorId local = NodePingQueue.front ();
2673+ TNodeId node = local.NodeId ();
2674+ NodePingQueue.pop ();
2675+ NodePingsInProgress.insert (node);
2676+ SendPing (local, node);
2677+ }
2678+ }
2679+
2680+ void THive::RemoveFromPingInProgress (TNodeId node) {
2681+ NodePingsInProgress.erase (node);
2682+ ProcessNodePingQueue ();
2683+ }
2684+
26512685void THive::SendPing (const TActorId& local, TNodeId id) {
26522686 Send (local,
26532687 new TEvLocal::TEvPing (HiveId,
0 commit comments