From 4dd0c54ca0f5ac9af76a9853107d00a5940914c6 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 6 Jun 2024 14:33:52 +0800 Subject: [PATCH] fix: Fix l0 compactor may cause DN from OOM (#33554) See also: #33547 --------- Signed-off-by: yangxuan --- internal/datanode/compaction/l0_compactor.go | 385 +++++++++++++++ .../{ => compaction}/l0_compactor_test.go | 243 +++------ .../datanode/compaction/segment_writer.go | 72 +++ internal/datanode/data_node.go | 2 - internal/datanode/data_sync_service.go | 148 +----- internal/datanode/io/binlog_io.go | 11 +- internal/datanode/io/binlog_io_test.go | 26 +- internal/datanode/{ => io}/io_pool.go | 13 +- internal/datanode/{ => io}/io_pool_test.go | 13 +- internal/datanode/io/mock_binlogio.go | 55 --- internal/datanode/l0_compactor.go | 460 ------------------ internal/datanode/services.go | 9 +- internal/datanode/util/load_stats.go | 166 +++++++ 13 files changed, 710 insertions(+), 893 deletions(-) create mode 100644 internal/datanode/compaction/l0_compactor.go rename internal/datanode/{ => compaction}/l0_compactor_test.go (70%) rename internal/datanode/{ => io}/io_pool.go (74%) rename internal/datanode/{ => io}/io_pool_test.go (61%) delete mode 100644 internal/datanode/l0_compactor.go create mode 100644 internal/datanode/util/load_stats.go diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go new file mode 100644 index 0000000000000..2024308d877ef --- /dev/null +++ b/internal/datanode/compaction/l0_compactor.go @@ -0,0 +1,385 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
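+
+// L0 compaction redistributes the delete records of L0 segments into the
+// matching sealed (target) segments. Target segments are handled in batches
+// of at most getMaxBatchSize segments, computed from the total L0 delta size
+// and DataNodeCfg.L0BatchMemoryRatio of the node's free memory, so the
+// worst-case in-memory duplication of delete data is bounded per batch
+// instead of growing with the number of target segments.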
+ +package compaction + +import ( + "context" + "fmt" + "math" + "sync" + + "github.com/cockroachdb/errors" + "github.com/samber/lo" + "go.opentelemetry.io/otel" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datanode/allocator" + "github.com/milvus-io/milvus/internal/datanode/io" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/util" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type LevelZeroCompactionTask struct { + io.BinlogIO + allocator allocator.Allocator + cm storage.ChunkManager + + plan *datapb.CompactionPlan + + ctx context.Context + cancel context.CancelFunc + + done chan struct{} + tr *timerecord.TimeRecorder +} + +// make sure compactionTask implements compactor interface +var _ Compactor = (*LevelZeroCompactionTask)(nil) + +func NewLevelZeroCompactionTask( + ctx context.Context, + binlogIO io.BinlogIO, + alloc allocator.Allocator, + cm storage.ChunkManager, + plan *datapb.CompactionPlan, +) *LevelZeroCompactionTask { + ctx, cancel := context.WithCancel(ctx) + return &LevelZeroCompactionTask{ + ctx: ctx, + cancel: cancel, + + BinlogIO: binlogIO, + allocator: alloc, + cm: cm, + plan: plan, + tr: timerecord.NewTimeRecorder("levelzero compaction"), + done: make(chan struct{}, 1), + } +} + +func (t *LevelZeroCompactionTask) Complete() { + t.done <- struct{}{} +} + +func (t *LevelZeroCompactionTask) Stop() { + t.cancel() + <-t.done +} + +func (t *LevelZeroCompactionTask) GetPlanID() typeutil.UniqueID { + return t.plan.GetPlanID() +} + +func (t *LevelZeroCompactionTask) GetChannelName() string { + return t.plan.GetChannel() +} + +func (t *LevelZeroCompactionTask) GetCollection() int64 { + // The length of SegmentBinlogs is checked before task enqueueing. 
+ return t.plan.GetSegmentBinlogs()[0].GetCollectionID() +} + +func (t *LevelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { + ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact") + defer span.End() + log := log.Ctx(t.ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String())) + log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) + + if !funcutil.CheckCtxValid(ctx) { + log.Warn("compact wrong, task context done or timeout") + return nil, ctx.Err() + } + + l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { + return s.Level == datapb.SegmentLevel_L0 + }) + + targetSegments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { + return s.Level != datapb.SegmentLevel_L0 + }) + if len(targetSegments) == 0 { + log.Warn("compact wrong, not target sealed segments") + return nil, errors.New("illegal compaction plan with empty target segments") + } + err := binlog.DecompressCompactionBinlogs(l0Segments) + if err != nil { + log.Warn("DecompressCompactionBinlogs failed", zap.Error(err)) + return nil, err + } + + var ( + totalSize int64 + totalDeltalogs = make(map[int64][]string) + ) + for _, s := range l0Segments { + paths := []string{} + for _, d := range s.GetDeltalogs() { + for _, l := range d.GetBinlogs() { + paths = append(paths, l.GetLogPath()) + totalSize += l.GetMemorySize() + } + } + if len(paths) > 0 { + totalDeltalogs[s.GetSegmentID()] = paths + } + } + + batchSize := getMaxBatchSize(totalSize) + resultSegments, err := t.process(ctx, batchSize, targetSegments, lo.Values(totalDeltalogs)...) + if err != nil { + return nil, err + } + + result := &datapb.CompactionPlanResult{ + PlanID: t.plan.GetPlanID(), + State: commonpb.CompactionState_Completed, + Segments: resultSegments, + Channel: t.plan.GetChannel(), + Type: t.plan.GetType(), + } + + metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). 
+ Observe(float64(t.tr.ElapseSpan().Milliseconds())) + log.Info("L0 compaction finished", zap.Duration("elapse", t.tr.ElapseSpan())) + + return result, nil +} + +// batch size means segment count +func getMaxBatchSize(totalSize int64) int { + max := 1 + memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() + if memLimit > float64(totalSize) { + max = int(memLimit / float64(totalSize)) + } + + return max +} + +func (t *LevelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) { + allBlobs := make(map[string][]byte) + results := make([]*datapb.CompactionSegment, 0) + for segID, writer := range segmentWriters { + blob, tr, err := writer.Finish() + if err != nil { + log.Warn("L0 compaction serializeUpload serialize failed", zap.Error(err)) + return nil, err + } + + logID, err := t.allocator.AllocOne() + if err != nil { + log.Warn("L0 compaction serializeUpload alloc failed", zap.Error(err)) + return nil, err + } + + blobKey, _ := binlog.BuildLogPath(storage.DeleteBinlog, writer.collectionID, writer.partitionID, writer.segmentID, -1, logID) + + allBlobs[blobKey] = blob.GetValue() + deltalog := &datapb.Binlog{ + EntriesNum: writer.GetRowNum(), + LogSize: int64(len(blob.GetValue())), + MemorySize: blob.GetMemorySize(), + LogPath: blobKey, + LogID: logID, + TimestampFrom: tr.GetMinTimestamp(), + TimestampTo: tr.GetMaxTimestamp(), + } + + results = append(results, &datapb.CompactionSegment{ + SegmentID: segID, + Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{deltalog}}}, + Channel: t.plan.GetChannel(), + }) + } + + if len(allBlobs) == 0 { + return nil, nil + } + + if err := t.Upload(ctx, allBlobs); err != nil { + log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err)) + return nil, err + } + + return results, nil +} + +func (t *LevelZeroCompactionTask) splitDelta( + ctx context.Context, + allDelta []*storage.DeleteData, + segmentBfs map[int64]*metacache.BloomFilterSet, +) map[int64]*SegmentDeltaWriter { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") + defer span.End() + + split := func(pk storage.PrimaryKey) []int64 { + lc := storage.NewLocationsCache(pk) + predicts := make([]int64, 0, len(segmentBfs)) + for segmentID, bf := range segmentBfs { + if bf.PkExists(lc) { + predicts = append(predicts, segmentID) + } + } + return predicts + } + + allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) { + return segment.GetSegmentID(), segment + }) + + // spilt all delete data to segments + targetSegBuffer := make(map[int64]*SegmentDeltaWriter) + for _, delta := range allDelta { + for i, pk := range delta.Pks { + predicted := split(pk) + + for _, gotSeg := range predicted { + writer, ok := targetSegBuffer[gotSeg] + if !ok { + segment := allSeg[gotSeg] + writer = NewSegmentDeltaWriter(gotSeg, segment.GetPartitionID(), segment.GetCollectionID()) + targetSegBuffer[gotSeg] = writer + } + writer.Write(pk, delta.Tss[i]) + } + } + } + + return targetSegBuffer +} + +func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process") + defer span.End() + + results := make([]*datapb.CompactionSegment, 0) + batch := 
int(math.Ceil(float64(len(targetSegments)) / float64(batchSize))) + log := log.Ctx(t.ctx).With( + zap.Int64("planID", t.plan.GetPlanID()), + zap.Int("max conc segment counts", batchSize), + zap.Int("total segment counts", len(targetSegments)), + zap.Int("total batch", batch), + ) + + log.Info("L0 compaction process start") + allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs)) + if err != nil { + log.Warn("L0 compaction loadDelta fail", zap.Error(err)) + return nil, err + } + + for i := 0; i < batch; i++ { + left, right := i*batchSize, (i+1)*batchSize + if right >= len(targetSegments) { + right = len(targetSegments) + } + batchSegments := targetSegments[left:right] + segmentBFs, err := t.loadBF(ctx, batchSegments) + if err != nil { + log.Warn("L0 compaction loadBF fail", zap.Error(err)) + return nil, err + } + + batchSegWriter := t.splitDelta(ctx, allDelta, segmentBFs) + batchResults, err := t.serializeUpload(ctx, batchSegWriter) + if err != nil { + log.Warn("L0 compaction serialize upload fail", zap.Error(err)) + return nil, err + } + + log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults))) + results = append(results, batchResults...) + } + + log.Info("L0 compaction process done") + return results, nil +} + +func (t *LevelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta") + defer span.End() + allData := make([]*storage.DeleteData, 0, len(deltaLogs)) + for _, paths := range deltaLogs { + blobBytes, err := t.Download(ctx, paths) + if err != nil { + return nil, err + } + blobs := make([]*storage.Blob, 0, len(blobBytes)) + for _, blob := range blobBytes { + blobs = append(blobs, &storage.Blob{Value: blob}) + } + _, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs) + if err != nil { + return nil, err + } + + allData = append(allData, dData) + } + return allData, nil +} + +func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadBF") + defer span.End() + + var ( + futures = make([]*conc.Future[any], 0, len(targetSegments)) + pool = io.GetOrCreateStatsPool() + + mu = &sync.Mutex{} + bfs = make(map[int64]*metacache.BloomFilterSet) + ) + + for _, segment := range targetSegments { + segment := segment + innerCtx := ctx + future := pool.Submit(func() (any, error) { + _ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(), + segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) + pks, err := util.LoadStats(innerCtx, t.cm, + t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) + if err != nil { + log.Warn("failed to load segment stats log", + zap.Int64("planID", t.plan.GetPlanID()), + zap.String("type", t.plan.GetType().String()), + zap.Error(err)) + return err, err + } + bf := metacache.NewBloomFilterSet(pks...) + mu.Lock() + defer mu.Unlock() + bfs[segment.GetSegmentID()] = bf + return nil, nil + }) + futures = append(futures, future) + } + + err := conc.AwaitAll(futures...) 
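+	// Wait for all stats-loading futures; any failure is propagated to the
+	// caller together with the bloom filter sets that were already loaded.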
+ return bfs, err +} diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/compaction/l0_compactor_test.go similarity index 70% rename from internal/datanode/l0_compactor_test.go rename to internal/datanode/compaction/l0_compactor_test.go index 8c833df21f69c..8f0a7b2f6aefc 100644 --- a/internal/datanode/l0_compactor_test.go +++ b/internal/datanode/compaction/l0_compactor_test.go @@ -14,11 +14,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package datanode +package compaction import ( "context" - "path" "testing" "github.com/cockroachdb/errors" @@ -30,16 +29,15 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" - iter "github.com/milvus-io/milvus/internal/datanode/iterators" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func TestLevelZeroCompactionTaskSuite(t *testing.T) { @@ -51,17 +49,18 @@ type LevelZeroCompactionTaskSuite struct { mockBinlogIO *io.MockBinlogIO mockAlloc *allocator.MockAllocator - task *levelZeroCompactionTask + task *LevelZeroCompactionTask dData *storage.DeleteData dBlob []byte } func (s *LevelZeroCompactionTaskSuite) SetupTest() { + paramtable.Init() s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockBinlogIO = io.NewMockBinlogIO(s.T()) // plan of the task is unset - s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil, nil) + s.task = NewLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil, nil) pk2ts := map[int64]uint64{ 1: 20000, @@ -69,7 +68,7 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() { 3: 20002, } - s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []Timestamp{}) + s.dData = storage.NewDeleteData([]storage.PrimaryKey{}, []typeutil.Timestamp{}) for pk, ts := range pk2ts { s.dData.Append(storage.NewInt64PrimaryKey(pk), ts) } @@ -80,7 +79,7 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() { s.dBlob = blob.GetValue() } -func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() { +func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() { plan := &datapb.CompactionPlan{ PlanID: 19530, Type: datapb.CompactionType_Level0DeleteCompaction, @@ -110,23 +109,19 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() { s.task.plan = plan s.task.tr = timerecord.NewTimeRecorder("test") - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Twice() + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Once() targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L1 }) deltaLogs := map[int64][]string{100: {"a/b/c1"}} - segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs) - s.Error(err) - s.Empty(segments) - - segments, err = s.task.batchProcess(context.Background(), targetSegments, 
lo.Values(deltaLogs)...) + segments, err := s.task.process(context.Background(), 1, targetSegments, lo.Values(deltaLogs)...) s.Error(err) s.Empty(segments) } -func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() { +func (s *LevelZeroCompactionTaskSuite) TestProcessUploadByCheckFail() { plan := &datapb.CompactionPlan{ PlanID: 19530, Type: datapb.CompactionType_Level0DeleteCompaction, @@ -173,7 +168,7 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once() mockAlloc := allocator.NewMockAllocator(s.T()) mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err")) s.task.allocator = mockAlloc @@ -183,11 +178,7 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() { }) deltaLogs := map[int64][]string{100: {"a/b/c1"}} - segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs) - s.Error(err) - s.Empty(segments) - - segments, err = s.task.batchProcess(context.Background(), targetSegments, lo.Values(deltaLogs)...) + segments, err := s.task.process(context.Background(), 2, targetSegments, lo.Values(deltaLogs)...) s.Error(err) s.Empty(segments) } @@ -266,14 +257,9 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) - + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1) + s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Twice() s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). - RunAndReturn(func(paths ...string) string { - return path.Join(paths...) - }).Times(2) - s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() s.Require().Equal(plan.GetPlanID(), s.task.GetPlanID()) s.Require().Equal(plan.GetChannel(), s.task.GetChannelName()) @@ -286,7 +272,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { targetSegments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L1 }) - totalDeltalogs := make(map[UniqueID][]string) + totalDeltalogs := make(map[int64][]string) for _, s := range l0Segments { paths := []string{} @@ -299,7 +285,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { totalDeltalogs[s.GetSegmentID()] = paths } } - segments, err := s.task.linearProcess(context.Background(), targetSegments, totalDeltalogs) + segments, err := s.task.process(context.Background(), 1, targetSegments, lo.Values(totalDeltalogs)...) 
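+	// A batch size of 1 puts each of the two target segments in its own
+	// batch, so serializeUpload runs (and Upload is expected) twice.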
s.NoError(err) s.NotEmpty(segments) s.Equal(2, len(segments)) @@ -380,13 +366,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once() - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). - RunAndReturn(func(paths ...string) string { - return path.Join(paths...) - }).Times(2) + s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once() s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once() l0Segments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { @@ -396,7 +377,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { targetSegments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L1 }) - totalDeltalogs := make(map[UniqueID][]string) + totalDeltalogs := make(map[int64][]string) for _, s := range l0Segments { paths := []string{} @@ -409,7 +390,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { totalDeltalogs[s.GetSegmentID()] = paths } } - segments, err := s.task.batchProcess(context.TODO(), targetSegments, lo.Values(totalDeltalogs)...) + segments, err := s.task.process(context.TODO(), 2, targetSegments, lo.Values(totalDeltalogs)...) s.NoError(err) s.NotEmpty(segments) s.Equal(2, len(segments)) @@ -424,9 +405,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { log.Info("test segment results", zap.Any("result", segments)) } -func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { +func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() { ctx := context.Background() - plan := &datapb.CompactionPlan{ SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { @@ -435,136 +415,57 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { }, } - s.Run("uploadByCheck directly composeDeltalog failed", func() { + s.Run("serializeUpload allocator Alloc failed", func() { s.SetupTest() s.task.plan = plan mockAlloc := allocator.NewMockAllocator(s.T()) mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err")) s.task.allocator = mockAlloc - segments := map[int64]*storage.DeleteData{100: s.dData} - results := make(map[int64]*datapb.CompactionSegment) - err := s.task.uploadByCheck(ctx, false, segments, results) + + writer := NewSegmentDeltaWriter(100, 10, 1) + writer.WriteBatch(s.dData.Pks, s.dData.Tss) + writers := map[int64]*SegmentDeltaWriter{100: writer} + + result, err := s.task.serializeUpload(ctx, writers) s.Error(err) - s.Equal(0, len(results)) + s.Equal(0, len(result)) }) - s.Run("uploadByCheck directly Upload failed", func() { + s.Run("serializeUpload Upload failed", func() { s.SetupTest() s.task.plan = plan s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed")) - + writer := NewSegmentDeltaWriter(100, 10, 1) + writer.WriteBatch(s.dData.Pks, s.dData.Tss) + writers := map[int64]*SegmentDeltaWriter{100: writer} s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) - blobKey := metautil.JoinIDPath(1, 10, 100, 19530) - blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) - segments := 
map[int64]*storage.DeleteData{100: s.dData} - results := make(map[int64]*datapb.CompactionSegment) - err := s.task.uploadByCheck(ctx, false, segments, results) + results, err := s.task.serializeUpload(ctx, writers) s.Error(err) s.Equal(0, len(results)) }) - s.Run("upload directly", func() { + s.Run("upload success", func() { s.SetupTest() s.task.plan = plan s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) - blobKey := metautil.JoinIDPath(1, 10, 100, 19530) - blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) - segments := map[int64]*storage.DeleteData{100: s.dData} - results := make(map[int64]*datapb.CompactionSegment) - err := s.task.uploadByCheck(ctx, false, segments, results) - s.NoError(err) - s.Equal(1, len(results)) - - seg1, ok := results[100] - s.True(ok) - s.EqualValues(100, seg1.GetSegmentID()) - s.Equal(1, len(seg1.GetDeltalogs())) - s.Equal(1, len(seg1.GetDeltalogs()[0].GetBinlogs())) - }) + writer := NewSegmentDeltaWriter(100, 10, 1) + writer.WriteBatch(s.dData.Pks, s.dData.Tss) + writers := map[int64]*SegmentDeltaWriter{100: writer} - s.Run("check without upload", func() { - s.SetupTest() - segments := map[int64]*storage.DeleteData{100: s.dData} - results := make(map[int64]*datapb.CompactionSegment) - s.Require().Empty(results) - - err := s.task.uploadByCheck(ctx, true, segments, results) + results, err := s.task.serializeUpload(ctx, writers) s.NoError(err) - s.Empty(results) - }) - - s.Run("check with upload", func() { - s.task.plan = plan - blobKey := metautil.JoinIDPath(1, 10, 100, 19530) - blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) - - s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) - - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) - - segments := map[int64]*storage.DeleteData{100: s.dData} - results := map[int64]*datapb.CompactionSegment{ - 100: {SegmentID: 100, Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{LogID: 1}}}}}, - } - s.Require().Equal(1, len(results)) - - paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key, "1") - defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key) - err := s.task.uploadByCheck(ctx, true, segments, results) - s.NoError(err) - s.NotEmpty(results) s.Equal(1, len(results)) - seg1, ok := results[100] - s.True(ok) + seg1 := results[0] s.EqualValues(100, seg1.GetSegmentID()) s.Equal(1, len(seg1.GetDeltalogs())) - s.Equal(2, len(seg1.GetDeltalogs()[0].GetBinlogs())) + s.Equal(1, len(seg1.GetDeltalogs()[0].GetBinlogs())) }) } -func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() { - plan := &datapb.CompactionPlan{ - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: 100, - }, - { - SegmentID: 101, - }, - }, - } - s.task.plan = plan - - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) - - blobKey := metautil.JoinIDPath(1, 10, 100, 19530) - blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) - s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) - - kvs, binlog, err := s.task.composeDeltalog(100, s.dData) - s.NoError(err) - s.Equal(1, len(kvs)) - v, ok := kvs[blobPath] - s.True(ok) - s.NotNil(v) - s.Equal(blobPath, binlog.LogPath) - - kvs, _, err = s.task.composeDeltalog(101, s.dData) - s.NoError(err) - s.Equal(1, 
len(kvs)) - v, ok = kvs[blobPath] - s.True(ok) - s.NotNil(v) - s.Equal(blobPath, binlog.LogPath) -} - func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() { bfs1 := metacache.NewBloomFilterSetWithBatchSize(100) bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 3}}) @@ -574,27 +475,22 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() { bfs3.UpdatePKRange(&storage.Int64FieldData{Data: []int64{3}}) predicted := []int64{100, 101, 102} - - diter := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil) - s.Require().NotNil(diter) - - targetSegBuffer := make(map[int64]*storage.DeleteData) segmentBFs := map[int64]*metacache.BloomFilterSet{ 100: bfs1, 101: bfs2, 102: bfs3, } - s.task.splitDelta(context.TODO(), []*iter.DeltalogIterator{diter}, targetSegBuffer, segmentBFs) + deltaWriters := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, segmentBFs) - s.NotEmpty(targetSegBuffer) - s.ElementsMatch(predicted, lo.Keys(targetSegBuffer)) - s.EqualValues(2, targetSegBuffer[100].RowCount) - s.EqualValues(1, targetSegBuffer[101].RowCount) - s.EqualValues(1, targetSegBuffer[102].RowCount) + s.NotEmpty(deltaWriters) + s.ElementsMatch(predicted, lo.Keys(deltaWriters)) + s.EqualValues(2, deltaWriters[100].GetRowNum()) + s.EqualValues(1, deltaWriters[101].GetRowNum()) + s.EqualValues(1, deltaWriters[102].GetRowNum()) - s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, targetSegBuffer[100].Pks) - s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[101].Pks[0]) - s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[102].Pks[0]) + s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, deltaWriters[100].deleteData.Pks) + s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[101].deleteData.Pks[0]) + s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[102].deleteData.Pks[0]) } func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() { @@ -619,47 +515,24 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() { description string paths []string - expectNilIter bool - expectError bool + expectError bool }{ - {"no error", []string{"correct"}, false, false}, - {"download error", []string{"error"}, true, true}, - {"new iter error", []string{"invalid-blobs"}, true, false}, + {"no error", []string{"correct"}, false}, + {"download error", []string{"error"}, true}, + {"deserialize error", []string{"invalid-blobs"}, true}, } for _, test := range tests { - iters, err := s.task.loadDelta(ctx, test.paths) - if test.expectNilIter { - if len(iters) > 0 { - for _, iter := range iters { - s.False(iter.HasNext()) - } - } else { - s.Nil(iters) - } - } else { - s.NotNil(iters) - s.Equal(1, len(iters)) - s.True(iters[0].HasNext()) - - iter := iters[0] - var pks []storage.PrimaryKey - var tss []storage.Timestamp - for iter.HasNext() { - labeled, err := iter.Next() - s.NoError(err) - pks = append(pks, labeled.GetPk()) - tss = append(tss, labeled.GetTimestamp()) - } - - s.ElementsMatch(pks, s.dData.Pks) - s.ElementsMatch(tss, s.dData.Tss) - } + dDatas, err := s.task.loadDelta(ctx, test.paths) if test.expectError { s.Error(err) } else { s.NoError(err) + s.NotEmpty(dDatas) + s.EqualValues(1, len(dDatas)) + s.ElementsMatch(s.dData.Pks, dDatas[0].Pks) + s.Equal(s.dData.RowCount, dDatas[0].RowCount) } } } diff --git a/internal/datanode/compaction/segment_writer.go b/internal/datanode/compaction/segment_writer.go index 3d458aad9974e..0628fc4d662e5 100644 --- a/internal/datanode/compaction/segment_writer.go +++ 
b/internal/datanode/compaction/segment_writer.go @@ -19,6 +19,78 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter { + return &SegmentDeltaWriter{ + deleteData: &storage.DeleteData{}, + segmentID: segmentID, + partitionID: partitionID, + collectionID: collectionID, + tsFrom: math.MaxUint64, + tsTo: 0, + } +} + +type SegmentDeltaWriter struct { + deleteData *storage.DeleteData + segmentID int64 + partitionID int64 + collectionID int64 + + tsFrom typeutil.Timestamp + tsTo typeutil.Timestamp +} + +func (w *SegmentDeltaWriter) GetCollectionID() int64 { + return w.collectionID +} + +func (w *SegmentDeltaWriter) GetPartitionID() int64 { + return w.partitionID +} + +func (w *SegmentDeltaWriter) GetSegmentID() int64 { + return w.segmentID +} + +func (w *SegmentDeltaWriter) GetRowNum() int64 { + return w.deleteData.RowCount +} + +func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange { + return writebuffer.NewTimeRange(w.tsFrom, w.tsTo) +} + +func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) { + if ts < w.tsFrom { + w.tsFrom = ts + } + if ts > w.tsTo { + w.tsTo = ts + } +} + +func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) { + w.deleteData.Append(pk, ts) + w.updateRange(ts) +} + +func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) { + w.deleteData.AppendBatch(pks, tss) + + for _, ts := range tss { + w.updateRange(ts) + } +} + +func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) { + blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData) + if err != nil { + return nil, nil, err + } + + return blob, w.GetTimeRange(), nil +} + type SegmentWriter struct { writer *storage.SerializeWriter[*storage.Value] closers []func() (*storage.Blob, error) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 135f501fe746c..922ba5cd70545 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -299,8 +299,6 @@ func (node *DataNode) Init() error { } else { node.eventManager = NewEventManager() } - node.pool = getOrCreateIOPool() - log.Info("init datanode done", zap.String("Address", node.address)) }) return initError diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 1a3ff514bb5c6..73dcd166e5433 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -19,18 +19,16 @@ package datanode import ( "context" "fmt" - "path" "sync" - "time" - "go.opentelemetry.io/otel" "go.uber.org/zap" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/broker" + "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/datanode/util" "github.com/milvus-io/milvus/internal/datanode/writebuffer" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -158,13 +156,13 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C ) segment := item - future := getOrCreateIOPool().Submit(func() (any, error) { + future := io.GetOrCreateStatsPool().Submit(func() (any, error) { var stats 
[]*storage.PkStatistics var err error if params.Params.CommonCfg.EnableStorageV2.GetAsBool() { - stats, err = loadStatsV2(storageV2Cache, segment, info.GetSchema()) + stats, err = util.LoadStatsV2(storageV2Cache, segment, info.GetSchema()) } else { - stats, err = loadStats(initCtx, chunkManager, info.GetSchema(), segment.GetID(), segment.GetStatslogs()) + stats, err = util.LoadStats(initCtx, chunkManager, info.GetSchema(), segment.GetID(), segment.GetStatslogs()) } if err != nil { return nil, err @@ -199,142 +197,6 @@ func initMetaCache(initCtx context.Context, storageV2Cache *metacache.StorageV2C return metacache, nil } -func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.SegmentInfo, schema *schemapb.CollectionSchema) ([]*storage.PkStatistics, error) { - space, err := storageCache.GetOrCreateSpace(segment.ID, syncmgr.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema())) - if err != nil { - return nil, err - } - - getResult := func(stats []*storage.PrimaryKeyStats) []*storage.PkStatistics { - result := make([]*storage.PkStatistics, 0, len(stats)) - for _, stat := range stats { - pkStat := &storage.PkStatistics{ - PkFilter: stat.BF, - MinPK: stat.MinPk, - MaxPK: stat.MaxPk, - } - result = append(result, pkStat) - } - return result - } - - blobs := space.StatisticsBlobs() - deserBlobs := make([]*Blob, 0) - for _, b := range blobs { - if b.Name == storage.CompoundStatsType.LogIdx() { - blobData := make([]byte, b.Size) - _, err = space.ReadBlob(b.Name, blobData) - if err != nil { - return nil, err - } - stats, err := storage.DeserializeStatsList(&Blob{Value: blobData}) - if err != nil { - return nil, err - } - return getResult(stats), nil - } - } - - for _, b := range blobs { - blobData := make([]byte, b.Size) - _, err = space.ReadBlob(b.Name, blobData) - if err != nil { - return nil, err - } - deserBlobs = append(deserBlobs, &Blob{Value: blobData}) - } - stats, err := storage.DeserializeStats(deserBlobs) - if err != nil { - return nil, err - } - return getResult(stats), nil -} - -func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) { - _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "loadStats") - defer span.End() - - startTs := time.Now() - log := log.Ctx(ctx).With(zap.Int64("segmentID", segmentID)) - log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs))) - - pkField, err := typeutil.GetPrimaryFieldSchema(schema) - if err != nil { - return nil, err - } - - // filter stats binlog files which is pk field stats log - bloomFilterFiles := []string{} - logType := storage.DefaultStatsType - - for _, binlog := range statsBinlogs { - if binlog.FieldID != pkField.GetFieldID() { - continue - } - Loop: - for _, log := range binlog.GetBinlogs() { - _, logidx := path.Split(log.GetLogPath()) - // if special status log exist - // only load one file - switch logidx { - case storage.CompoundStatsType.LogIdx(): - bloomFilterFiles = []string{log.GetLogPath()} - logType = storage.CompoundStatsType - break Loop - default: - bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath()) - } - } - } - - // no stats log to parse, initialize a new BF - if len(bloomFilterFiles) == 0 { - log.Warn("no stats files to load") - return nil, nil - } - - // read historical PK filter - values, err := chunkManager.MultiRead(ctx, bloomFilterFiles) - if err != nil { - log.Warn("failed to load bloom filter files", 
zap.Error(err)) - return nil, err - } - blobs := make([]*Blob, 0) - for i := 0; i < len(values); i++ { - blobs = append(blobs, &Blob{Value: values[i]}) - } - - var stats []*storage.PrimaryKeyStats - if logType == storage.CompoundStatsType { - stats, err = storage.DeserializeStatsList(blobs[0]) - if err != nil { - log.Warn("failed to deserialize stats list", zap.Error(err)) - return nil, err - } - } else { - stats, err = storage.DeserializeStats(blobs) - if err != nil { - log.Warn("failed to deserialize stats", zap.Error(err)) - return nil, err - } - } - - var size uint - result := make([]*storage.PkStatistics, 0, len(stats)) - for _, stat := range stats { - pkStat := &storage.PkStatistics{ - PkFilter: stat.BF, - MinPK: stat.MinPk, - MaxPK: stat.MaxPk, - } - size += stat.BF.Cap() - result = append(result, pkStat) - } - - log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size)) - return result, nil -} - func getServiceWithChannel(initCtx context.Context, node *DataNode, info *datapb.ChannelWatchInfo, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, unflushed, flushed []*datapb.SegmentInfo) (*dataSyncService, error) { var ( channelName = info.GetVchan().GetChannelName() diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go index 317f267978132..61a2ccf97615f 100644 --- a/internal/datanode/io/binlog_io.go +++ b/internal/datanode/io/binlog_io.go @@ -18,7 +18,6 @@ package io import ( "context" - "path" "github.com/samber/lo" "go.opentelemetry.io/otel" @@ -34,8 +33,6 @@ import ( type BinlogIO interface { Download(ctx context.Context, paths []string) ([][]byte, error) Upload(ctx context.Context, kvs map[string][]byte) error - // JoinFullPath returns the full path by join the paths with the chunkmanager's rootpath - JoinFullPath(paths ...string) string } type BinlogIoImpl struct { @@ -43,8 +40,8 @@ type BinlogIoImpl struct { pool *conc.Pool[any] } -func NewBinlogIO(cm storage.ChunkManager, ioPool *conc.Pool[any]) BinlogIO { - return &BinlogIoImpl{cm, ioPool} +func NewBinlogIO(cm storage.ChunkManager) BinlogIO { + return &BinlogIoImpl{cm, GetOrCreateIOPool()} } func (b *BinlogIoImpl) Download(ctx context.Context, paths []string) ([][]byte, error) { @@ -106,7 +103,3 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error return conc.AwaitAll(futures...) 
} - -func (b *BinlogIoImpl) JoinFullPath(paths ...string) string { - return path.Join(b.ChunkManager.RootPath(), path.Join(paths...)) -} diff --git a/internal/datanode/io/binlog_io_test.go b/internal/datanode/io/binlog_io_test.go index 70ad89b69b5fc..df5cc6fbe1605 100644 --- a/internal/datanode/io/binlog_io_test.go +++ b/internal/datanode/io/binlog_io_test.go @@ -9,7 +9,7 @@ import ( "golang.org/x/net/context" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) const binlogIOTestDir = "/tmp/milvus_test/binlog_io" @@ -26,11 +26,10 @@ type BinlogIOSuite struct { } func (s *BinlogIOSuite) SetupTest() { - pool := conc.NewDefaultPool[any]() - + paramtable.Init() s.cm = storage.NewLocalChunkManager(storage.RootPath(binlogIOTestDir)) - s.b = NewBinlogIO(s.cm, pool) + s.b = NewBinlogIO(s.cm) } func (s *BinlogIOSuite) TeardownTest() { @@ -52,22 +51,3 @@ func (s *BinlogIOSuite) TestUploadDownload() { s.NoError(err) s.ElementsMatch(lo.Values(kvs), vs) } - -func (s *BinlogIOSuite) TestJoinFullPath() { - tests := []struct { - description string - inPaths []string - outPath string - }{ - {"no input", nil, path.Join(binlogIOTestDir)}, - {"input one", []string{"a"}, path.Join(binlogIOTestDir, "a")}, - {"input two", []string{"a", "b"}, path.Join(binlogIOTestDir, "a/b")}, - } - - for _, test := range tests { - s.Run(test.description, func() { - out := s.b.JoinFullPath(test.inPaths...) - s.Equal(test.outPath, out) - }) - } -} diff --git a/internal/datanode/io_pool.go b/internal/datanode/io/io_pool.go similarity index 74% rename from internal/datanode/io_pool.go rename to internal/datanode/io/io_pool.go index 892012a0d975e..334fc623f5191 100644 --- a/internal/datanode/io_pool.go +++ b/internal/datanode/io/io_pool.go @@ -1,10 +1,11 @@ -package datanode +package io import ( "sync" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) var ( @@ -18,7 +19,7 @@ var ( ) func initIOPool() { - capacity := Params.DataNodeCfg.IOConcurrency.GetAsInt() + capacity := paramtable.Get().DataNodeCfg.IOConcurrency.GetAsInt() if capacity > 32 { capacity = 32 } @@ -26,26 +27,26 @@ func initIOPool() { ioPool = conc.NewPool[any](capacity) } -func getOrCreateIOPool() *conc.Pool[any] { +func GetOrCreateIOPool() *conc.Pool[any] { ioPoolInitOnce.Do(initIOPool) return ioPool } func initStatsPool() { - poolSize := Params.DataNodeCfg.ChannelWorkPoolSize.GetAsInt() + poolSize := paramtable.Get().DataNodeCfg.ChannelWorkPoolSize.GetAsInt() if poolSize <= 0 { poolSize = hardware.GetCPUNum() } statsPool = conc.NewPool[any](poolSize, conc.WithPreAlloc(false), conc.WithNonBlocking(false)) } -func getOrCreateStatsPool() *conc.Pool[any] { +func GetOrCreateStatsPool() *conc.Pool[any] { statsPoolInitOnce.Do(initStatsPool) return statsPool } func initMultiReadPool() { - capacity := Params.DataNodeCfg.FileReadConcurrency.GetAsInt() + capacity := paramtable.Get().DataNodeCfg.FileReadConcurrency.GetAsInt() if capacity > hardware.GetCPUNum() { capacity = hardware.GetCPUNum() } diff --git a/internal/datanode/io_pool_test.go b/internal/datanode/io/io_pool_test.go similarity index 61% rename from internal/datanode/io_pool_test.go rename to internal/datanode/io/io_pool_test.go index 20abbcbeca070..d6fb981000615 100644 --- a/internal/datanode/io_pool_test.go +++ b/internal/datanode/io/io_pool_test.go @@ -1,4 +1,4 @@ -package datanode +package io import ( "sync" @@ -10,10 
+10,11 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -func Test_getOrCreateIOPool(t *testing.T) { - ioConcurrency := Params.DataNodeCfg.IOConcurrency.GetValue() - paramtable.Get().Save(Params.DataNodeCfg.IOConcurrency.Key, "64") - defer func() { Params.Save(Params.DataNodeCfg.IOConcurrency.Key, ioConcurrency) }() +func TestGetOrCreateIOPool(t *testing.T) { + paramtable.Init() + ioConcurrency := paramtable.Get().DataNodeCfg.IOConcurrency.GetValue() + paramtable.Get().Save(paramtable.Get().DataNodeCfg.IOConcurrency.Key, "64") + defer func() { paramtable.Get().Save(paramtable.Get().DataNodeCfg.IOConcurrency.Key, ioConcurrency) }() nP := 10 nTask := 10 wg := sync.WaitGroup{} @@ -21,7 +22,7 @@ func Test_getOrCreateIOPool(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - p := getOrCreateIOPool() + p := GetOrCreateIOPool() futures := make([]*conc.Future[any], 0, nTask) for j := 0; j < nTask; j++ { future := p.Submit(func() (interface{}, error) { diff --git a/internal/datanode/io/mock_binlogio.go b/internal/datanode/io/mock_binlogio.go index 4202a7ed55679..b0132f16299a7 100644 --- a/internal/datanode/io/mock_binlogio.go +++ b/internal/datanode/io/mock_binlogio.go @@ -76,61 +76,6 @@ func (_c *MockBinlogIO_Download_Call) RunAndReturn(run func(context.Context, []s return _c } -// JoinFullPath provides a mock function with given fields: paths -func (_m *MockBinlogIO) JoinFullPath(paths ...string) string { - _va := make([]interface{}, len(paths)) - for _i := range paths { - _va[_i] = paths[_i] - } - var _ca []interface{} - _ca = append(_ca, _va...) - ret := _m.Called(_ca...) - - var r0 string - if rf, ok := ret.Get(0).(func(...string) string); ok { - r0 = rf(paths...) - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// MockBinlogIO_JoinFullPath_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'JoinFullPath' -type MockBinlogIO_JoinFullPath_Call struct { - *mock.Call -} - -// JoinFullPath is a helper method to define mock.On call -// - paths ...string -func (_e *MockBinlogIO_Expecter) JoinFullPath(paths ...interface{}) *MockBinlogIO_JoinFullPath_Call { - return &MockBinlogIO_JoinFullPath_Call{Call: _e.mock.On("JoinFullPath", - append([]interface{}{}, paths...)...)} -} - -func (_c *MockBinlogIO_JoinFullPath_Call) Run(run func(paths ...string)) *MockBinlogIO_JoinFullPath_Call { - _c.Call.Run(func(args mock.Arguments) { - variadicArgs := make([]string, len(args)-0) - for i, a := range args[0:] { - if a != nil { - variadicArgs[i] = a.(string) - } - } - run(variadicArgs...) - }) - return _c -} - -func (_c *MockBinlogIO_JoinFullPath_Call) Return(_a0 string) *MockBinlogIO_JoinFullPath_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockBinlogIO_JoinFullPath_Call) RunAndReturn(run func(...string) string) *MockBinlogIO_JoinFullPath_Call { - _c.Call.Return(run) - return _c -} - // Upload provides a mock function with given fields: ctx, kvs func (_m *MockBinlogIO) Upload(ctx context.Context, kvs map[string][]byte) error { ret := _m.Called(ctx, kvs) diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go deleted file mode 100644 index f3367b639feb5..0000000000000 --- a/internal/datanode/l0_compactor.go +++ /dev/null @@ -1,460 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "context" - "fmt" - "math" - "sync" - "time" - - "github.com/cockroachdb/errors" - "github.com/samber/lo" - "go.opentelemetry.io/otel" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/datanode/allocator" - "github.com/milvus-io/milvus/internal/datanode/compaction" - "github.com/milvus-io/milvus/internal/datanode/io" - iter "github.com/milvus-io/milvus/internal/datanode/iterators" - "github.com/milvus-io/milvus/internal/datanode/metacache" - "github.com/milvus-io/milvus/internal/metastore/kv/binlog" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/conc" - "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/hardware" - "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/metautil" - "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/timerecord" - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -type levelZeroCompactionTask struct { - io.BinlogIO - allocator allocator.Allocator - cm storage.ChunkManager - - plan *datapb.CompactionPlan - - ctx context.Context - cancel context.CancelFunc - - done chan struct{} - tr *timerecord.TimeRecorder -} - -// make sure compactionTask implements compactor interface -var _ compaction.Compactor = (*levelZeroCompactionTask)(nil) - -func newLevelZeroCompactionTask( - ctx context.Context, - binlogIO io.BinlogIO, - alloc allocator.Allocator, - cm storage.ChunkManager, - plan *datapb.CompactionPlan, -) *levelZeroCompactionTask { - ctx, cancel := context.WithCancel(ctx) - return &levelZeroCompactionTask{ - ctx: ctx, - cancel: cancel, - - BinlogIO: binlogIO, - allocator: alloc, - cm: cm, - plan: plan, - tr: timerecord.NewTimeRecorder("levelzero compaction"), - done: make(chan struct{}, 1), - } -} - -func (t *levelZeroCompactionTask) Complete() { - t.done <- struct{}{} -} - -func (t *levelZeroCompactionTask) Stop() { - t.cancel() - <-t.done -} - -func (t *levelZeroCompactionTask) GetPlanID() UniqueID { - return t.plan.GetPlanID() -} - -func (t *levelZeroCompactionTask) GetChannelName() string { - return t.plan.GetChannel() -} - -func (t *levelZeroCompactionTask) GetCollection() int64 { - // The length of SegmentBinlogs is checked before task enqueueing. 
- return t.plan.GetSegmentBinlogs()[0].GetCollectionID() -} - -func (t *levelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { - ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact") - defer span.End() - log := log.Ctx(t.ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.String("type", t.plan.GetType().String())) - log.Info("L0 compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) - - if !funcutil.CheckCtxValid(ctx) { - log.Warn("compact wrong, task context done or timeout") - return nil, ctx.Err() - } - - ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second) - defer cancelAll() - - l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { - return s.Level == datapb.SegmentLevel_L0 - }) - - targetSegments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool { - return s.Level != datapb.SegmentLevel_L0 - }) - if len(targetSegments) == 0 { - log.Warn("compact wrong, not target sealed segments") - return nil, errors.New("illegal compaction plan with empty target segments") - } - err := binlog.DecompressCompactionBinlogs(l0Segments) - if err != nil { - log.Warn("DecompressCompactionBinlogs failed", zap.Error(err)) - return nil, err - } - - var ( - totalSize int64 - totalDeltalogs = make(map[UniqueID][]string) - ) - for _, s := range l0Segments { - paths := []string{} - for _, d := range s.GetDeltalogs() { - for _, l := range d.GetBinlogs() { - paths = append(paths, l.GetLogPath()) - totalSize += l.GetMemorySize() - } - } - if len(paths) > 0 { - totalDeltalogs[s.GetSegmentID()] = paths - } - } - - var resultSegments []*datapb.CompactionSegment - - if float64(hardware.GetFreeMemoryCount())*paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() < float64(totalSize) { - resultSegments, err = t.linearProcess(ctxTimeout, targetSegments, totalDeltalogs) - } else { - resultSegments, err = t.batchProcess(ctxTimeout, targetSegments, lo.Values(totalDeltalogs)...) - } - if err != nil { - return nil, err - } - - result := &datapb.CompactionPlanResult{ - PlanID: t.plan.GetPlanID(), - State: commonpb.CompactionState_Completed, - Segments: resultSegments, - Channel: t.plan.GetChannel(), - Type: t.plan.GetType(), - } - - metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), t.plan.GetType().String()). 
- Observe(float64(t.tr.ElapseSpan().Milliseconds())) - log.Info("L0 compaction finished", zap.Duration("elapse", t.tr.ElapseSpan())) - - return result, nil -} - -func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) { - log := log.Ctx(t.ctx).With( - zap.Int64("planID", t.plan.GetPlanID()), - zap.String("type", t.plan.GetType().String()), - zap.Int("target segment counts", len(targetSegments)), - ) - - // just for logging - targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 { - return segment.GetSegmentID() - }) - - var ( - resultSegments = make(map[int64]*datapb.CompactionSegment) - alteredSegments = make(map[int64]*storage.DeleteData) - ) - - segmentBFs, err := t.loadBF(ctx, targetSegments) - if err != nil { - return nil, err - } - for segID, deltaLogs := range totalDeltalogs { - log := log.With(zap.Int64("levelzero segment", segID)) - - log.Info("Linear L0 compaction start processing segment") - allIters, err := t.loadDelta(ctx, deltaLogs) - if err != nil { - log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err)) - return nil, err - } - - t.splitDelta(ctx, allIters, alteredSegments, segmentBFs) - - err = t.uploadByCheck(ctx, true, alteredSegments, resultSegments) - if err != nil { - log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err)) - return nil, err - } - } - - err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments) - if err != nil { - log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegmentIDs), zap.Error(err)) - return nil, err - } - log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan())) - return lo.Values(resultSegments), nil -} - -func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) { - log := log.Ctx(t.ctx).With( - zap.Int64("planID", t.plan.GetPlanID()), - zap.String("type", t.plan.GetType().String()), - zap.Int("target segment counts", len(targetSegments)), - ) - - // just for logging - targetSegmentIDs := lo.Map(targetSegments, func(segment *datapb.CompactionSegmentBinlogs, _ int) int64 { - return segment.GetSegmentID() - }) - - log.Info("Batch L0 compaction start processing") - resultSegments := make(map[int64]*datapb.CompactionSegment) - - iters, err := t.loadDelta(ctx, lo.Flatten(deltaLogs)) - if err != nil { - log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err)) - return nil, err - } - - segmentBFs, err := t.loadBF(ctx, targetSegments) - if err != nil { - return nil, err - } - - alteredSegments := make(map[int64]*storage.DeleteData) - t.splitDelta(ctx, iters, alteredSegments, segmentBFs) - - err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments) - if err != nil { - log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegmentIDs), zap.Error(err)) - return nil, err - } - log.Info("Batch L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan())) - return lo.Values(resultSegments), nil -} - -func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*iter.DeltalogIterator, error) { - allIters := make([]*iter.DeltalogIterator, 0) 
- - for _, paths := range deltaLogs { - blobs, err := t.Download(ctx, paths) - if err != nil { - return nil, err - } - - allIters = append(allIters, iter.NewDeltalogIterator(blobs, nil)) - } - return allIters, nil -} - -func (t *levelZeroCompactionTask) splitDelta( - ctx context.Context, - allIters []*iter.DeltalogIterator, - targetSegBuffer map[int64]*storage.DeleteData, - segmentBfs map[int64]*metacache.BloomFilterSet, -) { - _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") - defer span.End() - - split := func(pk storage.PrimaryKey) []int64 { - lc := storage.NewLocationsCache(pk) - predicts := make([]int64, 0, len(segmentBfs)) - for segmentID, bf := range segmentBfs { - if bf.PkExists(lc) { - predicts = append(predicts, segmentID) - } - } - return predicts - } - - // spilt all delete data to segments - for _, deltaIter := range allIters { - for deltaIter.HasNext() { - // checked by HasNext, no error here - labeled, _ := deltaIter.Next() - - predicted := split(labeled.GetPk()) - - for _, gotSeg := range predicted { - delBuffer, ok := targetSegBuffer[gotSeg] - if !ok { - delBuffer = &storage.DeleteData{} - targetSegBuffer[gotSeg] = delBuffer - } - - delBuffer.Append(labeled.GetPk(), labeled.GetTimestamp()) - } - } - } -} - -func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) { - segment, ok := lo.Find(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) bool { - return segment.GetSegmentID() == segmentID - }) - if !ok { - return nil, nil, merr.WrapErrSegmentNotFound(segmentID, "cannot find segment in compaction plan") - } - - var ( - collectionID = segment.GetCollectionID() - partitionID = segment.GetPartitionID() - uploadKv = make(map[string][]byte) - ) - - blob, err := storage.NewDeleteCodec().Serialize(collectionID, partitionID, segmentID, dData) - if err != nil { - return nil, nil, err - } - - logID, err := t.allocator.AllocOne() - if err != nil { - return nil, nil, err - } - - blobKey := metautil.JoinIDPath(collectionID, partitionID, segmentID, logID) - blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey) - - uploadKv[blobPath] = blob.GetValue() - - minTs := uint64(math.MaxUint64) - maxTs := uint64(0) - for _, ts := range dData.Tss { - if ts > maxTs { - maxTs = ts - } - if ts < minTs { - minTs = ts - } - } - - deltalog := &datapb.Binlog{ - EntriesNum: dData.RowCount, - LogSize: int64(len(blob.GetValue())), - LogPath: blobPath, - LogID: logID, - TimestampFrom: minTs, - TimestampTo: maxTs, - MemorySize: dData.Size(), - } - - return uploadKv, deltalog, nil -} - -func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error { - allBlobs := make(map[string][]byte) - tmpResults := make(map[int64]*datapb.CompactionSegment) - for segID, dData := range alteredSegments { - if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) { - blobs, binlog, err := t.composeDeltalog(segID, dData) - if err != nil { - log.Warn("L0 compaction composeDelta fail", zap.Int64("segmentID", segID), zap.Error(err)) - return err - } - allBlobs = lo.Assign(blobs, allBlobs) - tmpResults[segID] = &datapb.CompactionSegment{ - SegmentID: segID, - Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}}, - Channel: t.plan.GetChannel(), - } - delete(alteredSegments, segID) - } - } 
-
-    if len(allBlobs) == 0 {
-        return nil
-    }
-
-    if err := t.Upload(ctx, allBlobs); err != nil {
-        log.Warn("L0 compaction upload blobs fail", zap.Error(err))
-        return err
-    }
-
-    for segID, compSeg := range tmpResults {
-        if _, ok := resultSegments[segID]; !ok {
-            resultSegments[segID] = compSeg
-        } else {
-            binlog := compSeg.Deltalogs[0].Binlogs[0]
-            resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
-        }
-    }
-
-    return nil
-}
-
-func (t *levelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
-    _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadBF")
-    defer span.End()
-
-    var (
-        futures = make([]*conc.Future[any], 0, len(targetSegments))
-        pool    = getOrCreateStatsPool()
-
-        mu  = &sync.Mutex{}
-        bfs = make(map[int64]*metacache.BloomFilterSet)
-    )
-
-    for _, segment := range targetSegments {
-        segment := segment
-        innerCtx := ctx
-        future := pool.Submit(func() (any, error) {
-            _ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
-                segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
-            pks, err := loadStats(innerCtx, t.cm, t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
-            if err != nil {
-                log.Warn("failed to load segment stats log",
-                    zap.Int64("planID", t.plan.GetPlanID()),
-                    zap.String("type", t.plan.GetType().String()),
-                    zap.Error(err))
-                return err, err
-            }
-            bf := metacache.NewBloomFilterSet(pks...)
-            mu.Lock()
-            defer mu.Unlock()
-            bfs[segment.GetSegmentID()] = bf
-            return nil, nil
-        })
-        futures = append(futures, future)
-    }
-
-    err := conc.AwaitAll(futures...)
-    return bfs, err
-}
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index 53d81fb6206c6..5a5e7a297688d 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -32,6 +32,7 @@ import (
     "github.com/milvus-io/milvus/internal/datanode/importv2"
     "github.com/milvus-io/milvus/internal/datanode/io"
     "github.com/milvus-io/milvus/internal/datanode/metacache"
+    "github.com/milvus-io/milvus/internal/datanode/util"
     "github.com/milvus-io/milvus/internal/metastore/kv/binlog"
     "github.com/milvus-io/milvus/internal/proto/datapb"
     "github.com/milvus-io/milvus/internal/proto/internalpb"
@@ -219,10 +220,10 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
     taskCtx := tracer.Propagate(ctx, node.ctx)
 
     var task compaction.Compactor
-    binlogIO := io.NewBinlogIO(node.chunkManager, getOrCreateIOPool())
+    binlogIO := io.NewBinlogIO(node.chunkManager)
     switch req.GetType() {
     case datapb.CompactionType_Level0DeleteCompaction:
-        task = newLevelZeroCompactionTask(
+        task = compaction.NewLevelZeroCompactionTask(
             taskCtx,
             binlogIO,
             node.allocator,
@@ -310,7 +311,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
 
     for _, segID := range missingSegments {
         segID := segID
-        future := node.pool.Submit(func() (any, error) {
+        future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
             newSeg := req.GetSegmentInfos()[segID]
             var val *metacache.BloomFilterSet
             var err error
@@ -319,7 +320,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
                 log.Warn("failed to DecompressBinLog", zap.Error(err))
                 return val, err
             }
-            pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
+            pks, err := util.LoadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
             if err != nil {
                 log.Warn("failed to load segment stats log", zap.Error(err))
                 return val, err
diff --git a/internal/datanode/util/load_stats.go b/internal/datanode/util/load_stats.go
new file mode 100644
index 0000000000000..f0932329b86c6
--- /dev/null
+++ b/internal/datanode/util/load_stats.go
@@ -0,0 +1,166 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+    "context"
+    "path"
+    "time"
+
+    "go.uber.org/zap"
+
+    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/datanode/metacache"
+    "github.com/milvus-io/milvus/internal/datanode/syncmgr"
+    "github.com/milvus-io/milvus/internal/proto/datapb"
+    "github.com/milvus-io/milvus/internal/storage"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+func LoadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) {
+    startTs := time.Now()
+    log := log.With(zap.Int64("segmentID", segmentID))
+    log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))
+
+    pkField, err := typeutil.GetPrimaryFieldSchema(schema)
+    if err != nil {
+        return nil, err
+    }
+
+    // filter stats binlog files which is pk field stats log
+    bloomFilterFiles := []string{}
+    logType := storage.DefaultStatsType
+
+    for _, binlog := range statsBinlogs {
+        if binlog.FieldID != pkField.GetFieldID() {
+            continue
+        }
+    Loop:
+        for _, log := range binlog.GetBinlogs() {
+            _, logidx := path.Split(log.GetLogPath())
+            // if special status log exist
+            // only load one file
+            switch logidx {
+            case storage.CompoundStatsType.LogIdx():
+                bloomFilterFiles = []string{log.GetLogPath()}
+                logType = storage.CompoundStatsType
+                break Loop
+            default:
+                bloomFilterFiles = append(bloomFilterFiles, log.GetLogPath())
+            }
+        }
+    }
+
+    // no stats log to parse, initialize a new BF
+    if len(bloomFilterFiles) == 0 {
+        log.Warn("no stats files to load")
+        return nil, nil
+    }
+
+    // read historical PK filter
+    values, err := chunkManager.MultiRead(ctx, bloomFilterFiles)
+    if err != nil {
+        log.Warn("failed to load bloom filter files", zap.Error(err))
+        return nil, err
+    }
+    blobs := make([]*storage.Blob, 0)
+    for i := 0; i < len(values); i++ {
+        blobs = append(blobs, &storage.Blob{Value: values[i]})
+    }
+
+    var stats []*storage.PrimaryKeyStats
+    if logType == storage.CompoundStatsType {
+        stats, err = storage.DeserializeStatsList(blobs[0])
+        if err != nil {
+            log.Warn("failed to deserialize stats list", zap.Error(err))
+            return nil, err
+        }
+    } else {
+        stats, err = storage.DeserializeStats(blobs)
+        if err != nil {
+            log.Warn("failed to deserialize stats", zap.Error(err))
+            return nil, err
+        }
+    }
+
+    var size uint
+    result := make([]*storage.PkStatistics, 0, len(stats))
+    for _, stat := range stats {
+        pkStat := &storage.PkStatistics{
+            PkFilter: stat.BF,
+            MinPK:    stat.MinPk,
+            MaxPK:    stat.MaxPk,
+        }
+        size += stat.BF.Cap()
+        result = append(result, pkStat)
+    }
+
+    log.Info("Successfully load pk stats", zap.Any("time", time.Since(startTs)), zap.Uint("size", size))
+    return result, nil
+}
+
+func LoadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.SegmentInfo, schema *schemapb.CollectionSchema) ([]*storage.PkStatistics, error) {
+    space, err := storageCache.GetOrCreateSpace(segment.ID, syncmgr.SpaceCreatorFunc(segment.ID, schema, storageCache.ArrowSchema()))
+    if err != nil {
+        return nil, err
+    }
+
+    getResult := func(stats []*storage.PrimaryKeyStats) []*storage.PkStatistics {
+        result := make([]*storage.PkStatistics, 0, len(stats))
+        for _, stat := range stats {
+            pkStat := &storage.PkStatistics{
+                PkFilter: stat.BF,
+                MinPK:    stat.MinPk,
+                MaxPK:    stat.MaxPk,
+            }
+            result = append(result, pkStat)
+        }
+        return result
+    }
+
+    blobs := space.StatisticsBlobs()
+    deserBlobs := make([]*storage.Blob, 0)
+    for _, b := range blobs {
+        if b.Name == storage.CompoundStatsType.LogIdx() {
+            blobData := make([]byte, b.Size)
+            _, err = space.ReadBlob(b.Name, blobData)
+            if err != nil {
+                return nil, err
+            }
+            stats, err := storage.DeserializeStatsList(&storage.Blob{Value: blobData})
+            if err != nil {
+                return nil, err
+            }
+            return getResult(stats), nil
+        }
+    }
+
+    for _, b := range blobs {
+        blobData := make([]byte, b.Size)
+        _, err = space.ReadBlob(b.Name, blobData)
+        if err != nil {
+            return nil, err
+        }
+        deserBlobs = append(deserBlobs, &storage.Blob{Value: blobData})
+    }
+    stats, err := storage.DeserializeStats(deserBlobs)
+    if err != nil {
+        return nil, err
+    }
+    return getResult(stats), nil
+}
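
For reference, below is a minimal sketch of how the pieces moved by this patch compose: the relocated util.LoadStats helper, the shared io.GetOrCreateStatsPool worker pool used by services.go, and metacache.NewBloomFilterSet. It is not part of the patch; the wrapper function loadSegmentBF, its package name, and its parameter list are hypothetical and only assume the signatures visible in the diff above.

// example_load_bf.go - illustrative sketch only, not part of this patch.
package example

import (
    "context"

    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/datanode/io"
    "github.com/milvus-io/milvus/internal/datanode/metacache"
    "github.com/milvus-io/milvus/internal/datanode/util"
    "github.com/milvus-io/milvus/internal/proto/datapb"
    "github.com/milvus-io/milvus/internal/storage"
)

// loadSegmentBF (hypothetical helper) rebuilds one segment's PK bloom filter
// from its stats binlogs, mirroring the SyncSegments path in services.go.
func loadSegmentBF(
    ctx context.Context,
    cm storage.ChunkManager,
    schema *schemapb.CollectionSchema,
    segmentID int64,
    statsLogs []*datapb.FieldBinlog,
) (*metacache.BloomFilterSet, error) {
    // Run the stats download/deserialization on the shared stats pool rather
    // than an ad-hoc goroutine, so concurrent stats loading stays bounded.
    future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
        pks, err := util.LoadStats(ctx, cm, schema, segmentID, statsLogs)
        if err != nil {
            return nil, err
        }
        // Wrap the loaded PK statistics into a BloomFilterSet for lookups.
        return metacache.NewBloomFilterSet(pks...), nil
    })

    res, err := future.Await()
    if err != nil {
        return nil, err
    }
    return res.(*metacache.BloomFilterSet), nil
}

Routing the work through the shared stats pool is the same pattern the services.go hunk switches to (io.GetOrCreateStatsPool() replacing node.pool), which keeps stats loading concurrency bounded on the DataNode.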