Skip to content

Commit

Permalink
enhance: Remove l0 delete cache (#32990)
Browse files Browse the repository at this point in the history
fix #32979
remove l0 cache and build delete pk and ts everytime. this reduce the
memory and also increase the code readability

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
  • Loading branch information
xiaofan-luan authored May 21, 2024
1 parent e18d5ac commit 3d105fc
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 113 deletions.
12 changes: 5 additions & 7 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,11 @@ type shardDelegator struct {

lifetime lifetime.Lifetime[lifetime.State]

distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
// stream delete buffer
deleteMut sync.RWMutex
deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
Expand Down Expand Up @@ -876,7 +875,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
level0Deletions: make(map[int64]*storage.DeleteData),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
Expand Down
114 changes: 33 additions & 81 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if len(deletedPks) == 0 {
continue
}
Expand Down Expand Up @@ -488,7 +488,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
sd.GenerateLevel0DeletionCache()
sd.RefreshLevel0DeletionStats()
} else {
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
Expand All @@ -512,94 +512,51 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
return nil
}

func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
sd.level0Mut.RLock()
deleteData, ok1 := sd.level0Deletions[partitionID]
allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID]
sd.level0Mut.RUnlock()
// we may need to merge the specified partition deletions and the all partitions deletions,
// so release the mutex as early as possible.

if ok1 && ok2 {
pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)

i := 0
j := 0
for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
if i == int(deleteData.RowCount) {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
} else if j == int(allPartitionsDeleteData.RowCount) {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
}
}

return pks, tss
} else if ok1 {
return deleteData.Pks, deleteData.Tss
} else if ok2 {
return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
}

return nil, nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()

func (sd *shardDelegator) GenerateLevel0DeletionCache() {
// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
deletions := make(map[int64]*storage.DeleteData)
pks := make([]storage.PrimaryKey, 0)
tss := make([]storage.Timestamp, 0)

for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
pks, tss := segment.DeleteRecords()
deleteData, ok := deletions[segment.Partition()]
if !ok {
deleteData = storage.NewDeleteData(pks, tss)
} else {
deleteData.AppendBatch(pks, tss)
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
for i, pk := range segmentPks {
if candidate.MayPkExist(pk) {
pks = append(pks, pk)
tss = append(tss, segmentTss[i])
}
}
}
deletions[segment.Partition()] = deleteData
}

type DeletePair struct {
Pk storage.PrimaryKey
Ts storage.Timestamp
}
for _, deleteData := range deletions {
pairs := make([]DeletePair, deleteData.RowCount)
for i := range deleteData.Pks {
pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
}
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].Ts < pairs[j].Ts
})
for i := range pairs {
deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
}
}
sort.Slice(pks, func(i, j int) bool {
return tss[i] < tss[j]
})

return pks, tss
}

func (sd *shardDelegator) RefreshLevel0DeletionStats() {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
totalSize := int64(0)
for _, delete := range deletions {
totalSize += delete.Size()
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
pks, tss := segment.DeleteRecords()
totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}

metrics.QueryNodeLevelZeroSize.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(sd.collectionID),
sd.vchannelName,
).Set(float64(totalSize))
sd.level0Deletions = deletions
}

func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
Expand Down Expand Up @@ -635,14 +592,9 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
position = deltaPositions[0]
}

deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
for i, pk := range deletedPks {
if candidate.MayPkExist(pk) {
deleteData.Append(pk, deletedTss[i])
}
}

deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
Expand Down Expand Up @@ -900,7 +852,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
}

if hasLevel0 {
sd.GenerateLevel0DeletionCache()
sd.RefreshLevel0DeletionStats()
}
partitionsToReload := make([]UniqueID, 0)
lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {
Expand Down
68 changes: 43 additions & 25 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,44 +1110,62 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
partitionID := int64(10)
partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
delegator.level0Deletions[partitionID] = partitionDeleteData

pks, _ := delegator.GetLevel0Deletions(partitionID)
schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})

l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
CollectionID: 1,
SegmentID: 2,
PartitionID: partitionID,
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: 1,
})
l0.LoadDeltaData(context.TODO(), partitionDeleteData)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)

l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
CollectionID: 1,
SegmentID: 3,
PartitionID: common.AllPartitionsID,
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: int64(1),
})
l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)

pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))

pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)

delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))

delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))

// exchange the order
delegator.level0Deletions = make(map[int64]*storage.DeleteData)
partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
delegator.level0Deletions[partitionID] = partitionDeleteData
bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(allPartitionDeleteData.Pks)

pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))

pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
s.Empty(pks)
pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
// bf filtered segment
s.Equal(len(pks), 1)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))

delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
s.True(pks[1].EQ(partitionDeleteData.Pks[0]))

delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))

delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)
}

func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
Expand Down

0 comments on commit 3d105fc

Please sign in to comment.