From 17f1156161fe8c0dfe51524d35d7eb6979e45527 Mon Sep 17 00:00:00 2001 From: xige-16 Date: Tue, 26 Oct 2021 10:45:09 +0800 Subject: [PATCH] Fix remove segment when querynode receive segmentChangeInfo Signed-off-by: xige-16 --- internal/querycoord/query_coord_test.go | 32 +++++++++++++++++++++ internal/querycoord/task.go | 5 ++++ internal/querynode/query_collection.go | 21 +------------- internal/querynode/query_collection_test.go | 2 +- internal/querynode/query_node.go | 21 ++------------ 5 files changed, 41 insertions(+), 40 deletions(-) diff --git a/internal/querycoord/query_coord_test.go b/internal/querycoord/query_coord_test.go index 848228ae667c7..2d20414d216a0 100644 --- a/internal/querycoord/query_coord_test.go +++ b/internal/querycoord/query_coord_test.go @@ -375,6 +375,38 @@ func TestHandoffSegmentLoop(t *testing.T) { waitTaskFinalState(handoffTask, taskFailed) }) + releasePartitionTask := genReleasePartitionTask(baseCtx, queryCoord) + err = queryCoord.scheduler.Enqueue(releasePartitionTask) + assert.Nil(t, err) + waitTaskFinalState(releasePartitionTask, taskExpired) + + t.Run("Test handoffReleasedPartition", func(t *testing.T) { + baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff) + segmentInfo := &querypb.SegmentInfo{ + SegmentID: defaultSegmentID, + CollectionID: defaultCollectionID, + PartitionID: defaultPartitionID, + SegmentState: querypb.SegmentState_sealed, + } + handoffReq := &querypb.HandoffSegmentsRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_HandoffSegments, + }, + SegmentInfos: []*querypb.SegmentInfo{segmentInfo}, + } + handoffTask := &handoffTask{ + baseTask: baseTask, + HandoffSegmentsRequest: handoffReq, + dataCoord: queryCoord.dataCoordClient, + cluster: queryCoord.cluster, + meta: queryCoord.meta, + } + err = queryCoord.scheduler.Enqueue(handoffTask) + assert.Nil(t, err) + + waitTaskFinalState(handoffTask, taskExpired) + }) + queryCoord.Stop() err = removeAllSession() assert.Nil(t, err) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 7afabb91b008d..9cec464b6eaa7 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -1428,6 +1428,11 @@ func (ht *handoffTask) execute(ctx context.Context) error { continue } + if collectionInfo.LoadType == querypb.LoadType_loadCollection && ht.meta.hasReleasePartition(collectionID, partitionID) { + log.Debug("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID)) + continue + } + partitionLoaded := false for _, id := range collectionInfo.PartitionIDs { if id == partitionID { diff --git a/internal/querynode/query_collection.go b/internal/querynode/query_collection.go index b47468d3462ef..4627279b20583 100644 --- a/internal/querynode/query_collection.go +++ b/internal/querynode/query_collection.go @@ -328,31 +328,12 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange }, }, }) - // 3. delete growing segment because these segments are loaded - hasGrowingSegment := q.streaming.replica.hasSegment(segment.SegmentID) - if hasGrowingSegment { - err = q.streaming.replica.removeSegment(segment.SegmentID) - if err != nil { - return err - } - log.Debug("remove growing segment in adjustByChangeInfo", - zap.Any("collectionID", q.collectionID), - zap.Any("segmentID", segment.SegmentID), - ) - } } // for OfflineSegments: for _, segment := range msg.OfflineSegments { - // 1. update global sealed segments + // update global sealed segments q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID) - // 2. load balance, remove old sealed segments - if msg.OfflineNodeID == Params.QueryNodeID { - err := q.historical.replica.removeSegment(segment.SegmentID) - if err != nil { - return err - } - } } return nil } diff --git a/internal/querynode/query_collection_test.go b/internal/querynode/query_collection_test.go index 034bfc8fc2fd3..09fbff61b8889 100644 --- a/internal/querynode/query_collection_test.go +++ b/internal/querynode/query_collection_test.go @@ -666,6 +666,6 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) { info.OfflineSegments = append(info.OfflineSegments, genSimpleSegmentInfo()) err = qc.adjustByChangeInfo(info) - assert.Error(t, err) + assert.Nil(t, err) }) } diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index c7b54b06ca3f3..f7ab52d4ed0a9 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -43,7 +43,6 @@ import ( etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" @@ -356,23 +355,7 @@ func (node *QueryNode) adjustByChangeInfo(info *querypb.SealedSegmentsChangeInfo // For online segments: for _, segmentInfo := range info.OnlineSegments { - // 1. update excluded segment, cluster have been loaded sealed segments, - // so we need to avoid getting growing segment from flow graph. - node.streaming.replica.addExcludedSegments(segmentInfo.CollectionID, []*datapb.SegmentInfo{ - { - ID: segmentInfo.SegmentID, - CollectionID: segmentInfo.CollectionID, - PartitionID: segmentInfo.PartitionID, - InsertChannel: segmentInfo.ChannelID, - NumOfRows: segmentInfo.NumRows, - // TODO: add status, remove query pb segment status, use common pb segment status? - DmlPosition: &internalpb.MsgPosition{ - // use max timestamp to filter out dm messages - Timestamp: typeutil.MaxTimestamp, - }, - }, - }) - // 2. delete growing segment because these segments are loaded in historical. + // delete growing segment because these segments are loaded in historical. hasGrowingSegment := node.streaming.replica.hasSegment(segmentInfo.SegmentID) if hasGrowingSegment { err := node.streaming.replica.removeSegment(segmentInfo.SegmentID) @@ -389,7 +372,7 @@ func (node *QueryNode) adjustByChangeInfo(info *querypb.SealedSegmentsChangeInfo // For offline segments: for _, segment := range info.OfflineSegments { - // 1. load balance or compaction, remove old sealed segments. + // load balance or compaction, remove old sealed segments. if info.OfflineNodeID == Params.QueryNodeID { err := node.historical.replica.removeSegment(segment.SegmentID) if err != nil {