Skip to content

Commit

Permalink
fix: use binlog counter to trigger flush but not stats log
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Oct 23, 2024
1 parent 3b024f9 commit 9e4982a
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 34 deletions.
4 changes: 2 additions & 2 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,8 @@ dataCoord:
# The max idle time of segment in seconds, 10*60.
maxIdleTime: 600
minSizeFromIdleToSealed: 16 # The min size in MB of segment which can be idle from sealed.
# The max number of binlog file for one segment, the segment will be sealed if
# the number of binlog file reaches to max value.
# The max number of binlog (which is equal to the binlog file num of primary key) for one segment,
# the segment will be sealed if the number of binlog file reaches to max value.
maxBinlogFileNumber: 32
smallProportion: 0.5 # The segment is considered as "small segment" when its # of rows is smaller than
# (smallProportion * segment max # of rows).
Expand Down
6 changes: 4 additions & 2 deletions internal/datacoord/segment_allocation_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ func sealL1SegmentByLifetime(lifetime time.Duration) segmentSealPolicyFunc {
func sealL1SegmentByBinlogFileNumber(maxBinlogFileNumber int) segmentSealPolicyFunc {
return func(segment *SegmentInfo, ts Timestamp) (bool, string) {
logFileCounter := 0
for _, fieldBinlog := range segment.GetStatslogs() {
for _, fieldBinlog := range segment.GetBinlogs() {
// Only count the binlog file number of the first field which is equal to the binlog file number of the primary field.
// Remove the multiplier generated by the number of fields.
logFileCounter += len(fieldBinlog.GetBinlogs())
break
}

return logFileCounter >= maxBinlogFileNumber,
fmt.Sprintf("Segment binlog number too large, binlog number: %d, max binlog number: %d", logFileCounter, maxBinlogFileNumber)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/segment_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func TestTryToSealSegment(t *testing.T) {
segments := segmentManager.meta.segments.segments
assert.Equal(t, 1, len(segments))
for _, seg := range segments {
seg.Statslogs = []*datapb.FieldBinlog{
seg.Binlogs = []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ func (c *channelLifetime) Run() error {
}
if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(),
stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))},
)
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(), stats.SyncOperationMetrics{
BinLogCounterIncr: 1,
BinLogFileCounterIncr: uint64(len(insertLogs)),
})
}
})
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func GetSegmentAsyncSealPolicy() []SegmentAsyncSealPolicy {
// TODO: dynamic policy can be applied here in future.
return []SegmentAsyncSealPolicy{
&sealByCapacity{},
&sealByBinlogFileNumber{},
&sealByBinlogNumber{},
&sealByLifetime{},
&sealByIdleTime{},
}
Expand Down Expand Up @@ -57,23 +57,23 @@ func (p *sealByCapacity) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyRes
}
}

