
fix: add proportion for capacity seal policy in streaming flusher #36761

Merged
16 changes: 12 additions & 4 deletions internal/flushcommon/pipeline/data_sync_service.go
@@ -221,6 +221,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
info *datapb.ChannelWatchInfo, metacache metacache.MetaCache,
unflushed, flushed []*datapb.SegmentInfo, input <-chan *msgstream.MsgPack,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
) (*DataSyncService, error) {
var (
channelName = info.GetVchan().GetChannelName()
@@ -316,7 +317,8 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
// if fail to init flowgraph nodes.
err = params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
writebuffer.WithIDAllocator(params.Allocator),
writebuffer.WithTaskObserverCallback(wbTaskObserverCallback))
if err != nil {
log.Warn("failed to register channel buffer", zap.String("channel", channelName), zap.Error(err))
return nil, err
@@ -353,10 +355,16 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil, nil)
}

func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, input <-chan *msgstream.MsgPack) (*DataSyncService, error) {
func NewStreamingNodeDataSyncService(
initCtx context.Context,
pipelineParams *util.PipelineParams,
info *datapb.ChannelWatchInfo,
input <-chan *msgstream.MsgPack,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
) (*DataSyncService, error) {
// recover segment checkpoints
var (
err error
@@ -381,7 +389,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input, wbTaskObserverCallback)
}

func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
17 changes: 14 additions & 3 deletions internal/flushcommon/writebuffer/options.go
@@ -19,14 +19,17 @@ const (

type WriteBufferOption func(opt *writeBufferOption)

type TaskObserverCallback func(t syncmgr.Task, err error)

type writeBufferOption struct {
deletePolicy string
idAllocator allocator.Interface
syncPolicies []SyncPolicy

pkStatsFactory metacache.PkStatsFactory
metaWriter syncmgr.MetaWriter
errorHandler func(error)
pkStatsFactory metacache.PkStatsFactory
metaWriter syncmgr.MetaWriter
errorHandler func(error)
taskObserverCallback TaskObserverCallback
}

func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
@@ -85,3 +88,11 @@ func WithErrorHandler(handler func(err error)) WriteBufferOption {
opt.errorHandler = handler
}
}

// WithTaskObserverCallback sets the callback function for observing task status.
// The callback is invoked whenever a task finishes executing; it must be safe for concurrent calls.
func WithTaskObserverCallback(callback TaskObserverCallback) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.taskObserverCallback = callback
}
}
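A minimal usage sketch of the new option (not part of this diff), assuming the identifiers from the Register call shown in data_sync_service.go above; since the observer may be invoked concurrently from several sync tasks, this example only touches an atomic counter.

```go
// Sketch only: count failed sync tasks through the new option.
// params, channelName, metaCache and config come from the surrounding
// pipeline code; failedSyncs is a sync/atomic counter (Go 1.19+).
var failedSyncs atomic.Int64

err := params.WriteBufferManager.Register(channelName, metaCache,
	writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
	writebuffer.WithIDAllocator(params.Allocator),
	writebuffer.WithTaskObserverCallback(func(t syncmgr.Task, err error) {
		if err != nil {
			failedSyncs.Add(1) // must be safe for concurrent calls
		}
	}),
)
```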
38 changes: 24 additions & 14 deletions internal/flushcommon/writebuffer/write_buffer.go
@@ -151,7 +151,8 @@
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64

errHandler func(err error)
errHandler func(err error)
taskObserverCallback func(t syncmgr.Task, err error) // executed when a sync task finishes; must be safe for concurrent calls.

// pre build logger
logger *log.MLogger
@@ -181,19 +182,20 @@
}

wb := &writeBufferBase{
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
errHandler: option.errorHandler,
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
errHandler: option.errorHandler,
taskObserverCallback: option.taskObserverCallback,
}

wb.logger = log.With(zap.Int64("collectionID", wb.collectionID),
@@ -342,6 +344,10 @@
}

result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}

if err != nil {
return err
}
@@ -654,6 +660,10 @@
}

f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}

if err != nil {
return err
}
internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go
@@ -26,10 +26,12 @@
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
adaptor2 "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
@@ -116,7 +118,17 @@

// Build and add pipeline.
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan())
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
if err != nil || t == nil {
return
}
if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(),
stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))},
)
}
})
if err != nil {
return err
}
@@ -135,8 +135,8 @@ func TestSegmentAllocManager(t *testing.T) {
assert.True(t, m.IsNoWaitSeal())

// Try to seal with a policy
resource.Resource().SegmentAssignStatsManager().UpdateOnFlush(6000, stats.FlushOperationMetrics{
BinLogCounter: 100,
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(6000, stats.SyncOperationMetrics{
BinLogCounterIncr: 100,
})
// ask an unacknowledged seal for partition 3 to avoid the seal operation.
result, err = m.AssignSegment(ctx, &AssignSegmentRequest{
@@ -266,6 +266,7 @@ func initializeTestState(t *testing.T) {
// s 6000g

paramtable.Init()
paramtable.Get().DataCoordCfg.SegmentSealProportion.SwapTempValue("1.0")
paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.SwapTempValue("0.0")
paramtable.Get().DataCoordCfg.SegmentMaxSize.SwapTempValue("1")

@@ -29,6 +29,7 @@ type SegmentLimitationPolicy interface {
type jitterSegmentLimitationPolicyExtraInfo struct {
Jitter float64
JitterRatio float64
Proportion float64
MaxSegmentSize uint64
}

@@ -46,13 +47,15 @@ func (p jitterSegmentLimitationPolicy) GenerateLimitation() SegmentLimitation {
jitterRatio = 1
}
maxSegmentSize := uint64(paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024)
segmentSize := uint64(jitterRatio * float64(maxSegmentSize))
proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat()
Review comment (Contributor Author): add proportion configuration here.

segmentSize := uint64(jitterRatio * float64(maxSegmentSize) * proportion)
return SegmentLimitation{
PolicyName: "jitter_segment_limitation",
SegmentSize: segmentSize,
ExtraInfo: jitterSegmentLimitationPolicyExtraInfo{
Jitter: jitter,
JitterRatio: jitterRatio,
Proportion: proportion,
MaxSegmentSize: maxSegmentSize,
},
}
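A worked example of the new limit (the numbers below are hypothetical, not the project defaults): previously the seal size was jitterRatio * maxSegmentSize, so with SegmentMaxSize = 1024 MB, SegmentSealProportion = 0.25 and a jitterRatio of 1.0, a growing segment is now sealed at roughly 256 MB instead of the full 1024 MB.

```go
// Hypothetical values for illustration only.
maxSegmentSize := uint64(1024) * 1024 * 1024 // SegmentMaxSize = 1024 MB, in bytes
proportion := 0.25                           // SegmentSealProportion (example value)
jitterRatio := 1.0                           // jitter disabled

segmentSize := uint64(jitterRatio * float64(maxSegmentSize) * proportion)
fmt.Println(segmentSize) // 268435456 bytes ≈ 256 MB
```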
@@ -48,9 +48,9 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS
}
}

// FlushOperationMetrics is the metrics of flush operation.
type FlushOperationMetrics struct {
BinLogCounter uint64
// SyncOperationMetrics is the metrics of sync operation.
type SyncOperationMetrics struct {
BinLogCounterIncr uint64 // the increment of the binlog counter.
}

// AllocRows alloc space of rows on current segment.
@@ -71,9 +71,9 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
return s.MaxBinarySize - s.Insert.BinarySize
}

// UpdateOnFlush updates the stats of segment on flush.
func (s *SegmentStats) UpdateOnFlush(f FlushOperationMetrics) {
s.BinLogCounter = f.BinLogCounter
// UpdateOnSync updates the stats of segment on sync.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
s.BinLogCounter += f.BinLogCounterIncr
}

// Copy copies the segment stats.
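The behavioral change here is from overwrite to accumulation: UpdateOnFlush replaced BinLogCounter with the reported value, while UpdateOnSync adds the per-sync increment to it. A small sketch of the difference; the starting value of 3 is an assumption, matching the expectation change from 4 to 7 in TestSegmentStats below.

```go
s := &SegmentStats{BinLogCounter: 3} // assumed pre-existing counter

// Old semantics (removed): s.BinLogCounter = 4   -> counter becomes 4.
// New semantics: the increment is accumulated.
s.UpdateOnSync(SyncOperationMetrics{BinLogCounterIncr: 4})
fmt.Println(s.BinLogCounter) // 7
```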
@@ -112,17 +112,17 @@ func (m *StatsManager) GetStatsOfSegment(segmentID int64) *SegmentStats {
return m.segmentStats[segmentID].Copy()
}

// UpdateOnFlush updates the stats of segment on flush.
// UpdateOnSync updates the stats of segment on sync.
// It's an async update operation, so it does not have to succeed.
func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetrics) {
func (m *StatsManager) UpdateOnSync(segmentID int64, syncMetric SyncOperationMetrics) {
Review comment (Contributor Author): update the binlog count when syncmgr performs a sync.

m.mu.Lock()
defer m.mu.Unlock()

// Must exist, otherwise it's a bug.
if _, ok := m.segmentIndex[segmentID]; !ok {
return
}
m.segmentStats[segmentID].UpdateOnFlush(flush)
m.segmentStats[segmentID].UpdateOnSync(syncMetric)

// binlog counter is updated, notify seal manager to do seal scanning.
m.sealNotifier.AddAndNotify(m.segmentIndex[segmentID])
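Tying this back to the flusher: the task observer registered in channel_lifetime.go above calls into this method after every finished sync, and the notification triggers a fresh seal scan against the proportional limit. A minimal caller-side sketch; the segment ID, syncTask and insertLogs are placeholders mirroring the channel_lifetime.go callback.

```go
// Sketch only: report a finished sync for segment 3.
insertLogs, _, _ := syncTask.Binlogs() // syncTask is a *syncmgr.SyncTask, as in the callback above
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(3, stats.SyncOperationMetrics{
	BinLogCounterIncr: uint64(len(insertLogs)), // one increment per newly written insert binlog
})
```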
@@ -60,11 +60,11 @@ func TestStatsManager(t *testing.T) {
assert.Equal(t, uint64(350), m.pchannelStats["pchannel"].BinarySize)
assert.Equal(t, uint64(250), m.pchannelStats["pchannel2"].BinarySize)

m.UpdateOnFlush(3, FlushOperationMetrics{BinLogCounter: 100})
m.UpdateOnSync(3, SyncOperationMetrics{BinLogCounterIncr: 100})
<-m.SealNotifier().WaitChan()
infos = m.SealNotifier().Get()
assert.Len(t, infos, 1)
m.UpdateOnFlush(1000, FlushOperationMetrics{BinLogCounter: 100})
m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100})
shouldBlock(t, m.SealNotifier().WaitChan())

m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400})
@@ -68,8 +68,8 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, stat.Insert.Rows, uint64(160))
assert.Equal(t, stat.Insert.BinarySize, uint64(320))

stat.UpdateOnFlush(FlushOperationMetrics{
BinLogCounter: 4,
stat.UpdateOnSync(SyncOperationMetrics{
BinLogCounterIncr: 4,
})
assert.Equal(t, uint64(4), stat.BinLogCounter)
assert.Equal(t, uint64(7), stat.BinLogCounter)
}