diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c24c0201871a5..66e69863ff24d 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -528,8 +528,8 @@ dataCoord: # The max idle time of segment in seconds, 10*60. maxIdleTime: 600 minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed. - # The max number of binlog file for one segment, the segment will be sealed if - # the number of binlog file reaches to max value. + # The max number of binlog (which is equal to the binlog file num of primary key) for one segment, + # the segment will be sealed if the number of binlog file reaches to max value. maxBinlogFileNumber: 32 smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than # (smallProportion * segment max # of rows). diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index cf467ffa09ac2..380183146dd38 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -161,10 +161,12 @@ func sealL1SegmentByLifetime(lifetime time.Duration) segmentSealPolicyFunc { func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyFunc { return func(segment *SegmentInfo, ts Timestamp) (bool, string) { logFileCounter := 0 - for _, fieldBinlog := range segment.GetStatslogs() { + for _, fieldBinlog := range segment.GetBinlogs() { + // Only count the binlog file number of the first field which is equal to the binlog file number of the primary field. + // Remove the multiplier generated by the number of fields. 
logFileCounter += len(fieldBinlog.GetBinlogs()) + break } - return logFileCounter >= maxBinlogFileNumber, fmt.Sprintf("Segment binlog number too large, binlog number: %d, max binlog number: %d", logFileCounter, maxBinlogFileNumber) } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 4624e7f77351d..6634cf4f069cf 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -626,7 +626,7 @@ func TestTryToSealSegment(t *testing.T) { segments := segmentManager.meta.segments.segments assert.Equal(t, 1, len(segments)) for _, seg := range segments { - seg.Statslogs = []*datapb.FieldBinlog{ + seg.Binlogs = []*datapb.FieldBinlog{ { FieldID: 1, Binlogs: []*datapb.Binlog{ diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index ace4d80c2f5c4..15819bc6591c5 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -124,9 +124,10 @@ func (c *channelLifetime) Run() error { } if tt, ok := t.(*syncmgr.SyncTask); ok { insertLogs, _, _ := tt.Binlogs() - resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), - stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))}, - ) + resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{ + BinLogCounterIncr: 1, + BinLogFileCounterIncr: uint64(len(insertLogs)), + }) } }) if err != nil { diff --git a/internal/streamingnode/server/wal/interceptors/segment/policy/segment_seal_policy.go b/internal/streamingnode/server/wal/interceptors/segment/policy/segment_seal_policy.go index 4e2839e96dbe1..f62b408b9f1f1 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/policy/segment_seal_policy.go +++ 
b/internal/streamingnode/server/wal/interceptors/segment/policy/segment_seal_policy.go @@ -23,7 +23,7 @@ func GetSegmentAsyncSealPolicy() []SegmentAsyncSealPolicy { // TODO: dynamic policy can be applied here in future. return []SegmentAsyncSealPolicy{ &sealByCapacity{}, - &sealByBinlogFileNumber{}, + &sealByBinlogNumber{}, &sealByLifetime{}, &sealByIdleTime{}, } @@ -57,23 +57,23 @@ func (p *sealByCapacity) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyRes } } -// sealByBinlogFileNumberExtraInfo is the extra info of the seal by binlog file number policy. -type sealByBinlogFileNumberExtraInfo struct { - BinLogFileNumberLimit int +// sealByBinlogFileExtraInfo is the extra info of the seal by binlog number policy. +type sealByBinlogFileExtraInfo struct { + BinLogNumberLimit int } -// sealByBinlogFileNumber is a policy to seal the segment by the binlog file number. -type sealByBinlogFileNumber struct{} +// sealByBinlogNumber is a policy to seal the segment by the binlog number (the binlog file count of the primary key). +type sealByBinlogNumber struct{} // ShouldBeSealed checks if the segment should be sealed, and return the reason string. 
-func (p *sealByBinlogFileNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult { +func (p *sealByBinlogNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult { limit := paramtable.Get().DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt() shouldBeSealed := stats.BinLogCounter >= uint64(limit) return SealPolicyResult{ - PolicyName: "binlog_file_number", + PolicyName: "binlog_number", ShouldBeSealed: shouldBeSealed, - ExtraInfo: &sealByBinlogFileNumberExtraInfo{ - BinLogFileNumberLimit: limit, + ExtraInfo: &sealByBinlogFileExtraInfo{ + BinLogNumberLimit: limit, }, } } diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go index 7d160d9daea69..2d9e8007c1ad5 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats.go @@ -7,13 +7,15 @@ import ( ) // SegmentStats is the usage stats of a segment. +// SegmentStats is imprecise: for performance reasons, it is not guaranteed to be recoverable. type SegmentStats struct { - Insert InsertMetrics - MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when segment is transfer int growing. - CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso. - LastModifiedTime time.Time // LastWriteTime is the last write time of this segment, it's not a tso, just a local time. - BinLogCounter uint64 // BinLogCounter is the counter of binlog, it's an async stat not real time. - ReachLimit bool // ReachLimit is a flag to indicate the segment reach the limit once. + Insert InsertMetrics + MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when the segment is transferred into growing. 
+ CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso. + LastModifiedTime time.Time // LastModifiedTime is the last write time of this segment, it's not a tso, just a local time. + BinLogCounter uint64 // BinLogCounter is the counter of binlog (equal to the binlog file count of primary key), it's an async stat not real time. + BinLogFileCounter uint64 // BinLogFileCounter is the counter of binlog files, it's an async stat not real time. + ReachLimit bool // ReachLimit is a flag to indicate that the segment has reached the limit once. } // NewSegmentStatFromProto creates a new segment assignment stat from proto. @@ -50,7 +52,8 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS // SyncOperationMetrics is the metrics of sync operation. type SyncOperationMetrics struct { - BinLogCounterIncr uint64 // the counter increment of bin log. + BinLogCounterIncr uint64 // the counter increment of bin log + BinLogFileCounterIncr uint64 // the counter increment of bin log file } // AllocRows alloc space of rows on current segment. @@ -74,6 +77,7 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 { // UpdateOnSync updates the stats of segment on sync. func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) { s.BinLogCounter += f.BinLogCounterIncr + s.BinLogFileCounter += f.BinLogFileCounterIncr } // Copy copies the segment stats. 
diff --git a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go index 2224be9d462dd..7f294f6560411 100644 --- a/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go +++ b/internal/streamingnode/server/wal/interceptors/segment/stats/stats_test.go @@ -43,10 +43,11 @@ func TestSegmentStats(t *testing.T) { Rows: 100, BinarySize: 200, }, - MaxBinarySize: 400, - CreateTime: now, - LastModifiedTime: now, - BinLogCounter: 3, + MaxBinarySize: 400, + CreateTime: now, + LastModifiedTime: now, + BinLogCounter: 3, + BinLogFileCounter: 4, } insert1 := InsertMetrics{ @@ -69,7 +70,9 @@ func TestSegmentStats(t *testing.T) { assert.Equal(t, stat.Insert.BinarySize, uint64(320)) stat.UpdateOnSync(SyncOperationMetrics{ - BinLogCounterIncr: 4, + BinLogCounterIncr: 4, + BinLogFileCounterIncr: 9, }) assert.Equal(t, uint64(7), stat.BinLogCounter) + assert.Equal(t, uint64(13), stat.BinLogFileCounter) } diff --git a/pkg/metrics/streaming_service_metrics.go b/pkg/metrics/streaming_service_metrics.go index eb84012acaac5..e412225622688 100644 --- a/pkg/metrics/streaming_service_metrics.go +++ b/pkg/metrics/streaming_service_metrics.go @@ -217,13 +217,13 @@ var ( Help: "Total of append message to wal", }, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName) - WALAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{ + WALAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{ Name: "append_message_duration_seconds", Help: "Duration of wal append message", Buckets: secondsBuckets, }, WALChannelLabelName, StatusLabelName) - WALImplsAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{ + WALImplsAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{ Name: "impls_append_message_duration_seconds", Help: "Duration of wal impls append message", Buckets: secondsBuckets, 
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 8fdeb738244b8..7784f7eabf2d4 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3417,8 +3417,8 @@ The max idle time of segment in seconds, 10*60.`, Key: "dataCoord.segment.maxBinlogFileNumber", Version: "2.2.0", DefaultValue: "32", - Doc: `The max number of binlog file for one segment, the segment will be sealed if -the number of binlog file reaches to max value.`, + Doc: `The max number of binlog (which is equal to the binlog file num of primary key) for one segment, +the segment will be sealed if the number of binlog file reaches to max value.`, Export: true, } p.SegmentMaxBinlogFileNumber.Init(base.mgr)