diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index a962bcdc704db..7e66be594f5fd 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -502,6 +502,7 @@ message LeaderView { repeated int64 growing_segmentIDs = 4; map growing_segments = 5; int64 TargetVersion = 6; + int64 num_of_growing_rows = 7; } message SegmentDist { diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 31b59fd500192..9d44cfa4bc0c5 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -71,11 +71,20 @@ func (b *RowCountBasedBalancer) convertToNodeItems(nodeIDs []int64) []*nodeItem ret := make([]*nodeItem, 0, len(nodeIDs)) for _, nodeInfo := range b.getNodes(nodeIDs) { node := nodeInfo.ID() + + // calculate sealed segment row count on node segments := b.dist.SegmentDistManager.GetByNode(node) rowcnt := 0 for _, s := range segments { rowcnt += int(s.GetNumOfRows()) } + + // calculate growing segment row count on node + views := b.dist.GetLeaderView(node) + for _, view := range views { + rowcnt += int(view.NumOfGrowingRows) + } + // more row count, less priority nodeItem := newNodeItem(rowcnt, node) ret = append(ret, &nodeItem) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 72eacc976f169..a5795fd486420 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -19,6 +19,7 @@ package balance import ( "testing" + "github.com/samber/lo" mock "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -765,6 +766,48 @@ func (suite *RowCountBasedBalancerTestSuite) getCollectionBalancePlans(balancer return segmentPlans, channelPlans } +func (suite *RowCountBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + + distributions := map[int64][]*meta.Segment{ + 1: { + {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1}, + }, + 2: { + {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2}, + }, + } + for node, s := range distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + + for _, node := range lo.Keys(distributions) { + nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0") + nodeInfo.UpdateStats(session.WithSegmentCnt(20)) + nodeInfo.SetState(session.NodeStateNormal) + suite.balancer.nodeManager.Add(nodeInfo) + } + + toAssign := []*meta.Segment{ + {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 10, CollectionID: 1}, Node: 3}, + {SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10, CollectionID: 1}, Node: 3}, + } + + // mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2 + leaderView := &meta.LeaderView{ + ID: 1, + CollectionID: 1, + NumOfGrowingRows: 50, + } + suite.balancer.dist.LeaderViewManager.Update(1, leaderView) + plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions)) + for _, p := range plans { + suite.Equal(int64(2), p.To) + } +} + func TestRowCountBasedBalancerSuite(t *testing.T) { suite.Run(t, new(RowCountBasedBalancerTestSuite)) } diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 98f7f1405c71e..b0c55dd988c47 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -92,17 +92,31 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in } func (b *ScoreBasedBalancer) calculatePriority(collectionID, nodeID int64) int { - globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID) rowCount := 0 + // calculate global sealed segment row count + globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID) for _, s := range globalSegments { rowCount += int(s.GetNumOfRows()) } - collectionSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(collectionID, nodeID) + // calculate global growing segment row count + views := b.dist.GetLeaderView(nodeID) + for _, view := range views { + rowCount += int(view.NumOfGrowingRows) + } + collectionRowCount := 0 + // calculate collection sealed segment row count + collectionSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(collectionID, nodeID) for _, s := range collectionSegments { collectionRowCount += int(s.GetNumOfRows()) } + + // calculate collection growing segment row count + collectionViews := b.dist.LeaderViewManager.GetByCollectionAndNode(collectionID, nodeID) + for _, view := range collectionViews { + collectionRowCount += int(view.NumOfGrowingRows) + } return collectionRowCount + int(float64(rowCount)* params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index cb1367f90b6d1..db2135b649a2f 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -18,6 +18,7 @@ package balance import ( "testing" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -234,6 +235,48 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { } } +func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + + distributions := map[int64][]*meta.Segment{ + 1: { + {SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20, CollectionID: 1}, Node: 1}, + }, + 2: { + {SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 20, CollectionID: 1}, Node: 2}, + }, + } + for node, s := range distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + + for _, node := range lo.Keys(distributions) { + nodeInfo := session.NewNodeInfo(node, "127.0.0.1:0") + nodeInfo.UpdateStats(session.WithSegmentCnt(20)) + nodeInfo.SetState(session.NodeStateNormal) + suite.balancer.nodeManager.Add(nodeInfo) + } + + toAssign := []*meta.Segment{ + {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 10, CollectionID: 1}, Node: 3}, + {SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10, CollectionID: 1}, Node: 3}, + } + + // mock 50 growing row count in node 1, which is delegator, expect all segment assign to node 2 + leaderView := &meta.LeaderView{ + ID: 1, + CollectionID: 1, + NumOfGrowingRows: 50, + } + suite.balancer.dist.LeaderViewManager.Update(1, leaderView) + plans := balancer.AssignSegment(1, toAssign, lo.Keys(distributions)) + for _, p := range plans { + suite.Equal(int64(2), p.To) + } +} + func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { cases := []struct { name string diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 66fc0139a8ccc..dbb0d6a3168fe 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -197,13 +197,14 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons } view := &meta.LeaderView{ - ID: resp.GetNodeID(), - CollectionID: lview.GetCollection(), - Channel: lview.GetChannel(), - Version: version, - Segments: lview.GetSegmentDist(), - GrowingSegments: segments, - TargetVersion: lview.TargetVersion, + ID: resp.GetNodeID(), + CollectionID: lview.GetCollection(), + Channel: lview.GetChannel(), + Version: version, + Segments: lview.GetSegmentDist(), + GrowingSegments: segments, + TargetVersion: lview.TargetVersion, + NumOfGrowingRows: lview.GetNumOfGrowingRows(), } updates = append(updates, view) } diff --git a/internal/querycoordv2/meta/leader_view_manager.go b/internal/querycoordv2/meta/leader_view_manager.go index 2d58cce3fecc0..f26e9df584519 100644 --- a/internal/querycoordv2/meta/leader_view_manager.go +++ b/internal/querycoordv2/meta/leader_view_manager.go @@ -25,13 +25,14 @@ import ( ) type LeaderView struct { - ID int64 - CollectionID int64 - Channel string - Version int64 - Segments map[int64]*querypb.SegmentDist - GrowingSegments map[int64]*Segment - TargetVersion int64 + ID int64 + CollectionID int64 + Channel string + Version int64 + Segments map[int64]*querypb.SegmentDist + GrowingSegments map[int64]*Segment + TargetVersion int64 + NumOfGrowingRows int64 } func (view *LeaderView) Clone() *LeaderView { @@ -46,13 +47,14 @@ func (view *LeaderView) Clone() *LeaderView { } return &LeaderView{ - ID: view.ID, - CollectionID: view.CollectionID, - Channel: view.Channel, - Version: view.Version, - Segments: segments, - GrowingSegments: growings, - TargetVersion: view.TargetVersion, + ID: view.ID, + CollectionID: view.CollectionID, + Channel: view.Channel, + Version: view.Version, + Segments: segments, + GrowingSegments: growings, + TargetVersion: view.TargetVersion, + NumOfGrowingRows: view.NumOfGrowingRows, } } @@ -168,8 +170,8 @@ func (mgr *LeaderViewManager) GetLeadersByGrowingSegment(segmentID int64) *Leade return nil } -// GetGrowingSegmentDistByCollectionAndNode returns all segments of the given collection and node. -func (mgr *LeaderViewManager) GetGrowingSegmentDistByCollectionAndNode(collectionID, nodeID int64) map[int64]*Segment { +// GetGrowingSegments returns all segments of the given collection and node. +func (mgr *LeaderViewManager) GetGrowingSegments(collectionID, nodeID int64) map[int64]*Segment { mgr.rwmutex.RLock() defer mgr.rwmutex.RUnlock() @@ -209,6 +211,19 @@ func (mgr *LeaderViewManager) GetLeaderView(id int64) map[string]*LeaderView { return mgr.views[id] } +func (mgr *LeaderViewManager) GetByCollectionAndNode(collection, node int64) map[string]*LeaderView { + mgr.rwmutex.RLock() + defer mgr.rwmutex.RUnlock() + + ret := make(map[string]*LeaderView) + for _, view := range mgr.views[node] { + if collection == view.CollectionID { + ret[view.Channel] = view + } + } + return ret +} + func (mgr *LeaderViewManager) GetLeaderShardView(id int64, shard string) *LeaderView { mgr.rwmutex.RLock() defer mgr.rwmutex.RUnlock() diff --git a/internal/querycoordv2/meta/leader_view_manager_test.go b/internal/querycoordv2/meta/leader_view_manager_test.go index 1eb7acffe0c16..93ad8e0779a7d 100644 --- a/internal/querycoordv2/meta/leader_view_manager_test.go +++ b/internal/querycoordv2/meta/leader_view_manager_test.go @@ -137,6 +137,10 @@ func (suite *LeaderViewManagerSuite) TestGetDist() { suite.AssertChannelDist(shard, nodes) } } + + // test get growing segments + segments := mgr.GetGrowingSegments(101, 1) + suite.Len(segments, 1) } func (suite *LeaderViewManagerSuite) TestGetLeader() { @@ -156,6 +160,10 @@ func (suite *LeaderViewManagerSuite) TestGetLeader() { suite.Equal(view, views[leader]) } } + + // Test GetByCollectionAndNode + leaders := mgr.GetByCollectionAndNode(101, 1) + suite.Len(leaders, 1) } func (suite *LeaderViewManagerSuite) AssertSegmentDist(segment int64, nodes []int64) bool { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 390bd2fbd3594..9604fa81bdd65 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1288,6 +1288,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get } } + numOfGrowingRows := int64(0) growingSegments := make(map[int64]*msgpb.MsgPosition) for _, entry := range growing { segment := node.manager.Segment.GetWithType(entry.SegmentID, segments.SegmentTypeGrowing) @@ -1297,14 +1298,16 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get continue } growingSegments[entry.SegmentID] = segment.StartPosition() + numOfGrowingRows += segment.InsertCount() } leaderViews = append(leaderViews, &querypb.LeaderView{ - Collection: delegator.Collection(), - Channel: key, - SegmentDist: sealedSegments, - GrowingSegments: growingSegments, - TargetVersion: delegator.GetTargetVersion(), + Collection: delegator.Collection(), + Channel: key, + SegmentDist: sealedSegments, + GrowingSegments: growingSegments, + TargetVersion: delegator.GetTargetVersion(), + NumOfGrowingRows: numOfGrowingRows, }) return true })