// sealByBinlogFileNumberExtraInfo is the extra info of the seal by binlog file number policy.
type sealByBinlogFileNumberExtraInfo struct {
BinLogFileNumberLimit int
// sealByBinlogFileExtraInfo is the extra info of the seal by binlog file number policy.
type sealByBinlogFileExtraInfo struct {
BinLogNumberLimit int
}

// sealByBinlogFileNumber is a policy to seal the segment by the binlog file number.
type sealByBinlogFileNumber struct{}
// sealByBinlogNumber is a policy to seal the segment by the binlog file number.
type sealByBinlogNumber struct{}

// ShouldBeSealed checks if the segment should be sealed, and return the reason string.
func (p *sealByBinlogFileNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult {
func (p *sealByBinlogNumber) ShouldBeSealed(stats *stats.SegmentStats) SealPolicyResult {
limit := paramtable.Get().DataCoordCfg.SegmentMaxBinlogFileNumber.GetAsInt()
shouldBeSealed := stats.BinLogCounter >= uint64(limit)
return SealPolicyResult{
PolicyName: "binlog_file_number",
PolicyName: "binlog_number",
ShouldBeSealed: shouldBeSealed,
ExtraInfo: &sealByBinlogFileNumberExtraInfo{
BinLogFileNumberLimit: limit,
ExtraInfo: &sealByBinlogFileExtraInfo{
BinLogNumberLimit: limit,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import (
)

// SegmentStats is the usage stats of a segment.
// The SegmentStats is imprecise, so it is not promised to be recoverable for performance.
type SegmentStats struct {
Insert InsertMetrics
MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when segment is transfer int growing.
CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso.
LastModifiedTime time.Time // LastWriteTime is the last write time of this segment, it's not a tso, just a local time.
BinLogCounter uint64 // BinLogCounter is the counter of binlog, it's an async stat not real time.
ReachLimit bool // ReachLimit is a flag to indicate the segment reach the limit once.
Insert InsertMetrics
MaxBinarySize uint64 // MaxBinarySize of current segment should be assigned, it's a fixed value when segment is transfer int growing.
CreateTime time.Time // created timestamp of this segment, it's a fixed value when segment is created, not a tso.
LastModifiedTime time.Time // LastWriteTime is the last write time of this segment, it's not a tso, just a local time.
BinLogCounter uint64 // BinLogCounter is the counter of binlog (equal to the binlog file count of primary key), it's an async stat not real time.
BinLogFileCounter uint64 // BinLogFileCounter is the counter of binlog files, it's an async stat not real time.
ReachLimit bool // ReachLimit is a flag to indicate the segment reach the limit once.
}

// NewSegmentStatFromProto creates a new segment assignment stat from proto.
Expand Down Expand Up @@ -50,7 +52,8 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS

// SyncOperationMetrics is the metrics of sync operation.
type SyncOperationMetrics struct {
BinLogCounterIncr uint64 // the counter increment of bin log.
BinLogCounterIncr uint64 // the counter increment of bin log
BinLogFileCounterIncr uint64 // the counter increment of bin log file
}

// AllocRows alloc space of rows on current segment.
Expand All @@ -74,6 +77,7 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
// UpdateOnSync updates the stats of segment on sync.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
s.BinLogCounter += f.BinLogCounterIncr
s.BinLogFileCounter += f.BinLogFileCounterIncr
}

// Copy copies the segment stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ func TestSegmentStats(t *testing.T) {
Rows: 100,
BinarySize: 200,
},
MaxBinarySize: 400,
CreateTime: now,
LastModifiedTime: now,
BinLogCounter: 3,
MaxBinarySize: 400,
CreateTime: now,
LastModifiedTime: now,
BinLogCounter: 3,
BinLogFileCounter: 4,
}

insert1 := InsertMetrics{
Expand All @@ -69,7 +70,9 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, stat.Insert.BinarySize, uint64(320))

stat.UpdateOnSync(SyncOperationMetrics{
BinLogCounterIncr: 4,
BinLogCounterIncr: 4,
BinLogFileCounterIncr: 9,
})
assert.Equal(t, uint64(7), stat.BinLogCounter)
assert.Equal(t, uint64(13), stat.BinLogFileCounter)
}
4 changes: 2 additions & 2 deletions pkg/metrics/streaming_service_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ var (
Help: "Total of append message to wal",
}, WALChannelLabelName, WALMessageTypeLabelName, StatusLabelName)

WALAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
WALAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
Name: "append_message_duration_seconds",
Help: "Duration of wal append message",
Buckets: secondsBuckets,
}, WALChannelLabelName, StatusLabelName)

WALImplsAppendMessageDurationSeconds = newStreamingNodeHistogramVec(prometheus.HistogramOpts{
WALImplsAppendMessageDurationSeconds = newWALHistogramVec(prometheus.HistogramOpts{
Name: "impls_append_message_duration_seconds",
Help: "Duration of wal impls append message",
Buckets: secondsBuckets,
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3417,8 +3417,8 @@ The max idle time of segment in seconds, 10*60.`,
Key: "dataCoord.segment.maxBinlogFileNumber",
Version: "2.2.0",
DefaultValue: "32",
Doc: `The max number of binlog file for one segment, the segment will be sealed if
the number of binlog file reaches to max value.`,
Doc: `The max number of binlog (which is equal to the binlog file num of primary key) for one segment,
the segment will be sealed if the number of binlog file reaches to max value.`,
Export: true,
}
p.SegmentMaxBinlogFileNumber.Init(base.mgr)
Expand Down

0 comments on commit 9e4982a

Please sign in to comment.