enhance: Add back BF lazy load logic for datanode watch channel
Add back lazy loading of segment stats logs when watching a DML channel on the datanode.

Related to milvus-io#22994 milvus-io#27675

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed Aug 22, 2024
1 parent 582d2ee commit 16823de
Showing 27 changed files with 306 additions and 123 deletions.
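
Note (not part of the commit): the common thread in the hunks below is that per-segment PK stats factories now return the pkoracle.PkStat interface instead of the concrete *metacache.BloomFilterSet, which is what allows the watch-channel path to plug in a lazily loaded stats implementation. The following standalone Go sketch illustrates the lazy-load idea only; every name in it is hypothetical and it does not use the Milvus packages.

package main

import (
	"fmt"
	"sync"
)

// pkStat plays the role of pkoracle.PkStat in this sketch: something that can
// answer "might this primary key exist in the segment?".
type pkStat interface {
	MightContain(pk int64) bool
}

// eagerStat stands in for a fully materialized bloom filter set.
type eagerStat struct{ pks map[int64]struct{} }

func (s *eagerStat) MightContain(pk int64) bool { _, ok := s.pks[pk]; return ok }

// lazyStat defers the expensive stats-log read until the first lookup,
// so watching a channel with many historical segments stays cheap.
type lazyStat struct {
	once sync.Once
	load func() *eagerStat // e.g. download + decompress the segment's PK stats binlogs
	stat *eagerStat
}

func (l *lazyStat) MightContain(pk int64) bool {
	l.once.Do(func() { l.stat = l.load() })
	return l.stat.MightContain(pk)
}

func main() {
	// Factory shape analogous to the one the commit switches to:
	//   func(info *datapb.SegmentInfo) pkoracle.PkStat
	newStat := func(segmentID int64) pkStat {
		return &lazyStat{load: func() *eagerStat {
			fmt.Printf("loading PK stats for segment %d on first use\n", segmentID)
			return &eagerStat{pks: map[int64]struct{}{1: {}, 3: {}}}
		}}
	}

	stat := newStat(100)              // cheap: nothing is loaded yet
	fmt.Println(stat.MightContain(3)) // stats are loaded here, prints true
}
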
12 changes: 6 additions & 6 deletions internal/datanode/compaction/l0_compactor.go
@@ -30,7 +30,7 @@ import (

"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@@ -240,7 +240,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
func (t *LevelZeroCompactionTask) splitDelta(
ctx context.Context,
allDelta *storage.DeleteData,
segmentBfs map[int64]*metacache.BloomFilterSet,
segmentBfs map[int64]*pkoracle.BloomFilterSet,
) map[int64]*SegmentDeltaWriter {
traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()
@@ -281,7 +281,7 @@ type BatchApplyRet = struct {
Segment2Hits map[int64][]bool
}

func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*pkoracle.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
defer span.End()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
@@ -418,7 +418,7 @@ func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []str
return dData, nil
}

func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*pkoracle.BloomFilterSet, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadBF")
defer span.End()

@@ -427,7 +427,7 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*
pool = io.GetOrCreateStatsPool()

mu = &sync.Mutex{}
bfs = make(map[int64]*metacache.BloomFilterSet)
bfs = make(map[int64]*pkoracle.BloomFilterSet)
)

for _, segment := range targetSegments {
@@ -445,7 +445,7 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*
zap.Error(err))
return err, err
}
bf := metacache.NewBloomFilterSet(pks...)
bf := pkoracle.NewBloomFilterSet(pks...)
mu.Lock()
defer mu.Unlock()
bfs[segment.GetSegmentID()] = bf
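
Note (not part of the commit): loadBF above builds one BloomFilterSet per target segment by reading that segment's PK stats binlogs on the shared stats pool and collecting the results into a mutex-guarded map; applyBFInParallel and splitDelta then consult that map to decide which segments each deleted primary key may belong to. A minimal standard-library sketch of that collection pattern — the real code uses Milvus's conc pool, not goroutines and a WaitGroup:

package main

import (
	"fmt"
	"sync"
)

// loadStats stands in for decompressing and reading a segment's PK stats binlogs.
func loadStats(segmentID int64) []int64 {
	return []int64{segmentID, segmentID + 1} // placeholder PKs
}

func main() {
	segments := []int64{100, 101, 102}

	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		bfs = make(map[int64][]int64) // segmentID -> loaded PK stats
	)

	for _, segID := range segments {
		segID := segID
		wg.Add(1)
		go func() { // in the real code this is pool.Submit on the stats pool
			defer wg.Done()
			pks := loadStats(segID)
			mu.Lock()
			defer mu.Unlock()
			bfs[segID] = pks
		}()
	}
	wg.Wait()

	fmt.Println(len(bfs), "segments loaded")
}
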
10 changes: 5 additions & 5 deletions internal/datanode/compaction/l0_compactor_test.go
@@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@@ -494,15 +494,15 @@ func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
}

func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1 := pkoracle.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
bfs2 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs2 := pkoracle.NewBloomFilterSetWithBatchSize(100)
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})
bfs3 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs3 := pkoracle.NewBloomFilterSetWithBatchSize(100)
bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}})

predicted := []int64{100, 101, 102}
segmentBFs := map[int64]*metacache.BloomFilterSet{
segmentBFs := map[int64]*pkoracle.BloomFilterSet{
100: bfs1,
101: bfs2,
102: bfs3,
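
Note (not part of the commit): the test changes above are a pure package move; the constructor and UpdatePKRange calls keep their shape. A condensed recap of the post-change usage, assuming it lives inside the milvus module (import paths as in the diff):

package compaction // illustrative placement

import (
	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
	"github.com/milvus-io/milvus/internal/storage"
)

// buildSegmentBFs mirrors the updated test setup: build a per-segment bloom
// filter set, feed it primary keys, and key it by segment ID.
func buildSegmentBFs() map[int64]*pkoracle.BloomFilterSet {
	bfs := pkoracle.NewBloomFilterSetWithBatchSize(100)
	bfs.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}})
	return map[int64]*pkoracle.BloomFilterSet{100: bfs}
}
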
3 changes: 2 additions & 1 deletion internal/datanode/compaction/mix_compactor_test.go
@@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
@@ -214,7 +215,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() {
PartitionID: PartitionID,
ID: 99999,
NumOfRows: 0,
}, metacache.NewBloomFilterSet())
}, pkoracle.NewBloomFilterSet())

s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{
SegmentID: seg.SegmentID(),
9 changes: 5 additions & 4 deletions internal/datanode/importv2/util.go
@@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@@ -59,8 +60,8 @@ func NewSyncTask(ctx context.Context,
CollectionID: collectionID,
PartitionID: partitionID,
InsertChannel: vchannel,
}, func(info *datapb.SegmentInfo) *metacache.BloomFilterSet {
bfs := metacache.NewBloomFilterSet()
}, func(info *datapb.SegmentInfo) pkoracle.PkStat {
bfs := pkoracle.NewBloomFilterSet()
return bfs
})
}
@@ -240,8 +241,8 @@ func NewMetaCache(req *datapb.ImportRequest) map[string]metacache.MetaCache {
},
Schema: schema,
}
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
metaCache := metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
metaCaches[channel] = metaCache
}
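
Note (not part of the commit): the functional change in this file is the factory passed to metacache.NewMetaCache, which now returns the pkoracle.PkStat interface rather than the concrete *BloomFilterSet; that is what lets other call sites substitute a lazily loaded stat. A trimmed sketch of the post-change call shape, assuming placement inside the milvus module and that info is the ChannelWatchInfo built in the surrounding code:

package importv2 // illustrative placement

import (
	"github.com/milvus-io/milvus/internal/flushcommon/metacache"
	"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// newEagerMetaCache shows the new factory signature: callers get a PkStat,
// so the import path can stay eager while the watch-channel path may return
// a lazy implementation instead (not shown in these hunks).
func newEagerMetaCache(info *datapb.ChannelWatchInfo) metacache.MetaCache {
	return metacache.NewMetaCache(info, func(segment *datapb.SegmentInfo) pkoracle.PkStat {
		return pkoracle.NewBloomFilterSet()
	})
}
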
10 changes: 5 additions & 5 deletions internal/datanode/services.go
@@ -31,7 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -333,7 +333,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
log.Info("segment loading PKs", zap.Int64("segmentID", segID))
newSegments = append(newSegments, newSeg)
future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
var val *metacache.BloomFilterSet
var val *pkoracle.BloomFilterSet
var err error
err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
if err != nil {
@@ -345,7 +345,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
log.Warn("failed to load segment stats log", zap.Error(err))
return val, err
}
val = metacache.NewBloomFilterSet(pks...)
val = pkoracle.NewBloomFilterSet(pks...)
return val, nil
})
futures = append(futures, future)
@@ -358,8 +358,8 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
return merr.Status(err), nil
}

newSegmentsBF := lo.Map(futures, func(future *conc.Future[any], _ int) *metacache.BloomFilterSet {
return future.Value().(*metacache.BloomFilterSet)
newSegmentsBF := lo.Map(futures, func(future *conc.Future[any], _ int) *pkoracle.BloomFilterSet {
return future.Value().(*pkoracle.BloomFilterSet)
})

ds.GetMetaCache().UpdateSegmentView(req.GetPartitionId(), newSegments, newSegmentsBF, allSegments)
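
Note (not part of the commit): SyncSegments still loads stats eagerly — each segment's PK stats log is decompressed and read on the shared stats pool, and the resulting futures are mapped back to *pkoracle.BloomFilterSet before the segment view is updated. A small standalone sketch of that submit/await/type-assert pattern, standard library only; the real code uses Milvus's conc pool and samber/lo:

package main

import "fmt"

// future is a stand-in for *conc.Future[any]: Value blocks until the task is done.
type future struct{ ch chan any }

func submit(fn func() any) *future {
	f := &future{ch: make(chan any, 1)}
	go func() { f.ch <- fn() }()
	return f
}

func (f *future) Value() any { return <-f.ch }

type bloomFilterSet struct{ segmentID int64 }

func main() {
	segments := []int64{100, 101}

	var futures []*future
	for _, segID := range segments {
		segID := segID
		futures = append(futures, submit(func() any {
			// real code: decompress the stats binlog, read the PKs,
			// then build pkoracle.NewBloomFilterSet(pks...)
			return &bloomFilterSet{segmentID: segID}
		}))
	}

	// Await every future, then assert back to the concrete type,
	// mirroring the lo.Map call in the hunk above.
	var newSegmentsBF []*bloomFilterSet
	for _, f := range futures {
		newSegmentsBF = append(newSegmentsBF, f.Value().(*bloomFilterSet))
	}
	fmt.Println(len(newSegmentsBF), "bloom filter sets loaded")
}
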
79 changes: 40 additions & 39 deletions internal/datanode/services_test.go
@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/compaction"
"github.com/milvus-io/milvus/internal/flushcommon/broker"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/metacache/pkoracle"
"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -372,7 +373,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
PartitionID: 2,
State: commonpb.SegmentState_Growing,
StartPosition: &msgpb.MsgPosition{},
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
}, func(_ *datapb.SegmentInfo) pkoracle.PkStat { return pkoracle.NewBloomFilterSet() })

s.Run("service_not_ready", func() {
ctx, cancel := context.WithCancel(context.Background())
@@ -634,8 +635,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
@@ -645,8 +646,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
@@ -656,8 +657,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
@@ -667,8 +668,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 103,
@@ -678,8 +679,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
@@ -754,8 +755,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
@@ -765,8 +766,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
@@ -776,8 +777,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
@@ -840,8 +841,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
@@ -851,8 +852,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
@@ -862,8 +863,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
@@ -926,8 +927,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
@@ -937,8 +938,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
@@ -948,8 +949,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
@@ -959,8 +960,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
@@ -1017,8 +1018,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
@@ -1028,8 +1029,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
@@ -1039,8 +1040,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
@@ -1097,8 +1098,8 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
}, func(*datapb.SegmentInfo) pkoracle.PkStat {
return pkoracle.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Expand Down
(Diffs for the remaining 21 changed files are not shown here.)
