From 3eb08aebd47f873971d46ab42dcba3759b1c7043 Mon Sep 17 00:00:00 2001
From: xiaofanluan
Date: Sat, 11 May 2024 18:08:44 +0800
Subject: [PATCH] enhance: remove L0 delete cache

Signed-off-by: xiaofanluan
---
 internal/querynodev2/delegator/delegator.go      |   3 -
 .../querynodev2/delegator/delegator_data.go      | 115 +++++-------
 .../delegator/delegator_data_test.go             |  91 ++++++++++----
 3 files changed, 101 insertions(+), 108 deletions(-)

diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go
index 5750e35030f42..e9aa9c8c9891f 100644
--- a/internal/querynodev2/delegator/delegator.go
+++ b/internal/querynodev2/delegator/delegator.go
@@ -111,8 +111,6 @@ type shardDelegator struct {
 	tsafeManager tsafe.Manager
 	pkOracle pkoracle.PkOracle
 	level0Mut sync.RWMutex
-	// TODO, there is not reason we want to cache L0Deletions, it also
-	level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
 	// stream delete buffer
 	deleteMut sync.RWMutex
 	deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
@@ -877,7 +875,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
 		workerManager: workerManager,
 		lifetime: lifetime.NewLifetime(lifetime.Initializing),
 		distribution: NewDistribution(),
-		level0Deletions: make(map[int64]*storage.DeleteData),
 		deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
 		pkOracle: pkoracle.NewPkOracle(),
 		tsafeManager: tsafeManager,
diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go
index a9b7d53c495f0..3068f4ce464a1 100644
--- a/internal/querynodev2/delegator/delegator_data.go
+++ b/internal/querynodev2/delegator/delegator_data.go
@@ -365,7 +365,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
 		log := log.With(
 			zap.Int64("segmentID", segment.ID()),
 		)
-		deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
+		deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
 		if len(deletedPks) == 0 {
 			continue
 		}
@@ -488,7 +488,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 			}
 		})
 		if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
-			sd.RefreshLevel0DeletionCache()
+			sd.RefreshLevel0DeletionStats()
 		} else {
 			log.Debug("load delete...")
 			err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
@@ -512,95 +512,51 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 	return nil
 }
 
-func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
-	sd.level0Mut.RLock()
-	deleteData, ok1 := sd.level0Deletions[partitionID]
-	allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID]
-	sd.level0Mut.RUnlock()
-	// we may need to merge the specified partition deletions and the all partitions deletions,
-	// so release the mutex as early as possible.
-
-	if ok1 && ok2 {
-		pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
-		tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
-
-		i := 0
-		j := 0
-		for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
-			if i == int(deleteData.RowCount) {
-				pks = append(pks, allPartitionsDeleteData.Pks[j])
-				tss = append(tss, allPartitionsDeleteData.Tss[j])
-				j++
-			} else if j == int(allPartitionsDeleteData.RowCount) {
-				pks = append(pks, deleteData.Pks[i])
-				tss = append(tss, deleteData.Tss[i])
-				i++
-			} else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
-				pks = append(pks, deleteData.Pks[i])
-				tss = append(tss, deleteData.Tss[i])
-				i++
-			} else {
-				pks = append(pks, allPartitionsDeleteData.Pks[j])
-				tss = append(tss, allPartitionsDeleteData.Tss[j])
-				j++
-			}
-		}
-
-		return pks, tss
-	} else if ok1 {
-		return deleteData.Pks, deleteData.Tss
-	} else if ok2 {
-		return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
-	}
-
-	return nil, nil
-}
+func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
+	sd.level0Mut.Lock()
+	defer sd.level0Mut.Unlock()
 
