diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index 9db6268da6b04..9e137134bae63 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -23,7 +23,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/samber/lo" @@ -61,21 +60,13 @@ func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error { // Register registers a new node in cluster func (c *Cluster) Register(node *NodeInfo) error { c.sessionManager.AddSession(node) - err := c.channelManager.AddNode(node.NodeID) - if err == nil { - metrics.DataCoordNumDataNodes.WithLabelValues().Inc() - } - return err + return c.channelManager.AddNode(node.NodeID) } // UnRegister removes a node from cluster func (c *Cluster) UnRegister(node *NodeInfo) error { c.sessionManager.DeleteSession(node) - err := c.channelManager.DeleteNode(node.NodeID) - if err == nil { - metrics.DataCoordNumDataNodes.WithLabelValues().Dec() - } - return err + return c.channelManager.DeleteNode(node.NodeID) } // Watch tries to add a channel in datanode cluster diff --git a/internal/datacoord/indexnode_manager.go b/internal/datacoord/indexnode_manager.go index 7c43068939d82..d3ce99050ede4 100644 --- a/internal/datacoord/indexnode_manager.go +++ b/internal/datacoord/indexnode_manager.go @@ -57,6 +57,7 @@ func (nm *IndexNodeManager) setClient(nodeID UniqueID, client types.IndexNode) { nm.lock.Lock() defer nm.lock.Unlock() nm.nodeClients[nodeID] = client + metrics.IndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients))) log.Debug("IndexNode IndexNodeManager setClient success", zap.Int64("nodeID", nodeID), zap.Int("IndexNode num", len(nm.nodeClients))) } @@ -67,7 +68,7 @@ func (nm *IndexNodeManager) RemoveNode(nodeID UniqueID) { defer nm.lock.Unlock() delete(nm.nodeClients, nodeID) delete(nm.stoppingNodes, nodeID) - metrics.IndexNodeNum.WithLabelValues().Dec() + metrics.IndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients))) } func (nm *IndexNodeManager) StoppingNode(nodeID UniqueID) { @@ -91,7 +92,6 @@ func (nm *IndexNodeManager) AddNode(nodeID UniqueID, address string) error { return err } - metrics.IndexNodeNum.WithLabelValues().Inc() nm.setClient(nodeID, nodeClient) return nil } diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index 78d023f38381f..69381aeefd683 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" @@ -85,6 +86,7 @@ func (c *SessionManager) AddSession(node *NodeInfo) { session := NewSession(node, c.sessionCreator) c.sessions.data[node.NodeID] = session + metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data))) } // DeleteSession removes the node session @@ -96,6 +98,7 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) { session.Dispose() delete(c.sessions.data, node.NodeID) } + metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data))) } // getLiveNodeIDs returns IDs of all live DataNodes.