enhance: add l0 metric and fix datacoord no need drop l0 segment when flush (#28373)

relate: #27675

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd authored Nov 24, 2023
1 parent 39be358 commit 8a4cfb7
Showing 7 changed files with 81 additions and 29 deletions.
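At a glance, the segment-count gauge gains a segment-level label alongside the existing state label. The two calls below are a hedged before/after summary distilled from the meta.go hunks that follow, not new API:

// Before: state label only.
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
// After: state plus level.
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()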
55 changes: 30 additions & 25 deletions internal/datacoord/meta.go
@@ -66,9 +66,9 @@ type meta struct {

// A local cache of segment metric updates. Must call commit() to take effect.
type segMetricMutation struct {
stateChange map[string]int // segment state -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
stateChange map[string]map[string]int // segment state, seg level -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
}

type collectionInfo struct {
@@ -107,15 +107,11 @@ func (m *meta) reloadFromKV() error {
return err
}
metrics.DataCoordNumCollections.WithLabelValues().Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.SealedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.GrowingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.DroppedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.Reset()
numStoredRows := int64(0)
for _, segment := range segments {
m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
metrics.DataCoordNumSegments.WithLabelValues(segment.State.String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.State.String(), segment.GetLevel().String()).Inc()
if segment.State == commonpb.SegmentState_Flushed {
numStoredRows += segment.NumOfRows

@@ -302,7 +298,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
return err
}
m.segments.SetSegment(segment.GetID(), segment)
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
return nil
}
@@ -324,7 +320,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec()
m.segments.DropSegment(segmentID)
log.Info("meta update: dropping segment - complete",
zap.Int64("segmentID", segmentID))
@@ -380,7 +376,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
// Persist segment updates first.
clonedSegment := curSegInfo.Clone()
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
// Update segment state and prepare segment metric update.
@@ -611,7 +607,7 @@ func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
segments: make(map[int64]*SegmentInfo),
increments: make(map[int64]metastore.BinlogsIncrement),
metricMutation: &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
},
}

@@ -650,7 +646,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI

// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
modSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
@@ -691,7 +687,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) {
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}

segment := m.segments.GetSegment(seg2Drop.ID)
@@ -976,7 +972,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
)

metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
@@ -1047,7 +1043,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
LastExpireTime: plan.GetStartTime(),
}
segment := NewSegmentInfo(segmentInfo)
metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
@@ -1339,26 +1335,35 @@ func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupL
}

// addNewSeg updates the metric mutation for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
s.stateChange[state.String()]++
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, rowCount int64) {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
}
s.stateChange[level.String()][state.String()] += 1

s.rowCountChange += rowCount
s.rowCountAccChange += rowCount
}
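A minimal usage sketch of addNewSeg, with the state, level, and row counts chosen purely for illustration:

m := &segMetricMutation{stateChange: make(map[string]map[string]int)}
m.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 100)
m.addNewSeg(commonpb.SegmentState_Flushed, datapb.SegmentLevel_L0, 50)
// stateChange == {"L0": {"Flushed": 2}}, rowCountChange == 150, rowCountAccChange == 150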

// commit persists all updates in the current segMetricMutation; it must be called AFTER the segment state change
// has been persisted in etcd.
func (s *segMetricMutation) commit() {
for state, change := range s.stateChange {
metrics.DataCoordNumSegments.WithLabelValues(state).Add(float64(change))
for level, submap := range s.stateChange {
for state, change := range submap {
metrics.DataCoordNumSegments.WithLabelValues(state, level).Add(float64(change))
}
}
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(s.rowCountAccChange))
}
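Note the label order: commit() passes state first and level second, matching the (segment_state, segment_level) order declared in datacoord_metrics.go. A hedged trace with illustrative values:

// Given stateChange == {"L1": {"Flushed": 1, "Growing": -1}}, commit() issues:
//   metrics.DataCoordNumSegments.WithLabelValues("Flushed", "L1").Add(1)
//   metrics.DataCoordNumSegments.WithLabelValues("Growing", "L1").Add(-1)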

// append updates the current segMetricMutation when a segment state change happens.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, rowCountUpdate int64) {
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, rowCountUpdate int64) {
if oldState != newState {
s.stateChange[oldState.String()]--
s.stateChange[newState.String()]++
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
}
s.stateChange[level.String()][oldState.String()] -= 1
s.stateChange[level.String()][newState.String()] += 1
}
// Update # of rows on new flush operations and drop operations.
if isFlushState(newState) && !isFlushState(oldState) {
@@ -1382,6 +1387,6 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
zap.String("old state", segToUpdate.GetState().String()),
zap.String("new state", targetState.String()),
zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetNumOfRows())
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
}
4 changes: 2 additions & 2 deletions internal/datacoord/meta_test.go
@@ -141,7 +141,7 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
suite.NoError(err)
suite.NotNil(meta)

suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel), 1)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1)
})
}

@@ -731,7 +731,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
assert.Equal(t, 3, len(metricMutation.stateChange))
assert.Equal(t, 3, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
assert.Equal(t, int64(0), metricMutation.rowCountChange)
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)

10 changes: 8 additions & 2 deletions internal/datacoord/services.go
@@ -502,9 +502,15 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Info("flush segment with meta", zap.Any("meta", req.GetField2BinlogPaths()))

if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
metrics.DataCoordSizeStoredL0Segment.WithLabelValues().Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
metrics.DataCoordRateStoredL0Segment.WithLabelValues().Inc()
} else {
// because segmentManager only manages growing segments
s.segmentManager.DropSegment(ctx, req.SegmentID)
}

s.flushCh <- req.SegmentID
if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
if segment == nil && req.GetSegLevel() == datapb.SegmentLevel_L0 {
err = s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
10 changes: 10 additions & 0 deletions internal/datacoord/util.go
@@ -217,3 +217,13 @@ func mergeFieldBinlogs(currentBinlogs []*datapb.FieldBinlog, newBinlogs []*datap
}
return currentBinlogs
}

func calculateL0SegmentSize(fields []*datapb.FieldBinlog) float64 {
size := int64(0)
for _, field := range fields {
for _, binlog := range field.GetBinlogs() {
size += binlog.GetLogSize()
}
}
return float64(size)
}
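A hedged example of the helper with more than one field binlog (field IDs and sizes are illustrative only):

fields := []*datapb.FieldBinlog{
	{FieldID: 100, Binlogs: []*datapb.Binlog{{LogSize: 100}, {LogSize: 150}}},
	{FieldID: 101, Binlogs: []*datapb.Binlog{{LogSize: 100}}},
}
size := calculateL0SegmentSize(fields) // size == 350, returned as float64 so it can feed Observe() directly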
11 changes: 11 additions & 0 deletions internal/datacoord/util_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -184,3 +185,13 @@ func (suite *UtilSuite) TestGetCollectionAutoCompactionEnabled() {
suite.NoError(err)
suite.Equal(Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), enabled)
}

func (suite *UtilSuite) TestCalculateL0SegmentSize() {
logsize := int64(100)
fields := []*datapb.FieldBinlog{{
FieldID: 102,
Binlogs: []*datapb.Binlog{{LogSize: logsize}},
}}

suite.Equal(calculateL0SegmentSize(fields), float64(logsize))
}
19 changes: 19 additions & 0 deletions pkg/metrics/datacoord_metrics.go
@@ -59,6 +59,7 @@ var (
Help: "number of segments",
}, []string{
segmentStateLabelName,
segmentLevelLabelName,
})

// DataCoordCollectionNum records the num of collections managed by DataCoord.
@@ -70,6 +71,22 @@ var (
Help: "number of collections",
}, []string{})

DataCoordSizeStoredL0Segment = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "store_l0_segment_size",
Help: "stored l0 segment size",
}, []string{})

DataCoordRateStoredL0Segment = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "store_l0_segment_rate",
Help: "stored l0 segment rate",
}, []string{})

DataCoordNumStoredRows = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
@@ -254,6 +271,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordSegmentBinLogFileCount)
registry.MustRegister(DataCoordDmlChannelNum)
registry.MustRegister(DataCoordCompactedSegmentSize)
registry.MustRegister(DataCoordSizeStoredL0Segment)
registry.MustRegister(DataCoordRateStoredL0Segment)
registry.MustRegister(FlushedSegmentFileNum)
registry.MustRegister(IndexRequestCounter)
registry.MustRegister(IndexTaskNum)
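Both new collectors take no labels and are driven exactly as in the SaveBinlogPaths change above. Assuming milvusNamespace resolves to "milvus", they are exported as milvus_datacoord_store_l0_segment_size (a histogram using the default buckets, since no Buckets are configured) and milvus_datacoord_store_l0_segment_rate (a counter):

metrics.DataCoordSizeStoredL0Segment.WithLabelValues().Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
metrics.DataCoordRateStoredL0Segment.WithLabelValues().Inc()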
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
@@ -76,6 +76,7 @@ const (
collectionName = "collection_name"
segmentStateLabelName = "segment_state"
segmentIDLabelName = "segment_id"
segmentLevelLabelName = "segment_level"
usernameLabelName = "username"
roleNameLabelName = "role_name"
cacheNameLabelName = "cache_name"
