From 0bbf61b3f0a1ad2ee23aad0e41eab7dfe2436758 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 3 Jun 2024 10:19:49 +0800 Subject: [PATCH] enhance: Add more tracing for l0 compactor (#33435) Signed-off-by: yangxuan --- internal/datanode/data_sync_service.go | 6 +++++- internal/datanode/l0_compactor.go | 21 +++++++++++---------- internal/datanode/l0_compactor_test.go | 4 ++-- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index ca744d239f1bf..1a3ff514bb5c6 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -250,8 +251,11 @@ func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.Segment } func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "loadStats") + defer span.End() + startTs := time.Now() - log := log.With(zap.Int64("segmentID", segmentID)) + log := log.Ctx(ctx).With(zap.Int64("segmentID", segmentID)) log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs))) pkField, err := typeutil.GetPrimaryFieldSchema(schema) diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index f04cc280c5c41..f3367b639feb5 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -201,7 +201,7 @@ func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegme alteredSegments = make(map[int64]*storage.DeleteData) ) - segmentBFs, err := t.loadBF(targetSegments) + segmentBFs, err := t.loadBF(ctx, targetSegments) if err != nil { return nil, err } @@ -254,7 +254,7 @@ func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegmen return nil, err } - segmentBFs, err := t.loadBF(targetSegments) + segmentBFs, err := t.loadBF(ctx, targetSegments) if err != nil { return nil, err } @@ -420,11 +420,9 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec return nil } -func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) { - log := log.Ctx(t.ctx).With( - zap.Int64("planID", t.plan.GetPlanID()), - zap.String("type", t.plan.GetType().String()), - ) +func (t *levelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) { + _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadBF") + defer span.End() var ( futures = make([]*conc.Future[any], 0, len(targetSegments)) @@ -436,13 +434,16 @@ func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegm for _, segment := range targetSegments { segment := segment + innerCtx := ctx future := pool.Submit(func() (any, error) { _ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) - pks, err := loadStats(t.ctx, t.cm, - t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) + pks, err := loadStats(innerCtx, t.cm, t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) if err != nil { - log.Warn("failed to load segment stats log", zap.Error(err)) + log.Warn("failed to load segment stats log", + zap.Int64("planID", t.plan.GetPlanID()), + zap.String("type", t.plan.GetType().String()), + zap.Error(err)) return err, err } bf := metacache.NewBloomFilterSet(pks...) diff --git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go index 9aad1fb685443..8c833df21f69c 100644 --- a/internal/datanode/l0_compactor_test.go +++ b/internal/datanode/l0_compactor_test.go @@ -698,7 +698,7 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - bfs, err := s.task.loadBF(plan.SegmentBinlogs) + bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs) s.NoError(err) s.Len(bfs, 1) @@ -733,7 +733,7 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() { s.task.plan = plan - _, err := s.task.loadBF(plan.SegmentBinlogs) + _, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs) s.Error(err) })