From 826b726c86eebaac8d8a1eb8182c998f9b804d62 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 10 Jan 2025 14:12:57 +0800 Subject: [PATCH] fix: Prevent leader checker from generating excessive duplicate leader tasks (#39000) issue: #39001 Background: Segment Load Version: Each segment load request assigns a timestamp as its version. When multiple copies of a segment are loaded on different QueryNodes, the leader checker uses this version to identify the latest copy and updates the routing table in the leader view to point to it. Delegator Router Version: When a delegator builds a route to a QueryNode that has loaded a segment, it also records the segment's version. Router Table Update Logic: If the leader checker detects that the version of a segment in the routing table does not match the version in the worker, it updates the routing table to point to the QueryNode with the latest version. Additionally, it updates the segment's load version in the QueryNode during this process. Issue: When a channel is undergoing load balancing, the leader checker may sync the routing table to a new delegator. This sync operation modifies the segment's load version, which invalidates the routing in the old delegator. Subsequently, the leader checker updates the routing table in the old delegator, breaking the routing in the new delegator. This cycle continues, causing repeated updates and inconsistencies. Fix: This PR introduces two changes to address the issue: 1. Use NodeID to verify whether the delegator's routing table needs an update, avoiding unnecessary modifications. 2. Ensure compatibility by using the latest segment's load version as the version recorded in the routing table. These changes resolve the cyclic updates and prevent the leader checker from generating excessive duplicate tasks, ensuring routing stability across delegators during load balancing. Signed-off-by: Wei Liu --- .../querycoordv2/checkers/leader_checker.go | 9 ++++++--- .../checkers/leader_checker_test.go | 20 ++++++++++++------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index 756e842d8c250..110d58068f52d 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -171,13 +171,16 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met continue } - // when segment's version in leader view doesn't match segment's version in dist - // which means leader view store wrong segment location in leader view, then we should update segment location and segment's version + // The routing table on the delegator points to the nodes where segments are loaded. There are two scenarios that require updating the routing table on the delegator: + // 1. Missing Segment Routing - The routing table lacks the route for a specific segment. + // 2. Outdated Segment Routing - A segment has multiple copies loaded, but the routing table points to a node that does not host the most recently loaded copy. + // This ensures the routing table remains accurate and up-to-date, reflecting the latest segment distribution. version, ok := leaderView.Segments[s.GetID()] - if !ok || version.GetVersion() != s.Version { + if !ok || version.GetNodeID() != s.Node { log.RatedDebug(10, "leader checker append a segment to set", zap.Int64("segmentID", s.GetID()), zap.Int64("nodeID", s.Node)) + action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), time.Now().UnixNano()) t := task.NewLeaderSegmentTask( ctx, diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index ff0daaa76e66e..1d44938f7690b 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -138,22 +138,28 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() { suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) suite.Equal(tasks[0].Priority(), task.TaskPriorityLow) - // test segment's version in leader view doesn't match segment's version in dist - observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel")) - view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) + // Verify that the segment routing table in the leader view does not point to the most recent segment replica. + // the leader view points to the segment on querynode-2, with version 1 + // the distribution shows that the segment is on querynode-1, with latest version 2 + node1, node2 := int64(1), int64(2) + version1, version2 := int64(1), int64(2) + observer.dist.SegmentDistManager.Update(node1) + observer.dist.SegmentDistManager.Update(node2, utils.CreateTestSegment(1, 1, 1, node2, version2, "test-insert-channel")) + view = utils.CreateTestLeaderView(node2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{}) view.TargetVersion = observer.target.GetCollectionTargetVersion(ctx, 1, meta.CurrentTarget) view.Segments[1] = &querypb.SegmentDist{ - NodeID: 0, - Version: time.Now().UnixMilli() - 1, + NodeID: node1, + Version: version1, } - observer.dist.LeaderViewManager.Update(2, view) + observer.dist.LeaderViewManager.Update(node2, view) tasks = suite.checker.Check(context.TODO()) suite.Len(tasks, 1) suite.Equal(tasks[0].Source(), utils.LeaderChecker) suite.Len(tasks[0].Actions(), 1) suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow) - suite.Equal(tasks[0].Actions()[0].Node(), int64(1)) + suite.Equal(tasks[0].Actions()[0].Node(), node2) + suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).GetLeaderID(), node2) suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1)) suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)