Skip to content

Commit

Permalink
enhance: Solve channel unbalance on datanode (milvus-io#34984)
Browse files Browse the repository at this point in the history
issue: milvus-io#33583
the old policy permit datanode has at most 2 more channels than other
datanode. so if milvus has 2 datanode and 2 channels, both 2 channels
will be assign to 1 datanode, left another datanode empty.

This PR refine the balance policy to solve channel unbalance on datanode

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Signed-off-by: Sumit Dubey <sumit.dubey2@ibm.com>
  • Loading branch information
weiliu1031 authored and sumitd2 committed Aug 6, 2024
1 parent 8419250 commit e97a0ba
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
21 changes: 19 additions & 2 deletions internal/datacoord/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,32 @@ func AvgBalanceChannelPolicy(cluster Assignments) *ChannelOpSet {
totalChannelNum += len(nodeChs.Channels)
}
channelCountPerNode := totalChannelNum / avaNodeNum
maxChannelCountPerNode := channelCountPerNode
remainder := totalChannelNum % avaNodeNum
if remainder > 0 {
maxChannelCountPerNode += 1
}
for _, nChannels := range cluster {
chCount := len(nChannels.Channels)
if chCount <= channelCountPerNode+1 {
if chCount == 0 {
continue
}

toReleaseCount := chCount - channelCountPerNode
if remainder > 0 && chCount >= maxChannelCountPerNode {
remainder -= 1
toReleaseCount = chCount - maxChannelCountPerNode
}

if toReleaseCount == 0 {
log.Info("node channel count is not much larger than average, skip reallocate",
zap.Int64("nodeID", nChannels.NodeID),
zap.Int("channelCount", chCount),
zap.Int("channelCountPerNode", channelCountPerNode))
continue
}

reallocate := NewNodeChannelInfo(nChannels.NodeID)
toReleaseCount := chCount - channelCountPerNode - 1
for _, ch := range nChannels.Channels {
reallocate.AddChannel(ch)
toReleaseCount--
Expand Down Expand Up @@ -144,6 +159,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf
fromCluster = append(fromCluster, info)
channelNum += len(info.Channels)
nodeToAvg.Insert(info.NodeID)
return
}

// Get toCluster by filtering out execlusive nodes
Expand All @@ -152,6 +168,7 @@ func AvgAssignByCountPolicy(currentCluster Assignments, toAssign *NodeChannelInf
}

toCluster = append(toCluster, info)
channelNum += len(info.Channels)
nodeToAvg.Insert(info.NodeID)
})

Expand Down
35 changes: 31 additions & 4 deletions internal/datacoord/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,16 @@ func (s *PolicySuite) TestAvgBalanceChannelPolicy() {
s.Nil(opSet)
})
s.Run("test uneven with conservative effect", func() {
// as we deem that the node having only one channel more than average as even, so there's no reallocation
// for this test case
// even distribution should have not results
uneven := []*NodeChannelInfo{
{100, getChannels(map[string]int64{"ch1": 1, "ch2": 1})},
{NodeID: 101},
}

opSet := AvgBalanceChannelPolicy(uneven)
s.Nil(opSet)
s.Equal(opSet.Len(), 1)
for _, op := range opSet.Collect() {
s.True(lo.Contains([]string{"ch1", "ch2"}, op.GetChannelNames()[0]))
}
})
s.Run("test uneven with zero", func() {
uneven := []*NodeChannelInfo{
Expand Down Expand Up @@ -286,4 +286,31 @@ func (s *AssignByCountPolicySuite) TestWithUnassignedChannels() {
})
s.ElementsMatch([]int64{3, 1}, nodeIDs)
})

s.Run("assign to reach average", func() {
curCluster := []*NodeChannelInfo{
{1, getChannels(map[string]int64{"ch-1": 1, "ch-2": 1, "ch-3": 1})},
{2, getChannels(map[string]int64{"ch-4": 1, "ch-5": 1, "ch-6": 4, "ch-7": 4, "ch-8": 4})},
}
unassigned := NewNodeChannelInfo(bufferID,
getChannel("new-ch-1", 1),
getChannel("new-ch-2", 1),
getChannel("new-ch-3", 1),
)

opSet := AvgAssignByCountPolicy(curCluster, unassigned, nil)
s.NotNil(opSet)

s.Equal(3, opSet.GetChannelNumber())
s.Equal(2, opSet.Len())
for _, op := range opSet.Collect() {
if op.Type == Delete {
s.Equal(int64(bufferID), op.NodeID)
}

if op.Type == Watch {
s.Equal(int64(1), op.NodeID)
}
}
})
}

0 comments on commit e97a0ba

Please sign in to comment.