-// TODO we can build this cache every time we read. we only need to update stats when load or release l0segments
-func (sd *shardDelegator) RefreshLevel0DeletionCache() {
+	// TODO: this could be large; hosting all L0 deletes on the delegator might be dangerous. Consider mmapping them on the local segment and stream-processing them.
 	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
-	deletions := make(map[int64]*storage.DeleteData)
+	pks := make([]storage.PrimaryKey, 0)
+	tss := make([]storage.Timestamp, 0)
+
 	for _, segment := range level0Segments {
 		segment := segment.(*segments.L0Segment)
-		pks, tss := segment.DeleteRecords()
-		deleteData, ok := deletions[segment.Partition()]
-		if !ok {
-			deleteData = storage.NewDeleteData(pks, tss)
-		} else {
-			deleteData.AppendBatch(pks, tss)
+		if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
+			segmentPks, segmentTss := segment.DeleteRecords()
+			for i, pk := range segmentPks {
+				if candidate.MayPkExist(pk) {
+					pks = append(pks, pk)
+					tss = append(tss, segmentTss[i])
+				}
+			}
 		}
-		deletions[segment.Partition()] = deleteData
 	}
 
-	type DeletePair struct {
-		Pk storage.PrimaryKey
-		Ts storage.Timestamp
-	}
-	for _, deleteData := range deletions {
-		pairs := make([]DeletePair, deleteData.RowCount)
-		for i := range deleteData.Pks {
-			pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
-		}
-		sort.Slice(pairs, func(i, j int) bool {
-			return pairs[i].Ts < pairs[j].Ts
-		})
-		for i := range pairs {
-			deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
-		}
-	}
+	sort.Slice(pks, func(i, j int) bool {
+		return tss[i] < tss[j]
+	})
+	return pks, tss
+}
+
+func (sd *shardDelegator) RefreshLevel0DeletionStats() {
 	sd.level0Mut.Lock()
 	defer sd.level0Mut.Unlock()
+	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
 	totalSize := int64(0)
-	for _, delete := range deletions {
-		totalSize += delete.Size()
+	for _, segment := range level0Segments {
+		segment := segment.(*segments.L0Segment)
+		pks, tss := segment.DeleteRecords()
+		totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
 	}
+
 	metrics.QueryNodeLevelZeroSize.WithLabelValues(
 		fmt.Sprint(paramtable.GetNodeID()),
 		fmt.Sprint(sd.collectionID),
 		sd.vchannelName,
 	).Set(float64(totalSize))
-	sd.level0Deletions = deletions
 }
 
 func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
@@ -636,14 +592,9 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
 			position = deltaPositions[0]
 		}
 
-		deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
+		deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
 		deleteData := &storage.DeleteData{}
-		for i, pk := range deletedPks {
-			if candidate.MayPkExist(pk) {
-				deleteData.Append(pk, deletedTss[i])
-			}
-		}
-
+		deleteData.AppendBatch(deletedPks, deletedTss)
 		if deleteData.RowCount > 0 {
 			log.Info("forward L0 delete to worker...",
 				zap.Int64("deleteRowNum", deleteData.RowCount),
@@ -901,7 +852,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
 	}
 
 	if hasLevel0 {
-		sd.RefreshLevel0DeletionCache()
+		sd.RefreshLevel0DeletionStats()
 	}
 	partitionsToReload := make([]UniqueID, 0)
 	lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {
diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go
index 76c13cb57bf05..74cfbfe25ada5 100644
--- a/internal/querynodev2/delegator/delegator_data_test.go
+++ b/internal/querynodev2/delegator/delegator_data_test.go
@@ -1132,44 +1132,89 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
 	partitionID := int64(10)
 	partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
 	allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
-	delegator.level0Deletions[partitionID] = partitionDeleteData
 
-	pks, _ := delegator.GetLevel0Deletions(partitionID)
+	schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
+	collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
+		LoadType: querypb.LoadType_LoadCollection,
+	})
+
+	l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
+		CollectionID: 1,
+		SegmentID: 2,
+		PartitionID: partitionID,
+		InsertChannel: delegator.vchannelName,
+		Level: datapb.SegmentLevel_L0,
+		NumOfRows: 1,
+	})
+	l0.LoadDeltaData(context.TODO(), partitionDeleteData)
+	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)
+
+	l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
+		CollectionID: 1,
+		SegmentID: 3,
+		PartitionID: common.AllPartitionsID,
+		InsertChannel: delegator.vchannelName,
+		Level: datapb.SegmentLevel_L0,
+		NumOfRows: int64(1),
+	})
+	l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
+
+	pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 
-	delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.Len(pks, 2)
+	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
+	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
 	s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))
 
-	delete(delegator.level0Deletions, partitionID)
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
+	bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
+	bfs.UpdateBloomFilter(allPartitionDeleteData.Pks)
+
+	pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
+	// the bloom filter candidate only contains pk 2, so pk 1 is filtered out
+	s.Equal(len(pks), 1)
 	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	// exchange the order
-	delegator.level0Deletions = make(map[int64]*storage.DeleteData)
-	partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
-	delegator.level0Deletions[partitionID] = partitionDeleteData
+	delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
+	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
+	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
+	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
 
-	pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+	delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
+	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 
-	delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.Len(pks, 2)
-	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
-	s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
+	/*
 
-	delete(delegator.level0Deletions, partitionID)
-	pks, _ = delegator.GetLevel0Deletions(partitionID)
-	s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
+		delete(delegator.level0Deletions, partitionID)
+		pks, _ = delegator.GetLevel0Deletions(partitionID)
+		s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
+
+		// exchange the order
+		delegator.level0Deletions = make(map[int64]*storage.DeleteData)
+		partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
+		delegator.level0Deletions[partitionID] = partitionDeleteData
+
+		pks, _ = delegator.GetLevel0Deletions(partitionID)
+		s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
+
+		pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+		s.Empty(pks)
+
+		delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
+		pks, _ = delegator.GetLevel0Deletions(partitionID)
+		s.Len(pks, 2)
+		s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
+		s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
+
+		delete(delegator.level0Deletions, partitionID)
+		pks, _ = delegator.GetLevel0Deletions(partitionID)
+		s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))*/
 }
 
 func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {