Commit

Fix segment removal when querynode receives segmentChangeInfo
Signed-off-by: xige-16 <xi.ge@zilliz.com>
xige-16 committed Oct 26, 2021
1 parent e95b7a1 commit 17f1156
Showing 5 changed files with 41 additions and 40 deletions.
32 changes: 32 additions & 0 deletions internal/querycoord/query_coord_test.go
@@ -375,6 +375,38 @@ func TestHandoffSegmentLoop(t *testing.T) {
waitTaskFinalState(handoffTask, taskFailed)
})

releasePartitionTask := genReleasePartitionTask(baseCtx, queryCoord)
err = queryCoord.scheduler.Enqueue(releasePartitionTask)
assert.Nil(t, err)
waitTaskFinalState(releasePartitionTask, taskExpired)

t.Run("Test handoffReleasedPartition", func(t *testing.T) {
baseTask := newBaseTask(baseCtx, querypb.TriggerCondition_handoff)
segmentInfo := &querypb.SegmentInfo{
SegmentID: defaultSegmentID,
CollectionID: defaultCollectionID,
PartitionID: defaultPartitionID,
SegmentState: querypb.SegmentState_sealed,
}
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_HandoffSegments,
},
SegmentInfos: []*querypb.SegmentInfo{segmentInfo},
}
handoffTask := &handoffTask{
baseTask: baseTask,
HandoffSegmentsRequest: handoffReq,
dataCoord: queryCoord.dataCoordClient,
cluster: queryCoord.cluster,
meta: queryCoord.meta,
}
err = queryCoord.scheduler.Enqueue(handoffTask)
assert.Nil(t, err)

waitTaskFinalState(handoffTask, taskExpired)
})

queryCoord.Stop()
err = removeAllSession()
assert.Nil(t, err)
5 changes: 5 additions & 0 deletions internal/querycoord/task.go
@@ -1428,6 +1428,11 @@ func (ht *handoffTask) execute(ctx context.Context) error {
continue
}

if collectionInfo.LoadType == querypb.LoadType_loadCollection && ht.meta.hasReleasePartition(collectionID, partitionID) {
log.Debug("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
continue
}

partitionLoaded := false
for _, id := range collectionInfo.PartitionIDs {
if id == partitionID {
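
The skip added above depends on querycoord's meta remembering which partitions of a loaded collection were later released. That bookkeeping is not part of this diff, so the following is only a rough sketch under the assumption that it amounts to a per-collection set of released partition IDs; the names metaSketch, collectionMeta and releasedPartitions are made up for illustration and are not the real Milvus types:

package sketch

// Hypothetical sketch, not the actual querycoord meta implementation:
// remember partitions that were released after their collection was loaded,
// so a later handoff targeting them can be skipped.
type collectionMeta struct {
	releasedPartitions map[int64]struct{} // set of released partition IDs
}

type metaSketch struct {
	collections map[int64]*collectionMeta // keyed by collection ID
}

func (m *metaSketch) hasReleasePartition(collectionID, partitionID int64) bool {
	info, ok := m.collections[collectionID]
	if !ok {
		return false
	}
	_, released := info.releasedPartitions[partitionID]
	return released
}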
21 changes: 1 addition & 20 deletions internal/querynode/query_collection.go
@@ -328,31 +328,12 @@ func (q *queryCollection) adjustByChangeInfo(msg *msgstream.SealedSegmentsChange
},
},
})
// 3. delete growing segment because these segments are loaded
hasGrowingSegment := q.streaming.replica.hasSegment(segment.SegmentID)
if hasGrowingSegment {
err = q.streaming.replica.removeSegment(segment.SegmentID)
if err != nil {
return err
}
log.Debug("remove growing segment in adjustByChangeInfo",
zap.Any("collectionID", q.collectionID),
zap.Any("segmentID", segment.SegmentID),
)
}
}

// for OfflineSegments:
for _, segment := range msg.OfflineSegments {
// 1. update global sealed segments
// update global sealed segments
q.globalSegmentManager.removeGlobalSegmentInfo(segment.SegmentID)
// 2. load balance, remove old sealed segments
if msg.OfflineNodeID == Params.QueryNodeID {
err := q.historical.replica.removeSegment(segment.SegmentID)
if err != nil {
return err
}
}
}
return nil
}
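
With the removals above, the query collection's change-info handler no longer calls removeSegment on either the streaming or the historical replica; it only keeps its global sealed-segment view in sync, and the actual removal now happens once in query_node.go (next file). A heavily simplified sketch of the remaining shape, assuming the online branch registers the segment in the global view as the truncated call above suggests, and using toy stand-in types rather than the real Milvus ones:

package sketch

// Toy sketch of what query_collection.adjustByChangeInfo is reduced to after
// this commit: bookkeeping only, no removeSegment calls on either replica.
type segmentInfo struct{ SegmentID int64 }

type changeInfo struct {
	OnlineSegments  []*segmentInfo
	OfflineSegments []*segmentInfo
}

type globalSegmentManager struct {
	sealed map[int64]*segmentInfo // global view of sealed segments
}

func (m *globalSegmentManager) addGlobalSegmentInfo(s *segmentInfo) {
	m.sealed[s.SegmentID] = s
}

func (m *globalSegmentManager) removeGlobalSegmentInfo(id int64) {
	delete(m.sealed, id)
}

func adjustByChangeInfo(m *globalSegmentManager, msg *changeInfo) {
	for _, s := range msg.OnlineSegments {
		m.addGlobalSegmentInfo(s) // newly online sealed segments join the global view
	}
	for _, s := range msg.OfflineSegments {
		m.removeGlobalSegmentInfo(s.SegmentID) // offline segments leave the global view
	}
}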
2 changes: 1 addition & 1 deletion internal/querynode/query_collection_test.go
@@ -666,6 +666,6 @@ func TestQueryCollection_adjustByChangeInfo(t *testing.T) {
info.OfflineSegments = append(info.OfflineSegments, genSimpleSegmentInfo())

err = qc.adjustByChangeInfo(info)
assert.Error(t, err)
assert.Nil(t, err)
})
}
21 changes: 2 additions & 19 deletions internal/querynode/query_node.go
@@ -43,7 +43,6 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
@@ -356,23 +355,7 @@ func (node *QueryNode) adjustByChangeInfo(info *querypb.SealedSegmentsChangeInfo

// For online segments:
for _, segmentInfo := range info.OnlineSegments {
// 1. update excluded segment, cluster have been loaded sealed segments,
// so we need to avoid getting growing segment from flow graph.
node.streaming.replica.addExcludedSegments(segmentInfo.CollectionID, []*datapb.SegmentInfo{
{
ID: segmentInfo.SegmentID,
CollectionID: segmentInfo.CollectionID,
PartitionID: segmentInfo.PartitionID,
InsertChannel: segmentInfo.ChannelID,
NumOfRows: segmentInfo.NumRows,
// TODO: add status, remove query pb segment status, use common pb segment status?
DmlPosition: &internalpb.MsgPosition{
// use max timestamp to filter out dm messages
Timestamp: typeutil.MaxTimestamp,
},
},
})
// 2. delete growing segment because these segments are loaded in historical.
// delete growing segment because these segments are loaded in historical.
hasGrowingSegment := node.streaming.replica.hasSegment(segmentInfo.SegmentID)
if hasGrowingSegment {
err := node.streaming.replica.removeSegment(segmentInfo.SegmentID)
@@ -389,7 +372,7 @@ func (node *QueryNode) adjustByChangeInfo(info *querypb.SealedSegmentsChangeInfo

// For offline segments:
for _, segment := range info.OfflineSegments {
// 1. load balance or compaction, remove old sealed segments.
// load balance or compaction, remove old sealed segments.
if info.OfflineNodeID == Params.QueryNodeID {
err := node.historical.replica.removeSegment(segment.SegmentID)
if err != nil {
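
Taken together with the query_collection.go change, the QueryNode-level handler is now the single place that removes segments when a change-info message arrives: the growing copy is dropped from the streaming replica once its sealed counterpart comes online, and offline sealed segments are removed from the historical replica only on the node they were handed off from. A minimal sketch of that shape, using made-up simplified replica types rather than the real Milvus interfaces (the real handler also maintains excluded segments and the global sealed-segment view):

package sketch

// Toy sketch of the consolidated QueryNode-side handling after this commit.
type replica struct{ segments map[int64]bool }

func (r *replica) hasSegment(id int64) bool { return r.segments[id] }

func (r *replica) removeSegment(id int64) error {
	delete(r.segments, id)
	return nil
}

type seg struct{ SegmentID int64 }

type segmentChangeInfo struct {
	OnlineSegments  []*seg
	OfflineSegments []*seg
	OfflineNodeID   int64
}

func adjustByChangeInfo(streaming, historical *replica, nodeID int64, info *segmentChangeInfo) error {
	// Online sealed segments: the growing copy in the streaming replica is now redundant.
	for _, s := range info.OnlineSegments {
		if streaming.hasSegment(s.SegmentID) {
			if err := streaming.removeSegment(s.SegmentID); err != nil {
				return err
			}
		}
	}
	// Offline sealed segments: remove them only on the node they were taken from.
	for _, s := range info.OfflineSegments {
		if info.OfflineNodeID == nodeID {
			if err := historical.removeSegment(s.SegmentID); err != nil {
				return err
			}
		}
	}
	return nil
}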
