Skip to content

Commit

Permalink
Clear segment cache when closing flowgraph (milvus-io#17671)
Browse files Browse the repository at this point in the history
See also: milvus-io#17537

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Jun 22, 2022
1 parent f23690a commit 1215843
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
7 changes: 7 additions & 0 deletions internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,17 @@ func (dsService *dataSyncService) close() {
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel
}

dsService.clearGlobalFlushingCache()

dsService.cancelFn()
dsService.flushManager.close()
}

func (dsService *dataSyncService) clearGlobalFlushingCache() {
segments := dsService.replica.listAllSegmentIDs()
dsService.flushingSegCache.Remove(segments...)
}

// getSegmentInfos return the SegmentInfo details according to the given ids through RPC to datacoord
func (dsService *dataSyncService) getSegmentInfos(segmentIds []int64) ([]*datapb.SegmentInfo, error) {
var segmentInfos []*datapb.SegmentInfo
Expand Down
36 changes: 36 additions & 0 deletions internal/datanode/data_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/common"
Expand Down Expand Up @@ -403,3 +404,38 @@ func TestGetSegmentInfos(t *testing.T) {
assert.Error(t, err)
assert.Empty(t, segmentInfos3)
}

func TestClearGlobalFlushingCache(t *testing.T) {
dataCoord := &DataCoordFactory{}
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
defer cm.RemoveWithPrefix("")
replica, err := newReplica(context.Background(), &RootCoordFactory{pkType: schemapb.DataType_Int64}, cm, 1)
require.NoError(t, err)

cache := newCache()
dsService := &dataSyncService{
dataCoord: dataCoord,
replica: replica,
flushingSegCache: cache,
}

err = replica.addNewSegment(1, 1, 1, "", &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
assert.NoError(t, err)

err = replica.addFlushedSegment(2, 1, 1, "", 0, nil, 0)
assert.NoError(t, err)

err = replica.addNormalSegment(3, 1, 1, "", 0, nil, nil, 0)
assert.NoError(t, err)

cache.checkOrCache(1)
cache.checkOrCache(2)
cache.checkOrCache(4)

dsService.clearGlobalFlushingCache()

assert.False(t, cache.checkIfCached(1))
assert.False(t, cache.checkIfCached(2))
assert.False(t, cache.checkIfCached(3))
assert.True(t, cache.checkIfCached(4))
}

0 comments on commit 1215843

Please sign in to comment.