enhance: Remove l0 delete cache #32990

Merged · 1 commit · May 21, 2024
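In short, this change drops the delegator's cached `level0Deletions` map and reads L0 delete records directly from the L0 segments owned by the segment manager whenever they are needed; the cache-rebuild step is replaced by a metrics-only refresh. A rough before/after of the two delegator methods touched below (signatures copied from the diff, bodies elided):

```go
// Before: deletions were served from a per-partition cache that had to be regenerated.
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp)
func (sd *shardDelegator) GenerateLevel0DeletionCache()

// After: deletions are collected on demand and pre-filtered by a pkoracle.Candidate
// (e.g. a bloom-filter set); only the size metric is refreshed.
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp)
func (sd *shardDelegator) RefreshLevel0DeletionStats()
```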
12 changes: 5 additions & 7 deletions internal/querynodev2/delegator/delegator.go
@@ -106,12 +106,11 @@ type shardDelegator struct {

lifetime lifetime.Lifetime[lifetime.State]

- distribution *distribution
- segmentManager segments.SegmentManager
- tsafeManager tsafe.Manager
- pkOracle pkoracle.PkOracle
- level0Mut sync.RWMutex
- level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
+ distribution *distribution
+ segmentManager segments.SegmentManager
+ tsafeManager tsafe.Manager
+ pkOracle pkoracle.PkOracle
+ level0Mut sync.RWMutex
// stream delete buffer
deleteMut sync.RWMutex
deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
@@ -876,7 +875,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
- level0Deletions: make(map[int64]*storage.DeleteData),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
114 changes: 33 additions & 81 deletions internal/querynodev2/delegator/delegator_data.go
@@ -365,7 +365,7 @@
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
- deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
+ deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if len(deletedPks) == 0 {
continue
}
@@ -488,7 +488,7 @@
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
- sd.GenerateLevel0DeletionCache()
+ sd.RefreshLevel0DeletionStats()
} else {
log.Debug("load delete...")
err = sd.loadStreamDelete(ctx, candidates, infos, req.GetDeltaPositions(), targetNodeID, worker, entries)
@@ -512,94 +512,51 @@
return nil
}

- func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
- sd.level0Mut.RLock()
- deleteData, ok1 := sd.level0Deletions[partitionID]
- allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID]
- sd.level0Mut.RUnlock()
- // we may need to merge the specified partition deletions and the all partitions deletions,
- // so release the mutex as early as possible.
-
- if ok1 && ok2 {
- pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
- tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
-
- i := 0
- j := 0
- for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
- if i == int(deleteData.RowCount) {
- pks = append(pks, allPartitionsDeleteData.Pks[j])
- tss = append(tss, allPartitionsDeleteData.Tss[j])
- j++
- } else if j == int(allPartitionsDeleteData.RowCount) {
- pks = append(pks, deleteData.Pks[i])
- tss = append(tss, deleteData.Tss[i])
- i++
- } else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
- pks = append(pks, deleteData.Pks[i])
- tss = append(tss, deleteData.Tss[i])
- i++
- } else {
- pks = append(pks, allPartitionsDeleteData.Pks[j])
- tss = append(tss, allPartitionsDeleteData.Tss[j])
- j++
- }
- }
-
- return pks, tss
- } else if ok1 {
- return deleteData.Pks, deleteData.Tss
- } else if ok2 {
- return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
- }
-
- return nil, nil
- }
+ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
+ sd.level0Mut.Lock()
+ defer sd.level0Mut.Unlock()

- func (sd *shardDelegator) GenerateLevel0DeletionCache() {
// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
- deletions := make(map[int64]*storage.DeleteData)
+ pks := make([]storage.PrimaryKey, 0)
+ tss := make([]storage.Timestamp, 0)

for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
- pks, tss := segment.DeleteRecords()
- deleteData, ok := deletions[segment.Partition()]
- if !ok {
- deleteData = storage.NewDeleteData(pks, tss)
- } else {
- deleteData.AppendBatch(pks, tss)
+ if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
+ segmentPks, segmentTss := segment.DeleteRecords()
+ for i, pk := range segmentPks {
+ if candidate.MayPkExist(pk) {
+ pks = append(pks, pk)
+ tss = append(tss, segmentTss[i])
+ }
+ }
}
- deletions[segment.Partition()] = deleteData
}

- type DeletePair struct {
- Pk storage.PrimaryKey
- Ts storage.Timestamp
- }
- for _, deleteData := range deletions {
- pairs := make([]DeletePair, deleteData.RowCount)
- for i := range deleteData.Pks {
- pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
- }
- sort.Slice(pairs, func(i, j int) bool {
- return pairs[i].Ts < pairs[j].Ts
- })
- for i := range pairs {
- deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
- }
- }
+ sort.Slice(pks, func(i, j int) bool {
+ return tss[i] < tss[j]
+ })

+ return pks, tss
+ }

+ func (sd *shardDelegator) RefreshLevel0DeletionStats() {
+ sd.level0Mut.Lock()
+ defer sd.level0Mut.Unlock()
+ level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
totalSize := int64(0)
- for _, delete := range deletions {
- totalSize += delete.Size()
+ for _, segment := range level0Segments {
+ segment := segment.(*segments.L0Segment)
+ pks, tss := segment.DeleteRecords()
+ totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}

metrics.QueryNodeLevelZeroSize.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(sd.collectionID),
sd.vchannelName,
).Set(float64(totalSize))
- sd.level0Deletions = deletions
}

func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
@@ -635,14 +592,9 @@
position = deltaPositions[0]
}

- deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
+ deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
- for i, pk := range deletedPks {
- if candidate.MayPkExist(pk) {
- deleteData.Append(pk, deletedTss[i])
- }
- }
-
+ deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
@@ -900,7 +852,7 @@
}

if hasLevel0 {
- sd.GenerateLevel0DeletionCache()
+ sd.RefreshLevel0DeletionStats()

Codecov (codecov/patch) warning: added line internal/querynodev2/delegator/delegator_data.go#L855 is not covered by tests.
}
partitionsToReload := make([]UniqueID, 0)
lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {
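To make the new read path concrete, here is a minimal, self-contained sketch of the pull-and-filter pattern that the reworked `GetLevel0Deletions` follows: scan the channel's L0 delete records, keep only entries for the requested partition (or the all-partitions bucket) whose primary keys the target segment's candidate may contain, and hand them back ordered by timestamp. The `deleteRecord` and `candidate` types below are illustrative stand-ins, not Milvus's actual `storage` or `pkoracle` types, and the exact-set candidate stands in for the real bloom-filter-backed one.

```go
package main

import (
	"fmt"
	"sort"
)

// candidate plays the role of pkoracle.Candidate: a cheap membership test
// (bloom-filter backed in Milvus, so false positives are allowed).
type candidate interface {
	MayPkExist(pk int64) bool
}

// setCandidate is a toy exact-membership candidate used only for this sketch.
type setCandidate map[int64]struct{}

func (s setCandidate) MayPkExist(pk int64) bool { _, ok := s[pk]; return ok }

// deleteRecord stands in for one (partition, pk, timestamp) delete entry
// read from an L0 segment's delete records.
type deleteRecord struct {
	partitionID int64
	pk          int64
	ts          uint64
}

const allPartitions int64 = -1 // stand-in for common.AllPartitionsID

// level0Deletions mirrors the shape of the new GetLevel0Deletions: filter by
// partition and candidate, then return pks/tss sorted by timestamp.
func level0Deletions(records []deleteRecord, partitionID int64, c candidate) ([]int64, []uint64) {
	matched := make([]deleteRecord, 0, len(records))
	for _, r := range records {
		if r.partitionID != partitionID && r.partitionID != allPartitions {
			continue // delete belongs to another partition
		}
		if !c.MayPkExist(r.pk) {
			continue // target segment definitely does not hold this pk
		}
		matched = append(matched, r)
	}
	// Apply-order matters: keep deletes sorted by timestamp.
	sort.Slice(matched, func(i, j int) bool { return matched[i].ts < matched[j].ts })

	pks := make([]int64, len(matched))
	tss := make([]uint64, len(matched))
	for i, r := range matched {
		pks[i], tss[i] = r.pk, r.ts
	}
	return pks, tss
}

func main() {
	records := []deleteRecord{
		{partitionID: allPartitions, pk: 2, ts: 101},
		{partitionID: 10, pk: 1, ts: 100},
		{partitionID: 11, pk: 3, ts: 102}, // other partition: ignored
	}
	pks, tss := level0Deletions(records, 10, setCandidate{1: {}, 2: {}})
	fmt.Println(pks, tss) // [1 2] [100 101]
}
```

With this shape, the delegator no longer keeps a second, merged copy of every L0 delete; each call rescans the channel's L0 segments instead, and only an aggregate byte count (primary-key sizes plus 8 bytes per timestamp) survives for the QueryNodeLevelZeroSize metric.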
68 changes: 43 additions & 25 deletions internal/querynodev2/delegator/delegator_data_test.go
@@ -1110,44 +1110,62 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
partitionID := int64(10)
partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
- delegator.level0Deletions[partitionID] = partitionDeleteData

- pks, _ := delegator.GetLevel0Deletions(partitionID)
+ schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
+ collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
+ LoadType: querypb.LoadType_LoadCollection,
+ })
+
+ l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
+ CollectionID: 1,
+ SegmentID: 2,
+ PartitionID: partitionID,
+ InsertChannel: delegator.vchannelName,
+ Level: datapb.SegmentLevel_L0,
+ NumOfRows: 1,
+ })
+ l0.LoadDeltaData(context.TODO(), partitionDeleteData)
+ delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)
+
+ l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
+ CollectionID: 1,
+ SegmentID: 3,
+ PartitionID: common.AllPartitionsID,
+ InsertChannel: delegator.vchannelName,
+ Level: datapb.SegmentLevel_L0,
+ NumOfRows: int64(1),
+ })
+ l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
+
+ pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))

- pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
+ pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)

- delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
- pks, _ = delegator.GetLevel0Deletions(partitionID)
- s.Len(pks, 2)
+ delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
+ pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))

- delete(delegator.level0Deletions, partitionID)
- pks, _ = delegator.GetLevel0Deletions(partitionID)
- s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
-
- // exchange the order
- delegator.level0Deletions = make(map[int64]*storage.DeleteData)
- partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
- delegator.level0Deletions[partitionID] = partitionDeleteData
+ bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
+ bfs.UpdateBloomFilter(allPartitionDeleteData.Pks)

- pks, _ = delegator.GetLevel0Deletions(partitionID)
- s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
-
- pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
- s.Empty(pks)
+ pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
+ // bf filtered segment
+ s.Equal(len(pks), 1)
+ s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))

- delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
- pks, _ = delegator.GetLevel0Deletions(partitionID)
- s.Len(pks, 2)
+ delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
+ pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
- s.True(pks[1].EQ(partitionDeleteData.Pks[0]))

- delete(delegator.level0Deletions, partitionID)
- pks, _ = delegator.GetLevel0Deletions(partitionID)
+ pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))

+ delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
+ pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
+ s.Empty(pks)
}
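The bloom-filter assertion in the test above relies on the one-sided guarantee behind `MayPkExist`: it may return false positives but never false negatives, so filtering L0 deletes through a candidate can only forward a few superfluous deletes (harmless), never drop one the target segment actually needs. A toy illustration of that property, using a deliberately lossy bitset rather than Milvus's real bloom filter:

```go
package main

import "fmt"

// lossySet is a stand-in for a bloom filter: membership answers can be wrong
// in one direction only (false positives), never the other.
type lossySet struct{ bits [8]bool }

func (s *lossySet) Add(pk int64)             { s.bits[pk%8] = true }
func (s *lossySet) MayPkExist(pk int64) bool { return s.bits[pk%8] }

func main() {
	var seen lossySet
	seen.Add(1) // the segment actually holds pk 1

	fmt.Println(seen.MayPkExist(1)) // true  — a needed delete is never filtered out
	fmt.Println(seen.MayPkExist(9)) // true  — false positive: an extra delete may be forwarded
	fmt.Println(seen.MayPkExist(2)) // false — definitely absent, safe to skip
}
```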

func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {