From 04edb07d82a570d025080efb1fd9119fd686c1c6 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Thu, 20 Jun 2024 17:46:01 +0800
Subject: [PATCH] enhance: Add deltaRowCount in l0 compaction (#33997)

See also: #33998

Signed-off-by: yangxuan
---
 internal/datacoord/compaction_l0_view.go      | 44 ++++-----
 internal/datacoord/compaction_l0_view_test.go |  1 +
 internal/datacoord/compaction_view.go         | 31 +++++--
 internal/datanode/compaction/l0_compactor.go  | 92 +++++++++----------
 .../datanode/compaction/l0_compactor_test.go  | 12 +--
 5 files changed, 95 insertions(+), 85 deletions(-)

diff --git a/internal/datacoord/compaction_l0_view.go b/internal/datacoord/compaction_l0_view.go
index d59df36c4b369..5f70ef7102ce9 100644
--- a/internal/datacoord/compaction_l0_view.go
+++ b/internal/datacoord/compaction_l0_view.go
@@ -22,7 +22,13 @@ func (v *LevelZeroSegmentsView) String() string {
 	l0strings := lo.Map(v.segments, func(v *SegmentView, _ int) string {
 		return v.LevelZeroString()
 	})
-	return fmt.Sprintf("label=<%s>, posT=<%v>, l0 segments=%v",
+
+	count := lo.SumBy(v.segments, func(v *SegmentView) int {
+		return v.DeltaRowCount
+	})
+	return fmt.Sprintf("L0SegCount=%d, DeltaRowCount=%d, label=<%s>, posT=<%v>, L0 segments=%v",
+		len(v.segments),
+		count,
 		v.label.String(),
 		v.earliestGrowingSegmentPos.GetTimestamp(),
 		l0strings)
@@ -116,19 +122,20 @@ func (v *LevelZeroSegmentsView) minCountSizeTrigger(segments []*SegmentView) (pi
 		maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
 	)
 
-	curSize := float64(0)
+	pickedSize := float64(0)
+	pickedCount := 0
 
 	// count >= minDeltaCount
 	if lo.SumBy(segments, func(view *SegmentView) int { return view.DeltalogCount }) >= minDeltaCount {
-		picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
-		reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaCount, curSize, len(segments))
+		picked, pickedSize, pickedCount = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
+		reason = fmt.Sprintf("level zero segments count reaches minForceTriggerCountLimit=%d, pickedSize=%.2fB, pickedCount=%d", minDeltaCount, pickedSize, pickedCount)
 		return
 	}
 
 	// size >= minDeltaSize
 	if lo.SumBy(segments, func(view *SegmentView) float64 { return view.DeltaSize }) >= minDeltaSize {
-		picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
-		reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2f, curDeltaSize=%.2f, curDeltaCount=%d", minDeltaSize, curSize, len(segments))
+		picked, pickedSize, pickedCount = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
+		reason = fmt.Sprintf("level zero segments size reaches minForceTriggerSizeLimit=%.2fB, pickedSize=%.2fB, pickedCount=%d", minDeltaSize, pickedSize, pickedCount)
 		return
 	}
 
@@ -143,30 +150,25 @@ func (v *LevelZeroSegmentsView) forceTrigger(segments []*SegmentView) (picked []
 		maxDeltaCount = paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMaxNum.GetAsInt()
 	)
 
-	curSize := float64(0)
-	picked, curSize = pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
-	reason = fmt.Sprintf("level zero views force to trigger, curDeltaSize=%.2f, curDeltaCount=%d", curSize, len(segments))
-	return
+	picked, pickedSize, pickedCount := pickByMaxCountSize(segments, maxDeltaSize, maxDeltaCount)
+	reason = fmt.Sprintf("level zero views force to trigger, pickedSize=%.2fB, pickedCount=%d", pickedSize, pickedCount)
+	return picked, reason
 }
 
 // pickByMaxCountSize picks segments that count <= maxCount or size <= maxSize
-func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) ([]*SegmentView, float64) {
-	var (
-		curDeltaCount = 0
-		curDeltaSize  = float64(0)
-	)
+func pickByMaxCountSize(segments []*SegmentView, maxSize float64, maxCount int) (picked []*SegmentView, pickedSize float64, pickedCount int) {
 	idx := 0
 	for _, view := range segments {
-		targetCount := view.DeltalogCount + curDeltaCount
-		targetSize := view.DeltaSize + curDeltaSize
+		targetCount := view.DeltalogCount + pickedCount
+		targetSize := view.DeltaSize + pickedSize
 
-		if (curDeltaCount != 0 && curDeltaSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
+		if (pickedCount != 0 && pickedSize != float64(0)) && (targetSize > maxSize || targetCount > maxCount) {
 			break
 		}
 
-		curDeltaCount = targetCount
-		curDeltaSize = targetSize
+		pickedCount = targetCount
+		pickedSize = targetSize
 		idx += 1
 	}
 
-	return segments[:idx], curDeltaSize
+	return segments[:idx], pickedSize, pickedCount
 }
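[Reviewer aside, illustration only, not part of the patch] The pick loop above
admits segments in order and stops before the running totals would exceed either
cap, but never rejects the first segment; the call sites now log the picked size
and count rather than len(segments). A minimal standalone sketch of that rule,
with a simplified stand-in type instead of the real *SegmentView:

package main

import "fmt"

// view is a stand-in for *SegmentView; only the two budgeted fields matter here.
type view struct {
	deltalogCount int
	deltaSize     float64
}

// pick mirrors pickByMaxCountSize: take segments greedily until adding the next
// one would push the count past maxCount or the size past maxSize.
func pick(views []view, maxSize float64, maxCount int) (picked []view, pickedSize float64, pickedCount int) {
	for _, v := range views {
		targetCount := pickedCount + v.deltalogCount
		targetSize := pickedSize + v.deltaSize
		// The guard only fires once something is picked, so a single
		// oversized segment is still admitted on its own.
		if (pickedCount != 0 && pickedSize != 0) && (targetSize > maxSize || targetCount > maxCount) {
			break
		}
		pickedCount, pickedSize = targetCount, targetSize
		picked = append(picked, v)
	}
	return picked, pickedSize, pickedCount
}

func main() {
	views := []view{{5, 40}, {10, 60}, {20, 100}}
	// The third view would raise the size to 200 > 150, so two are picked.
	picked, size, count := pick(views, 150, 20)
	fmt.Printf("picked=%d, pickedSize=%.2fB, pickedCount=%d\n", len(picked), size, count)
}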
diff --git a/internal/datacoord/compaction_l0_view_test.go b/internal/datacoord/compaction_l0_view_test.go
index 863dbbe5678be..5fa941397b483 100644
--- a/internal/datacoord/compaction_l0_view_test.go
+++ b/internal/datacoord/compaction_l0_view_test.go
@@ -150,6 +150,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
 			if view.dmlPos.Timestamp < test.prepEarliestT {
 				view.DeltalogCount = test.prepCountEach
 				view.DeltaSize = test.prepSizeEach
+				view.DeltaRowCount = 1
 			}
 		}
 		log.Info("LevelZeroSegmentsView", zap.String("view", s.v.String()))
diff --git a/internal/datacoord/compaction_view.go b/internal/datacoord/compaction_view.go
index 2e2e7905ab8fa..e05eef1f09626 100644
--- a/internal/datacoord/compaction_view.go
+++ b/internal/datacoord/compaction_view.go
@@ -107,6 +107,9 @@ type SegmentView struct {
 	BinlogCount   int
 	StatslogCount int
 	DeltalogCount int
+
+	// row count
+	DeltaRowCount int
 }
 
 func (s *SegmentView) Clone() *SegmentView {
@@ -123,6 +126,7 @@ func (s *SegmentView) Clone() *SegmentView {
 		BinlogCount:   s.BinlogCount,
 		StatslogCount: s.StatslogCount,
 		DeltalogCount: s.DeltalogCount,
+		DeltaRowCount: s.DeltaRowCount,
 		NumOfRows:     s.NumOfRows,
 		MaxRowNum:     s.MaxRowNum,
 	}
@@ -147,6 +151,7 @@ func GetViewsByInfo(segments ...*SegmentInfo) []*SegmentView {
 
 			DeltaSize:     GetBinlogSizeAsBytes(segment.GetDeltalogs()),
 			DeltalogCount: GetBinlogCount(segment.GetDeltalogs()),
+			DeltaRowCount: GetBinlogEntriesNum(segment.GetDeltalogs()),
 
 			Size:        GetBinlogSizeAsBytes(segment.GetBinlogs()),
 			BinlogCount: GetBinlogCount(segment.GetBinlogs()),
@@ -166,17 +171,19 @@ func (v *SegmentView) Equal(other *SegmentView) bool {
 		v.DeltaSize == other.DeltaSize &&
 		v.BinlogCount == other.BinlogCount &&
 		v.StatslogCount == other.StatslogCount &&
-		v.DeltalogCount == other.DeltalogCount
+		v.DeltalogCount == other.DeltalogCount &&
+		v.NumOfRows == other.NumOfRows &&
+		v.DeltaRowCount == other.DeltaRowCount
 }
 
 func (v *SegmentView) String() string {
-	return fmt.Sprintf("ID=%d, label=<%s>, state=%s, level=%s, binlogSize=%.2f, binlogCount=%d, deltaSize=%.2f, deltaCount=%d, expireSize=%.2f",
-		v.ID, v.label, v.State.String(), v.Level.String(), v.Size, v.BinlogCount, v.DeltaSize, v.DeltalogCount, v.ExpireSize)
+	return fmt.Sprintf("ID=%d, label=<%s>, state=%s, level=%s, binlogSize=%.2f, binlogCount=%d, deltaSize=%.2f, deltalogCount=%d, deltaRowCount=%d, expireSize=%.2f",
+		v.ID, v.label, v.State.String(), v.Level.String(), v.Size, v.BinlogCount, v.DeltaSize, v.DeltalogCount, v.DeltaRowCount, v.ExpireSize)
 }
 
 func (v *SegmentView) LevelZeroString() string {
-	return fmt.Sprintf("<ID=%d, level=%s, deltaSize=%.2f, deltaLogCount=%d>",
-		v.ID, v.Level.String(), v.DeltaSize, v.DeltalogCount)
+	return fmt.Sprintf("<ID=%d, level=%s, deltaSize=%.2f, deltaLogCount=%d, deltaRowCount=%d>",
+		v.ID, v.Level.String(), v.DeltaSize, v.DeltalogCount, v.DeltaRowCount)
 }
 
 func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int {
@@ -187,9 +194,19 @@ func GetBinlogCount(fieldBinlogs []*datapb.FieldBinlog) int {
 	return num
 }
 
-func GetBinlogSizeAsBytes(deltaBinlogs []*datapb.FieldBinlog) float64 {
+func GetBinlogEntriesNum(fieldBinlogs []*datapb.FieldBinlog) int {
+	var num int
+	for _, fbinlog := range fieldBinlogs {
+		for _, binlog := range fbinlog.GetBinlogs() {
+			num += int(binlog.GetEntriesNum())
+		}
+	}
+	return num
+}
+
+func GetBinlogSizeAsBytes(fieldBinlogs []*datapb.FieldBinlog) float64 {
 	var deltaSize float64
-	for _, deltaLogs := range deltaBinlogs {
+	for _, deltaLogs := range fieldBinlogs {
 		for _, l := range deltaLogs.GetBinlogs() {
 			deltaSize += float64(l.GetMemorySize())
 		}
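[Reviewer aside, illustration only, not part of the patch] DeltaRowCount is just
the nested sum of EntriesNum over every deltalog of every field binlog, which is
all GetBinlogEntriesNum does. A self-contained sketch with local stand-ins for
datapb.FieldBinlog and datapb.Binlog:

package main

import "fmt"

// binlog and fieldBinlog stand in for datapb.Binlog and datapb.FieldBinlog.
type binlog struct{ entriesNum int64 }

type fieldBinlog struct{ binlogs []binlog }

// entriesNum mirrors GetBinlogEntriesNum: sum EntriesNum across every binlog
// of every field.
func entriesNum(fieldBinlogs []fieldBinlog) int {
	var num int
	for _, fb := range fieldBinlogs {
		for _, b := range fb.binlogs {
			num += int(b.entriesNum)
		}
	}
	return num
}

func main() {
	deltalogs := []fieldBinlog{
		{binlogs: []binlog{{entriesNum: 100}, {entriesNum: 250}}},
		{binlogs: []binlog{{entriesNum: 50}}},
	}
	fmt.Println(entriesNum(deltalogs)) // 400
}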
diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go
index 984d7b20e498a..7cbd3487c8272 100644
--- a/internal/datanode/compaction/l0_compactor.go
+++ b/internal/datanode/compaction/l0_compactor.go
@@ -233,7 +233,7 @@ func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWr
 
 func (t *LevelZeroCompactionTask) splitDelta(
 	ctx context.Context,
-	allDelta []*storage.DeleteData,
+	allDelta *storage.DeleteData,
 	segmentBfs map[int64]*metacache.BloomFilterSet,
 ) map[int64]*SegmentDeltaWriter {
 	traceCtx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
@@ -252,9 +252,6 @@ func (t *LevelZeroCompactionTask) splitDelta(
 		startIdx := value.StartIdx
 		pk2SegmentIDs := value.Segment2Hits
 
-		pks := allDelta[value.DeleteDataIdx].Pks
-		tss := allDelta[value.DeleteDataIdx].Tss
-
 		for segmentID, hits := range pk2SegmentIDs {
 			for i, hit := range hits {
 				if hit {
@@ -264,23 +261,21 @@ func (t *LevelZeroCompactionTask) splitDelta(
 						writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
 						targetSegBuffer[segmentID] = writer
 					}
-					writer.Write(pks[startIdx+i], tss[startIdx+i])
+					writer.Write(allDelta.Pks[startIdx+i], allDelta.Tss[startIdx+i])
 				}
 			}
 		}
 		return true
 	})
-
 	return targetSegBuffer
 }
 
 type BatchApplyRet = struct {
-	DeleteDataIdx int
-	StartIdx      int
-	Segment2Hits  map[int64][]bool
+	StartIdx     int
+	Segment2Hits map[int64][]bool
 }
 
-func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
+func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deltaData *storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact applyBFInParallel")
 	defer span.End()
 	batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
@@ -298,32 +293,27 @@ func (t *LevelZeroCompactionTask) applyBFInParallel(ctx context.Context, deleteD
 	retIdx := 0
 	retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
 	var futures []*conc.Future[any]
-	for didx, data := range deleteDatas {
-		pks := data.Pks
-		for idx := 0; idx < len(pks); idx += batchSize {
-			startIdx := idx
-			endIdx := startIdx + batchSize
-			if endIdx > len(pks) {
-				endIdx = len(pks)
-			}
+	pks := deltaData.Pks
+	for idx := 0; idx < len(pks); idx += batchSize {
+		startIdx := idx
+		endIdx := startIdx + batchSize
+		if endIdx > len(pks) {
+			endIdx = len(pks)
+		}
 
-			retIdx += 1
-			tmpRetIndex := retIdx
-			deleteDataId := didx
-			future := pool.Submit(func() (any, error) {
-				ret := batchPredict(pks[startIdx:endIdx])
-				retMap.Insert(tmpRetIndex, &BatchApplyRet{
-					DeleteDataIdx: deleteDataId,
-					StartIdx:      startIdx,
-					Segment2Hits:  ret,
-				})
-				return nil, nil
+		retIdx += 1
+		tmpRetIndex := retIdx
+		future := pool.Submit(func() (any, error) {
+			ret := batchPredict(pks[startIdx:endIdx])
+			retMap.Insert(tmpRetIndex, &BatchApplyRet{
+				StartIdx:     startIdx,
+				Segment2Hits: ret,
 			})
-			futures = append(futures, future)
-		}
+			return nil, nil
+		})
+		futures = append(futures, future)
 	}
 	conc.AwaitAll(futures...)
-
 	return retMap
 }
 
@@ -333,7 +323,7 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 	results := make([]*datapb.CompactionSegment, 0)
 	batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
 
-	log := log.Ctx(t.ctx).With(
+	log := log.Ctx(ctx).With(
 		zap.Int64("planID", t.plan.GetPlanID()),
 		zap.Int("max conc segment counts", batchSize),
 		zap.Int("total segment counts", len(targetSegments)),
@@ -366,7 +356,10 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 			return nil, err
 		}
 
-		log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
+		log.Info("L0 compaction finished one batch",
+			zap.Int("batch no.", i),
+			zap.Int("total deltaRowCount", int(allDelta.RowCount)),
+			zap.Int("batch segment count", len(batchResults)))
 		results = append(results, batchResults...)
 	}
 
@@ -374,27 +367,24 @@ func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, ta
 	return results, nil
 }
 
-func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
+func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs []string) (*storage.DeleteData, error) {
 	_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
 	defer span.End()
 
-	allData := make([]*storage.DeleteData, 0, len(deltaLogs))
-	for _, paths := range deltaLogs {
-		blobBytes, err := t.Download(ctx, paths)
-		if err != nil {
-			return nil, err
-		}
-		blobs := make([]*storage.Blob, 0, len(blobBytes))
-		for _, blob := range blobBytes {
-			blobs = append(blobs, &storage.Blob{Value: blob})
-		}
-		_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
-		if err != nil {
-			return nil, err
-		}
-		allData = append(allData, dData)
+	blobBytes, err := t.Download(ctx, deltaLogs)
+	if err != nil {
+		return nil, err
+	}
+	blobs := make([]*storage.Blob, 0, len(blobBytes))
+	for _, blob := range blobBytes {
+		blobs = append(blobs, &storage.Blob{Value: blob})
+	}
+	_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
+	if err != nil {
+		return nil, err
 	}
-	return allData, nil
+
+	return dData, nil
 }
 
 func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
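[Reviewer aside, illustration only, not part of the patch] With a single
*storage.DeleteData, each bloom-filter batch is fully identified by StartIdx
into the flat Pks/Tss slices, which is why DeleteDataIdx could be dropped from
BatchApplyRet. A standalone sketch of the window arithmetic (batchSize here
stands in for CommonCfg.BloomFilterApplyBatchSize):

package main

import "fmt"

// windows cuts n primary keys into [start, end) ranges of at most batchSize,
// the same loop applyBFInParallel runs before submitting each batch to the pool.
func windows(n, batchSize int) [][2]int {
	var out [][2]int
	for idx := 0; idx < n; idx += batchSize {
		start, end := idx, idx+batchSize
		if end > n {
			end = n
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	// 10 deleted PKs with a batch size of 4 -> [0,4) [4,8) [8,10)
	for _, w := range windows(10, 4) {
		fmt.Printf("[%d,%d) ", w[0], w[1])
	}
	fmt.Println()
}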
diff --git a/internal/datanode/compaction/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go
index 8f0a7b2f6aefc..040b4a7eb3ba2 100644
--- a/internal/datanode/compaction/l0_compactor_test.go
+++ b/internal/datanode/compaction/l0_compactor_test.go
@@ -480,7 +480,7 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
 		101: bfs2,
 		102: bfs3,
 	}
-	deltaWriters := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, segmentBFs)
+	deltaWriters := s.task.splitDelta(context.TODO(), s.dData, segmentBFs)
 
 	s.NotEmpty(deltaWriters)
 	s.ElementsMatch(predicted, lo.Keys(deltaWriters))
@@ -523,16 +523,16 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
 	}
 
 	for _, test := range tests {
-		dDatas, err := s.task.loadDelta(ctx, test.paths)
+		dData, err := s.task.loadDelta(ctx, test.paths)
 		if test.expectError {
 			s.Error(err)
 		} else {
 			s.NoError(err)
-			s.NotEmpty(dDatas)
-			s.EqualValues(1, len(dDatas))
-			s.ElementsMatch(s.dData.Pks, dDatas[0].Pks)
-			s.Equal(s.dData.RowCount, dDatas[0].RowCount)
+			s.NotEmpty(dData)
+			s.NotNil(dData)
+			s.ElementsMatch(s.dData.Pks, dData.Pks)
+			s.Equal(s.dData.RowCount, dData.RowCount)
 		}
 	}
 }
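[Reviewer aside, illustration only, not part of the patch] loadDelta narrows
from variadic groups of deltalog paths to one flat []string, matching the single
*storage.DeleteData it now returns. A hypothetical caller-side helper (flatten
and the sample paths are illustrative, not from this PR) showing how per-segment
groups would be merged before the call:

package main

import "fmt"

// flatten concatenates per-segment deltalog path groups into the single flat
// []string that the new loadDelta signature accepts.
func flatten(groups ...[]string) []string {
	var paths []string
	for _, g := range groups {
		paths = append(paths, g...)
	}
	return paths
}

func main() {
	// Sample keys only; real deltalog paths come from the compaction plan.
	segA := []string{"deltalog/100/1", "deltalog/100/2"}
	segB := []string{"deltalog/101/1"}
	fmt.Println(flatten(segA, segB))
}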