Skip to content

Commit

Permalink
enhance: Add more tracing for l0 compactor (milvus-io#33435)
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored and yellow-shine committed Jul 2, 2024
1 parent 0858fe4 commit 0bbf61b
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
6 changes: 5 additions & 1 deletion internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down Expand Up @@ -250,8 +251,11 @@ func loadStatsV2(storageCache *metacache.StorageV2Cache, segment *datapb.Segment
}

func loadStats(ctx context.Context, chunkManager storage.ChunkManager, schema *schemapb.CollectionSchema, segmentID int64, statsBinlogs []*datapb.FieldBinlog) ([]*storage.PkStatistics, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "loadStats")
defer span.End()

startTs := time.Now()
log := log.With(zap.Int64("segmentID", segmentID))
log := log.Ctx(ctx).With(zap.Int64("segmentID", segmentID))
log.Info("begin to init pk bloom filter", zap.Int("statsBinLogsLen", len(statsBinlogs)))

pkField, err := typeutil.GetPrimaryFieldSchema(schema)
Expand Down
21 changes: 11 additions & 10 deletions internal/datanode/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegme
alteredSegments = make(map[int64]*storage.DeleteData)
)

segmentBFs, err := t.loadBF(targetSegments)
segmentBFs, err := t.loadBF(ctx, targetSegments)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegmen
return nil, err
}

segmentBFs, err := t.loadBF(targetSegments)
segmentBFs, err := t.loadBF(ctx, targetSegments)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -420,11 +420,9 @@ func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireChec
return nil
}

func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
)
func (t *levelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*datapb.CompactionSegmentBinlogs) (map[int64]*metacache.BloomFilterSet, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadBF")
defer span.End()

var (
futures = make([]*conc.Future[any], 0, len(targetSegments))
Expand All @@ -436,13 +434,16 @@ func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegm

for _, segment := range targetSegments {
segment := segment
innerCtx := ctx
future := pool.Submit(func() (any, error) {
_ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(),
segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
pks, err := loadStats(t.ctx, t.cm,
t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
pks, err := loadStats(innerCtx, t.cm, t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths())
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
log.Warn("failed to load segment stats log",
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Error(err))
return err, err
}
bf := metacache.NewBloomFilterSet(pks...)
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/l0_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() {
cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil)
s.task.cm = cm

bfs, err := s.task.loadBF(plan.SegmentBinlogs)
bfs, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
s.NoError(err)

s.Len(bfs, 1)
Expand Down Expand Up @@ -733,7 +733,7 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() {

s.task.plan = plan

_, err := s.task.loadBF(plan.SegmentBinlogs)
_, err := s.task.loadBF(context.Background(), plan.SegmentBinlogs)
s.Error(err)
})

Expand Down

0 comments on commit 0bbf61b

Please sign in to comment.