enhance: add l0 metric and fix datacoord no need drop l0 segment when flush #28373

Merged (1 commit) on Nov 24, 2023
55 changes: 30 additions & 25 deletions internal/datacoord/meta.go
@@ -66,9 +66,9 @@ type meta struct {

// A local cache of segment metric update. Must call commit() to take effect.
type segMetricMutation struct {
stateChange map[string]int // segment state -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
stateChange map[string]map[string]int // segment level -> segment state -> state change count (to increase or decrease).
rowCountChange int64 // Change in # of rows.
rowCountAccChange int64 // Total # of historical added rows, accumulated.
}

type collectionInfo struct {
@@ -107,15 +107,11 @@ func (m *meta) reloadFromKV() error {
return err
}
metrics.DataCoordNumCollections.WithLabelValues().Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.SealedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.GrowingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushingSegmentLabel).Set(0)
metrics.DataCoordNumSegments.WithLabelValues(metrics.DroppedSegmentLabel).Set(0)
metrics.DataCoordNumSegments.Reset()
numStoredRows := int64(0)
for _, segment := range segments {
m.segments.SetSegment(segment.ID, NewSegmentInfo(segment))
metrics.DataCoordNumSegments.WithLabelValues(segment.State.String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.State.String(), segment.GetLevel().String()).Inc()
if segment.State == commonpb.SegmentState_Flushed {
numStoredRows += segment.NumOfRows

@@ -310,7 +306,7 @@ func (m *meta) AddSegment(ctx context.Context, segment *SegmentInfo) error {
return err
}
m.segments.SetSegment(segment.GetID(), segment)
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Inc()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Inc()
log.Info("meta update: adding segment - complete", zap.Int64("segmentID", segment.GetID()))
return nil
}
@@ -332,7 +328,7 @@ func (m *meta) DropSegment(segmentID UniqueID) error {
zap.Error(err))
return err
}
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String()).Dec()
metrics.DataCoordNumSegments.WithLabelValues(segment.GetState().String(), segment.GetLevel().String()).Dec()
m.segments.DropSegment(segmentID)
log.Info("meta update: dropping segment - complete",
zap.Int64("segmentID", segmentID))
@@ -388,7 +384,7 @@ func (m *meta) SetState(segmentID UniqueID, targetState commonpb.SegmentState) e
// Persist segment updates first.
clonedSegment := curSegInfo.Clone()
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
if clonedSegment != nil && isSegmentHealthy(clonedSegment) {
// Update segment state and prepare segment metric update.
@@ -604,7 +600,7 @@ func (m *meta) UpdateSegmentsInfo(operators ...UpdateOperator) error {
segments: make(map[int64]*SegmentInfo),
increments: make(map[int64]metastore.BinlogsIncrement),
metricMutation: &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
},
}

@@ -643,7 +639,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI

// Prepare segment metric mutation.
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
modSegments := make(map[UniqueID]*SegmentInfo)
// save new segments flushed from buffer data
@@ -684,7 +680,7 @@ func (m *meta) UpdateDropChannelSegmentInfo(channel string, segments []*SegmentI
// mergeDropSegment merges drop segment information with meta segments
func (m *meta) mergeDropSegment(seg2Drop *SegmentInfo) (*SegmentInfo, *segMetricMutation) {
metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}

segment := m.segments.GetSegment(seg2Drop.ID)
@@ -1010,7 +1006,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
)

metricMutation := &segMetricMutation{
stateChange: make(map[string]int),
stateChange: make(map[string]map[string]int),
}
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
@@ -1081,7 +1077,7 @@ func (m *meta) PrepareCompleteCompactionMutation(plan *datapb.CompactionPlan,
LastExpireTime: plan.GetStartTime(),
}
segment := NewSegmentInfo(segmentInfo)
metricMutation.addNewSeg(segment.GetState(), segment.GetNumOfRows())
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
@@ -1336,26 +1332,35 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID
}

// addNewSeg updates the metric mutation for a new segment.
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, rowCount int64) {
s.stateChange[state.String()]++
func (s *segMetricMutation) addNewSeg(state commonpb.SegmentState, level datapb.SegmentLevel, rowCount int64) {
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
}
s.stateChange[level.String()][state.String()] += 1

s.rowCountChange += rowCount
s.rowCountAccChange += rowCount
}

// commit persists all updates in the current segMetricMutation. It must be called AFTER the segment state change
// has been persisted in Etcd.
func (s *segMetricMutation) commit() {
for state, change := range s.stateChange {
metrics.DataCoordNumSegments.WithLabelValues(state).Add(float64(change))
for level, submap := range s.stateChange {
for state, change := range submap {
metrics.DataCoordNumSegments.WithLabelValues(state, level).Add(float64(change))
}
}
metrics.DataCoordNumStoredRowsCounter.WithLabelValues().Add(float64(s.rowCountAccChange))
}

// append updates current segMetricMutation when segment state change happens.
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, rowCountUpdate int64) {
func (s *segMetricMutation) append(oldState, newState commonpb.SegmentState, level datapb.SegmentLevel, rowCountUpdate int64) {
if oldState != newState {
s.stateChange[oldState.String()]--
s.stateChange[newState.String()]++
if _, ok := s.stateChange[level.String()]; !ok {
s.stateChange[level.String()] = make(map[string]int)
}
s.stateChange[level.String()][oldState.String()] -= 1
s.stateChange[level.String()][newState.String()] += 1
}
// Update # of rows on new flush operations and drop operations.
if isFlushState(newState) && !isFlushState(oldState) {
@@ -1379,6 +1384,6 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
zap.String("old state", segToUpdate.GetState().String()),
zap.String("new state", targetState.String()),
zap.Int64("# of rows", segToUpdate.GetNumOfRows()))
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetNumOfRows())
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
}
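The core of the metric change in meta.go is the switch from a flat map keyed by segment state to a two-level map keyed by level and then state, so DataCoordNumSegments can carry both a segment_state and a segment_level label. Below is a minimal, self-contained sketch (toy names, not the PR's code) of how such deltas accumulate and how commit() fans them out into one labelled update per (state, level) pair.

```go
// Minimal, self-contained sketch (assumed names, not Milvus code) of the
// level-aware mutation map: deltas are keyed by segment level, then by
// segment state, and committed as one labelled update per (state, level).
package main

import "fmt"

type segDelta map[string]map[string]int // level -> state -> delta

func (d segDelta) add(level, state string, n int) {
	// Initialize the inner map lazily, as addNewSeg/append do above.
	if _, ok := d[level]; !ok {
		d[level] = make(map[string]int)
	}
	d[level][state] += n
}

func main() {
	d := make(segDelta)
	d.add("L0", "Flushed", 1)      // a new flushed L0 segment
	d.add("Legacy", "Growing", -1) // a legacy growing segment got sealed...
	d.add("Legacy", "Sealed", 1)   // ...so its state moves from Growing to Sealed

	// "commit": one gauge update per (state, level) label pair.
	for level, states := range d {
		for state, change := range states {
			fmt.Printf("DataCoordNumSegments{segment_state=%q, segment_level=%q} += %d\n",
				state, level, change)
		}
	}
}
```

This also shows why every call site that previously did stateChange[state]++ now has to create the inner map before incrementing.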
4 changes: 2 additions & 2 deletions internal/datacoord/meta_test.go
@@ -141,7 +141,7 @@ func (suite *MetaReloadSuite) TestReloadFromKV() {
suite.NoError(err)
suite.NotNil(meta)

suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel), 1)
suite.MetricsEqual(metrics.DataCoordNumSegments.WithLabelValues(metrics.FlushedSegmentLabel, datapb.SegmentLevel_Legacy.String()), 1)
})
}

@@ -731,7 +731,7 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment)
assert.Equal(t, 3, len(metricMutation.stateChange))
assert.Equal(t, 3, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
assert.Equal(t, int64(0), metricMutation.rowCountChange)
assert.Equal(t, int64(2), metricMutation.rowCountAccChange)

10 changes: 8 additions & 2 deletions internal/datacoord/services.go
@@ -499,9 +499,15 @@
log.Info("flush segment with meta", zap.Any("meta", req.GetField2BinlogPaths()))

if req.GetFlushed() {
s.segmentManager.DropSegment(ctx, req.SegmentID)
s.flushCh <- req.SegmentID
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
metrics.DataCoordSizeStoredL0Segment.WithLabelValues().Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))
metrics.DataCoordRateStoredL0Segment.WithLabelValues().Inc()

[Codecov warning: added lines internal/datacoord/services.go#L503-L504 were not covered by tests]
} else {
// because segmentManager only manages growing segments
s.segmentManager.DropSegment(ctx, req.SegmentID)
}

s.flushCh <- req.SegmentID
if !req.Importing && Params.DataCoordCfg.EnableCompaction.GetAsBool() {
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(), segment.GetPartitionID(),
segmentID, segment.GetInsertChannel())
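To make the control flow explicit, here is a toy, standalone sketch (hypothetical types and names, not the PR's code) of the branch added above: a flushed L0 segment only feeds the new size and rate metrics, while any other level is additionally dropped from the segment manager, which tracks growing segments only; both paths still notify the flush channel.

```go
// Toy sketch of the flush branch; hypothetical types and names for illustration only.
package main

import "fmt"

type segmentLevel string

const (
	levelL0     segmentLevel = "L0"
	levelLegacy segmentLevel = "Legacy"
)

func onSegmentFlushed(level segmentLevel, segmentID int64, statslogBytes float64) {
	if level == levelL0 {
		// L0 segments are not allocated by the segment manager,
		// so there is nothing to drop; just record the new metrics.
		fmt.Printf("observe L0 segment size: %.0f bytes; inc L0 segment rate\n", statslogBytes)
	} else {
		fmt.Printf("drop segment %d from the segment manager\n", segmentID)
	}
	fmt.Printf("notify flush channel for segment %d\n", segmentID)
}

func main() {
	onSegmentFlushed(levelL0, 1001, 4096)
	onSegmentFlushed(levelLegacy, 1002, 0)
}
```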
10 changes: 10 additions & 0 deletions internal/datacoord/util.go
@@ -217,3 +217,13 @@ func mergeFieldBinlogs(currentBinlogs []*datapb.FieldBinlog, newBinlogs []*datap
}
return currentBinlogs
}

func calculateL0SegmentSize(fields []*datapb.FieldBinlog) float64 {
size := int64(0)
for _, field := range fields {
for _, binlog := range field.GetBinlogs() {
size += binlog.GetLogSize()
}
}
return float64(size)
}
11 changes: 11 additions & 0 deletions internal/datacoord/util_test.go
@@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -184,3 +185,13 @@ func (suite *UtilSuite) TestGetCollectionAutoCompactionEnabled() {
suite.NoError(err)
suite.Equal(Params.DataCoordCfg.EnableAutoCompaction.GetAsBool(), enabled)
}

func (suite *UtilSuite) TestCalculateL0SegmentSize() {
logsize := int64(100)
fields := []*datapb.FieldBinlog{{
FieldID: 102,
Binlogs: []*datapb.Binlog{{LogSize: logsize}},
}}

suite.Equal(calculateL0SegmentSize(fields), float64(logsize))
}
19 changes: 19 additions & 0 deletions pkg/metrics/datacoord_metrics.go
@@ -59,6 +59,7 @@ var (
Help: "number of segments",
}, []string{
segmentStateLabelName,
segmentLevelLabelName,
})

// DataCoordCollectionNum records the num of collections managed by DataCoord.
@@ -70,6 +71,22 @@ var (
Help: "number of collections",
}, []string{})

DataCoordSizeStoredL0Segment = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "store_l0_segment_size",
Help: "stored l0 segment size",
}, []string{})

DataCoordRateStoredL0Segment = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "store_l0_segment_rate",
Help: "stored l0 segment rate",
}, []string{})

DataCoordNumStoredRows = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
@@ -254,6 +271,8 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordSegmentBinLogFileCount)
registry.MustRegister(DataCoordDmlChannelNum)
registry.MustRegister(DataCoordCompactedSegmentSize)
registry.MustRegister(DataCoordSizeStoredL0Segment)
registry.MustRegister(DataCoordRateStoredL0Segment)
registry.MustRegister(FlushedSegmentFileNum)
registry.MustRegister(IndexRequestCounter)
registry.MustRegister(IndexTaskNum)
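As a usage illustration, the sketch below registers and exposes a histogram/counter pair shaped like DataCoordSizeStoredL0Segment and DataCoordRateStoredL0Segment. It is a standalone example under assumptions: the namespace and subsystem strings stand in for milvusNamespace and typeutil.DataCoordRole, and the HTTP endpoint is illustrative, not the Milvus metrics server.

```go
// Standalone sketch (assumed metric names) of registering and exposing the
// new L0 segment metrics with the Prometheus client library.
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
	l0SegmentSize = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "milvus",    // stand-in for milvusNamespace
		Subsystem: "datacoord", // stand-in for typeutil.DataCoordRole
		Name:      "store_l0_segment_size",
		Help:      "stored l0 segment size",
	}, []string{})

	l0SegmentRate = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "milvus",
		Subsystem: "datacoord",
		Name:      "store_l0_segment_rate",
		Help:      "stored l0 segment rate",
	}, []string{})
)

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(l0SegmentSize, l0SegmentRate)

	// Record one flushed L0 segment of ~4 KiB.
	l0SegmentSize.WithLabelValues().Observe(4096)
	l0SegmentRate.WithLabelValues().Inc()

	// Expose /metrics for Prometheus to scrape.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9091", nil)
}
```

With no variable labels, WithLabelValues() takes no arguments, matching how the PR records Observe() and Inc() on the L0 flush path.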
1 change: 1 addition & 0 deletions pkg/metrics/metrics.go
@@ -76,6 +76,7 @@ const (
collectionName = "collection_name"
segmentStateLabelName = "segment_state"
segmentIDLabelName = "segment_id"
segmentLevelLabelName = "segment_level"
usernameLabelName = "username"
roleNameLabelName = "role_name"
cacheNameLabelName = "cache_name"