Skip to content

Commit

Permalink
Fix indexnode and datanode num metric (#25919)
Browse files Browse the repository at this point in the history
Signed-off-by: xige-16 <xi.ge@zilliz.com>
  • Loading branch information
xige-16 authored Jul 28, 2023
1 parent 7bd9ece commit d7cd1c8
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
13 changes: 2 additions & 11 deletions internal/datacoord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/samber/lo"
Expand Down Expand Up @@ -61,21 +60,13 @@ func (c *Cluster) Startup(ctx context.Context, nodes []*NodeInfo) error {
// Register registers a new node in cluster
func (c *Cluster) Register(node *NodeInfo) error {
c.sessionManager.AddSession(node)
err := c.channelManager.AddNode(node.NodeID)
if err == nil {
metrics.DataCoordNumDataNodes.WithLabelValues().Inc()
}
return err
return c.channelManager.AddNode(node.NodeID)
}

// UnRegister removes a node from cluster
func (c *Cluster) UnRegister(node *NodeInfo) error {
c.sessionManager.DeleteSession(node)
err := c.channelManager.DeleteNode(node.NodeID)
if err == nil {
metrics.DataCoordNumDataNodes.WithLabelValues().Dec()
}
return err
return c.channelManager.DeleteNode(node.NodeID)
}

// Watch tries to add a channel in datanode cluster
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/indexnode_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (nm *IndexNodeManager) setClient(nodeID UniqueID, client types.IndexNode) {
nm.lock.Lock()
defer nm.lock.Unlock()
nm.nodeClients[nodeID] = client
metrics.IndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients)))
log.Debug("IndexNode IndexNodeManager setClient success", zap.Int64("nodeID", nodeID), zap.Int("IndexNode num", len(nm.nodeClients)))
}

Expand All @@ -67,7 +68,7 @@ func (nm *IndexNodeManager) RemoveNode(nodeID UniqueID) {
defer nm.lock.Unlock()
delete(nm.nodeClients, nodeID)
delete(nm.stoppingNodes, nodeID)
metrics.IndexNodeNum.WithLabelValues().Dec()
metrics.IndexNodeNum.WithLabelValues().Set(float64(len(nm.nodeClients)))
}

func (nm *IndexNodeManager) StoppingNode(nodeID UniqueID) {
Expand All @@ -91,7 +92,6 @@ func (nm *IndexNodeManager) AddNode(nodeID UniqueID, address string) error {
return err
}

metrics.IndexNodeNum.WithLabelValues().Inc()
nm.setClient(nodeID, nodeClient)
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/datacoord/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
Expand Down Expand Up @@ -85,6 +86,7 @@ func (c *SessionManager) AddSession(node *NodeInfo) {

session := NewSession(node, c.sessionCreator)
c.sessions.data[node.NodeID] = session
metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data)))
}

// DeleteSession removes the node session
Expand All @@ -96,6 +98,7 @@ func (c *SessionManager) DeleteSession(node *NodeInfo) {
session.Dispose()
delete(c.sessions.data, node.NodeID)
}
metrics.DataCoordNumDataNodes.WithLabelValues().Set(float64(len(c.sessions.data)))
}

// getLiveNodeIDs returns IDs of all live DataNodes.
Expand Down

0 comments on commit d7cd1c8

Please sign in to comment.