enhance: load deltalogs on demand when doing compactions (#37310)
See #37234

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
tedxu authored Nov 1, 2024
1 parent d119a25 commit b792b19
Showing 5 changed files with 151 additions and 98 deletions.
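In essence, this change stops the compactors from downloading and merging every segment's deltalogs up front in Compact(): each path now builds the pk-to-delete-timestamp map only for the segment it is about to read, so at most one segment's delete state needs to be resident at a time. A minimal sketch of the pattern, using simplified stand-in types rather than the actual Milvus signatures:

```go
// Sketch only: per-segment, on-demand deltalog merging. The types and the
// loadDeltalogs helper are simplified stand-ins, not the real Milvus API.
package compaction

import "context"

// loadDeltalogs is assumed to download the given deltalog paths and decode
// them into a primary-key -> delete-timestamp map, as mergeDeltalogs does.
type loadDeltalogs func(ctx context.Context, paths []string) (map[any]uint64, error)

// compactSegments merges each segment's deltalogs right before that segment
// is processed, instead of building one delete map for all segments up front.
func compactSegments(ctx context.Context, deltaPathsBySeg map[int64][]string, load loadDeltalogs) error {
	for segID, paths := range deltaPathsBySeg {
		delta, err := load(ctx, paths) // only this segment's deletes are resident
		if err != nil {
			return err
		}
		_ = segID
		_ = delta // filter this segment's rows against delta, then let it be collected
	}
	return nil
}
```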
29 changes: 16 additions & 13 deletions internal/datanode/compaction/clustering_compactor.go
@@ -243,17 +243,11 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
}
ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
defer t.cleanUp(ctx)

// 1, download delta logs to build deltaMap
deltaBlobs, _, err := composePaths(t.plan.GetSegmentBinlogs())
if err != nil {
return nil, err
}
deltaPk2Ts, err := mergeDeltalogs(ctxTimeout, t.binlogIO, deltaBlobs)
if err != nil {
// 1, decompose binlogs as preparation for later mapping
if err := binlog.DecompressCompactionBinlogs(t.plan.SegmentBinlogs); err != nil {
log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
return nil, err
}

@@ -272,7 +266,7 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {

// 3, mapping
log.Info("Clustering compaction start mapping", zap.Int("bufferNum", len(t.clusterBuffers)))
uploadSegments, partitionStats, err := t.mapping(ctx, deltaPk2Ts)
uploadSegments, partitionStats, err := t.mapping(ctx)
if err != nil {
log.Error("failed in mapping", zap.Error(err))
return nil, err
@@ -418,7 +412,6 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) error {

// mapping reads input segments and splits them into buffers
func (t *clusteringCompactionTask) mapping(ctx context.Context,
deltaPk2Ts map[interface{}]typeutil.Timestamp,
) ([]*datapb.CompactionSegment, *storage.PartitionStatsSnapshot, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mapping-%d", t.GetPlanID()))
defer span.End()
@@ -436,7 +429,7 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
FieldBinlogs: segment.FieldBinlogs,
}
future := t.mappingPool.Submit(func() (any, error) {
err := t.mappingSegment(ctx, segmentClone, deltaPk2Ts)
err := t.mappingSegment(ctx, segmentClone)
return struct{}{}, err
})
futures = append(futures, future)
@@ -511,7 +504,6 @@ func (t *clusteringCompactionTask) getBufferTotalUsedMemorySize() int64 {
func (t *clusteringCompactionTask) mappingSegment(
ctx context.Context,
segment *datapb.CompactionSegmentBinlogs,
delta map[interface{}]typeutil.Timestamp,
) error {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, fmt.Sprintf("mappingSegment-%d-%d", t.GetPlanID(), segment.GetSegmentID()))
defer span.End()
@@ -528,6 +520,17 @@ func (t *clusteringCompactionTask) mappingSegment(
remained int64 = 0
)

deltaPaths := make([]string, 0)
for _, d := range segment.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
deltaPaths = append(deltaPaths, l.GetLogPath())
}
}
delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
if err != nil {
return err
}

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// the insert and the delete of an upsert share the same ts
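The comment on isDeletedValue above points at a subtle rule: an upsert produces an insert and a delete carrying the same timestamp, so only rows strictly older than the delete timestamp may be dropped. A tiny illustration of that rule (values are invented):

```go
// Sketch: why the delete check uses "<" rather than "<=" (invented values).
package compaction

// isDeleted reports whether a row should be dropped given the delete
// timestamp recorded for its primary key (found == false means no delete).
func isDeleted(rowTs, deleteTs uint64, found bool) bool {
	// rowTs <  deleteTs: the row predates the delete, drop it.
	// rowTs == deleteTs: insert and delete came from the same upsert, keep it.
	return found && rowTs < deleteTs
}

// Example: an upsert at ts=100 writes a row with ts=100 and a delete at ts=100.
// isDeleted(100, 100, true) == false, so the freshly upserted row survives.
```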
45 changes: 21 additions & 24 deletions internal/datanode/compaction/compactor_common.go
@@ -47,31 +47,25 @@ func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
return expireTime.Before(pnow)
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) {
func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
pk2ts := make(map[interface{}]typeutil.Timestamp)

if len(dpaths) == 0 {
log.Info("compact with no deltalogs, skip merge deltalogs")
if len(paths) == 0 {
log.Debug("compact with no deltalogs, skip merge deltalogs")
return pk2ts, nil
}

blobs := make([]*storage.Blob, 0)
for segID, paths := range dpaths {
if len(paths) == 0 {
continue
}
binaries, err := io.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download deltalogs",
zap.Int64("segment", segID),
zap.Strings("path", paths),
zap.Error(err))
return nil, err
}
binaries, err := io.Download(ctx, paths)
if err != nil {
log.Warn("compact wrong, fail to download deltalogs",
zap.Strings("path", paths),
zap.Error(err))
return nil, err
}

for i := range binaries {
blobs = append(blobs, &storage.Blob{Value: binaries[i]})
}
for i := range binaries {
blobs = append(blobs, &storage.Blob{Value: binaries[i]})
}
reader, err := storage.CreateDeltalogReader(blobs)
if err != nil {
@@ -104,15 +98,18 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) {
return pk2ts, nil
}

func composePaths(segments []*datapb.CompactionSegmentBinlogs) (map[typeutil.UniqueID][]string, [][]string, error) {
func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
deltaPaths map[typeutil.UniqueID][]string, insertPaths map[typeutil.UniqueID][]string, err error,
) {
if err := binlog.DecompressCompactionBinlogs(segments); err != nil {
log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err))
return nil, nil, err
}

deltaPaths := make(map[typeutil.UniqueID][]string) // segmentID to deltalog paths
allPath := make([][]string, 0) // group by binlog batch
deltaPaths = make(map[typeutil.UniqueID][]string) // segmentID to deltalog paths
insertPaths = make(map[typeutil.UniqueID][]string, 0) // segmentID to binlog paths
for _, s := range segments {
segId := s.GetSegmentID()
// Get the batch count of field binlog files from non-empty segment
// each segment might contain different batches
var binlogBatchCount int
@@ -132,17 +129,17 @@ func composePaths(segments []*datapb.CompactionSegmentBinlogs) (map[typeutil.UniqueID][]string, [][]string, error) {
for _, f := range s.GetFieldBinlogs() {
batchPaths = append(batchPaths, f.GetBinlogs()[idx].GetLogPath())
}
allPath = append(allPath, batchPaths)
insertPaths[segId] = append(insertPaths[segId], batchPaths...)
}

deltaPaths[s.GetSegmentID()] = []string{}
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
deltaPaths[s.GetSegmentID()] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath())
deltaPaths[segId] = append(deltaPaths[s.GetSegmentID()], l.GetLogPath())
}
}
}
return deltaPaths, allPath, nil
return deltaPaths, insertPaths, nil
}

func serializeWrite(ctx context.Context, allocator allocator.Interface, writer *SegmentWriter) (kvs map[string][]byte, fieldBinlogs map[int64]*datapb.FieldBinlog, err error) {
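For reference, composePaths now keys both the insert binlog paths and the deltalog paths by segment ID, so a caller can fetch everything belonging to one segment with a single lookup. A toy illustration of the returned shape (segment IDs and paths are invented):

```go
// Toy illustration of the composePaths output shape after this change
// (segment IDs and paths are invented, not real object-storage keys).
package compaction

var (
	// insertPaths: segment ID -> insert binlog paths for that segment.
	insertPaths = map[int64][]string{
		101: {"insert_log/101/f1/b0", "insert_log/101/f2/b0"},
		102: {"insert_log/102/f1/b0"},
	}
	// deltaPaths: segment ID -> deltalog paths; may be empty for a segment.
	deltaPaths = map[int64][]string{
		101: {"delta_log/101/d0"},
		102: {},
	}
)
```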
53 changes: 33 additions & 20 deletions internal/datanode/compaction/merge_sort.go
@@ -24,7 +24,6 @@ func mergeSortMultipleSegments(ctx context.Context,
collectionID, partitionID, maxRows int64,
binlogIO io.BinlogIO,
binlogs []*datapb.CompactionSegmentBinlogs,
delta map[interface{}]typeutil.Timestamp,
tr *timerecord.TimeRecorder,
currentTs typeutil.Timestamp,
collectionTtl int64,
@@ -47,17 +46,6 @@ func mergeSortMultipleSegments(ctx context.Context,
deletedRowCount int64
)

isValueDeleted := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// the insert and the delete of an upsert share the same ts,
// so the comparison here must be < rather than <=
// to avoid deleting freshly upserted data during compaction
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}

pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema())
if err != nil {
log.Warn("failed to get pk field from schema")
@@ -66,6 +54,7 @@ func mergeSortMultipleSegments(ctx context.Context,

// SegmentDeserializeReaderTest(binlogPaths, t.binlogIO, writer.GetPkID())
segmentReaders := make([]*SegmentDeserializeReader, len(binlogs))
segmentDelta := make([]map[interface{}]storage.Timestamp, len(binlogs))
for i, s := range binlogs {
var binlogBatchCount int
for _, b := range s.GetFieldBinlogs() {
@@ -89,13 +78,42 @@ func mergeSortMultipleSegments(ctx context.Context,
binlogPaths[idx] = batchPaths
}
segmentReaders[i] = NewSegmentDeserializeReader(ctx, binlogPaths, binlogIO, pkField.GetFieldID(), bm25FieldIds)
deltalogPaths := make([]string, 0)
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
deltalogPaths = append(deltalogPaths, l.GetLogPath())
}
}
segmentDelta[i], err = mergeDeltalogs(ctx, binlogIO, deltalogPaths)
if err != nil {
return nil, err
}
}

advanceRow := func(i int) (*storage.Value, error) {
for {
v, err := segmentReaders[i].Next()
if err != nil {
return nil, err
}

ts, ok := segmentDelta[i][v.PK.GetValue()]
// the insert and the delete of an upsert share the same ts,
// so the comparison here must be < rather than <=
// to avoid deleting freshly upserted data during compaction
if ok && uint64(v.Timestamp) < ts {
deletedRowCount++
continue
}
return v, nil
}
}

pq := make(PriorityQueue, 0)
heap.Init(&pq)

for i, r := range segmentReaders {
if v, err := r.Next(); err == nil {
for i := range segmentReaders {
if v, err := advanceRow(i); err == nil {
heap.Push(&pq, &PQItem{
Value: v,
Index: i,
@@ -107,11 +125,6 @@ func mergeSortMultipleSegments(ctx context.Context,
smallest := heap.Pop(&pq).(*PQItem)
v := smallest.Value

if isValueDeleted(v) {
deletedRowCount++
continue
}

// Filtering expired entity
if isExpiredEntity(collectionTtl, currentTs, typeutil.Timestamp(v.Timestamp)) {
expiredRowCount++
@@ -124,7 +137,7 @@ func mergeSortMultipleSegments(ctx context.Context,
return nil, err
}

iv, err := segmentReaders[smallest.Index].Next()
iv, err := advanceRow(smallest.Index)
if err != nil && err != sio.EOF {
return nil, err
}
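The merge-sort path now filters deletes per reader before rows enter the priority queue, mirroring the advanceRow closure above. A simplified sketch of that per-reader filtering, with stand-in types rather than the real reader:

```go
// Sketch: skip deleted rows per reader before they reach the k-way merge heap.
// rowReader and the deletes map are simplified stand-ins for
// SegmentDeserializeReader and the per-segment pk -> delete-timestamp map.
package compaction

type row struct {
	pk any
	ts uint64
}

type rowReader interface {
	Next() (*row, error) // returns an EOF-style error when the segment is drained
}

// nextLiveRow advances the reader past rows whose timestamp is older than a
// recorded delete, counting them, and returns the next surviving row.
func nextLiveRow(r rowReader, deletes map[any]uint64, deletedCount *int64) (*row, error) {
	for {
		v, err := r.Next()
		if err != nil {
			return nil, err
		}
		if ts, ok := deletes[v.pk]; ok && v.ts < ts {
			*deletedCount++
			continue
		}
		return v, nil
	}
}
```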
72 changes: 41 additions & 31 deletions internal/datanode/compaction/mix_compactor.go
@@ -130,8 +130,8 @@ func (t *mixCompactionTask) preCompact() error {

func (t *mixCompactionTask) mergeSplit(
ctx context.Context,
binlogPaths [][]string,
delta map[interface{}]typeutil.Timestamp,
insertPaths map[int64][]string,
deltaPaths map[int64][]string,
) ([]*datapb.CompactionSegment, error) {
_ = t.tr.RecordSpan()

@@ -153,8 +153,9 @@ func (t *mixCompactionTask) mergeSplit(
log.Warn("failed to get pk field from schema")
return nil, err
}
for _, paths := range binlogPaths {
del, exp, err := t.writePaths(ctx, delta, mWriter, pkField, paths)
for segId, binlogPaths := range insertPaths {
deltaPaths := deltaPaths[segId]
del, exp, err := t.writeSegment(ctx, binlogPaths, deltaPaths, mWriter, pkField)
if err != nil {
return nil, err
}
@@ -177,31 +178,38 @@ func (t *mixCompactionTask) mergeSplit(
return res, nil
}

func isValueDeleted(v *storage.Value, delta map[interface{}]typeutil.Timestamp) bool {
ts, ok := delta[v.PK.GetValue()]
// the insert and the delete of an upsert share the same ts,
// so the comparison here must be < rather than <=
// to avoid deleting freshly upserted data during compaction
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}

func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp,
mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema, paths []string,
func (t *mixCompactionTask) writeSegment(ctx context.Context,
binlogPaths []string,
deltaPaths []string,
mWriter *MultiSegmentWriter, pkField *schemapb.FieldSchema,
) (deletedRowCount, expiredRowCount int64, err error) {
log := log.With(zap.Strings("paths", paths))
allValues, err := t.binlogIO.Download(ctx, paths)
log := log.With(zap.Strings("paths", binlogPaths))
allValues, err := t.binlogIO.Download(ctx, binlogPaths)
if err != nil {
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return
}

blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: paths[i], Value: v}
return &storage.Blob{Key: binlogPaths[i], Value: v}
})

delta, err := mergeDeltalogs(ctx, t.binlogIO, deltaPaths)
if err != nil {
log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
return
}
isValueDeleted := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// the insert and the delete of an upsert share the same ts,
// so the comparison here must be < rather than <=
// to avoid deleting freshly upserted data during compaction
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}

iter, err := storage.NewBinlogDeserializeReader(blobs, pkField.GetFieldID())
if err != nil {
log.Warn("compact wrong, failed to new insert binlogs reader", zap.Error(err))
@@ -221,7 +229,8 @@ func (t *mixCompactionTask) writePaths(ctx context.Context, delta map[interface{}]typeutil.Timestamp,
}
}
v := iter.Value()
if isValueDeleted(v, delta) {

if isValueDeleted(v) {
deletedRowCount++
continue
}
@@ -261,23 +270,24 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
defer cancelAll()

log.Info("compact start")
deltaPaths, allBatchPaths, err := composePaths(t.plan.GetSegmentBinlogs())
deltaPaths, insertPaths, err := composePaths(t.plan.GetSegmentBinlogs())
if err != nil {
log.Warn("compact wrong, failed to composePaths", zap.Error(err))
return nil, err
}
// Unable to handle the case where all segments are empty, so return an error
if len(allBatchPaths) == 0 {
isEmpty := true
for _, paths := range insertPaths {
if len(paths) > 0 {
isEmpty = false
break
}
}
if isEmpty {
log.Warn("compact wrong, all segments' binlogs are empty")
return nil, errors.New("illegal compaction plan")
}

deltaPk2Ts, err := mergeDeltalogs(ctxTimeout, t.binlogIO, deltaPaths)
if err != nil {
log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
return nil, err
}

allSorted := true
for _, segment := range t.plan.GetSegmentBinlogs() {
if !segment.GetIsSorted() {
@@ -290,13 +300,13 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
if allSorted && len(t.plan.GetSegmentBinlogs()) > 1 {
log.Info("all segments are sorted, use merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
t.plan.GetSegmentBinlogs(), deltaPk2Ts, t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
if err != nil {
log.Warn("compact wrong, fail to merge sort segments", zap.Error(err))
return nil, err
}
} else {
res, err = t.mergeSplit(ctxTimeout, allBatchPaths, deltaPk2Ts)
res, err = t.mergeSplit(ctxTimeout, insertPaths, deltaPaths)
if err != nil {
log.Warn("compact wrong, failed to mergeSplit", zap.Error(err))
return nil, err