Skip to content

Commit

Permalink
fix: add binlog counter updates for stats manager in streaming node f…
Browse files Browse the repository at this point in the history
…lusher

Signed-off-by: chyezh <chyezh@outlook.com>
  • Loading branch information
chyezh committed Oct 11, 2024
1 parent d921e7d commit 6fcde7f
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 38 deletions.
17 changes: 13 additions & 4 deletions internal/flushcommon/pipeline/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func initMetaCache(initCtx context.Context, chunkManager storage.ChunkManager, i
func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,
info *datapb.ChannelWatchInfo, metacache metacache.MetaCache,
unflushed, flushed []*datapb.SegmentInfo, input <-chan *msgstream.MsgPack,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
) (*DataSyncService, error) {
var (
channelName = info.GetVchan().GetChannelName()
Expand All @@ -241,7 +242,9 @@ func getServiceWithChannel(initCtx context.Context, params *util.PipelineParams,

err := params.WriteBufferManager.Register(channelName, metacache,
writebuffer.WithMetaWriter(syncmgr.BrokerMetaWriter(params.Broker, config.serverID)),
writebuffer.WithIDAllocator(params.Allocator))
writebuffer.WithIDAllocator(params.Allocator),
writebuffer.WithTaskObserverCallback(wbTaskObserverCallback),
)
if err != nil {
log.Warn("failed to register channel buffer", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -355,10 +358,16 @@ func NewDataSyncService(initCtx context.Context, pipelineParams *util.PipelinePa
if metaCache, err = getMetaCacheWithTickler(initCtx, pipelineParams, info, tickler, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil)
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, nil, nil)
}

func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *util.PipelineParams, info *datapb.ChannelWatchInfo, input <-chan *msgstream.MsgPack) (*DataSyncService, error) {
func NewStreamingNodeDataSyncService(
initCtx context.Context,
pipelineParams *util.PipelineParams,
info *datapb.ChannelWatchInfo,
input <-chan *msgstream.MsgPack,
wbTaskObserverCallback writebuffer.TaskObserverCallback,
) (*DataSyncService, error) {
// recover segment checkpoints
var (
err error
Expand All @@ -383,7 +392,7 @@ func NewStreamingNodeDataSyncService(initCtx context.Context, pipelineParams *ut
if metaCache, err = getMetaCacheForStreaming(initCtx, pipelineParams, info, unflushedSegmentInfos, flushedSegmentInfos); err != nil {
return nil, err
}
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input)
return getServiceWithChannel(initCtx, pipelineParams, info, metaCache, unflushedSegmentInfos, flushedSegmentInfos, input, wbTaskObserverCallback)
}

func NewDataSyncServiceWithMetaCache(metaCache metacache.MetaCache) *DataSyncService {
Expand Down
17 changes: 14 additions & 3 deletions internal/flushcommon/writebuffer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ const (

type WriteBufferOption func(opt *writeBufferOption)

type TaskObserverCallback func(t syncmgr.Task, err error)

type writeBufferOption struct {
deletePolicy string
idAllocator allocator.Interface
syncPolicies []SyncPolicy

pkStatsFactory metacache.PkStatsFactory
metaWriter syncmgr.MetaWriter
errorHandler func(error)
pkStatsFactory metacache.PkStatsFactory
metaWriter syncmgr.MetaWriter
errorHandler func(error)
taskObserverCallback TaskObserverCallback
}

func defaultWBOption(metacache metacache.MetaCache) *writeBufferOption {
Expand Down Expand Up @@ -85,3 +88,11 @@ func WithErrorHandler(handler func(err error)) WriteBufferOption {
opt.errorHandler = handler
}
}

// WithTaskObserverCallback sets the callback function for observing task status.
// The callback will be called when every task is executed, should be concurrent safe to be called.
func WithTaskObserverCallback(callback TaskObserverCallback) WriteBufferOption {
return func(opt *writeBufferOption) {
opt.taskObserverCallback = callback
}
}
38 changes: 24 additions & 14 deletions internal/flushcommon/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ type writeBufferBase struct {
checkpoint *msgpb.MsgPosition
flushTimestamp *atomic.Uint64

errHandler func(err error)
errHandler func(err error)
taskObserverCallback func(t syncmgr.Task, err error) // execute when a sync task finished, should be concurrent safe.

// pre build logger
logger *log.MLogger
Expand Down Expand Up @@ -181,19 +182,20 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, syncMgr s
}

wb := &writeBufferBase{
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
errHandler: option.errorHandler,
channelName: channel,
collectionID: metacache.Collection(),
collSchema: schema,
estSizePerRecord: estSize,
syncMgr: syncMgr,
metaWriter: option.metaWriter,
buffers: make(map[int64]*segmentBuffer),
metaCache: metacache,
serializer: serializer,
syncCheckpoint: newCheckpointCandiates(),
syncPolicies: option.syncPolicies,
flushTimestamp: flushTs,
errHandler: option.errorHandler,
taskObserverCallback: option.taskObserverCallback,
}

wb.logger = log.With(zap.Int64("collectionID", wb.collectionID),
Expand Down Expand Up @@ -342,6 +344,10 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}

result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}

if err != nil {
return err
}
Expand Down Expand Up @@ -654,6 +660,10 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
}

f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}

if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/flushcommon/pipeline"
"github.com/milvus-io/milvus/internal/flushcommon/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/streamingnode/server/resource"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal"
adaptor2 "github.com/milvus-io/milvus/internal/streamingnode/server/wal/adaptor"
"github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/stats"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
Expand Down Expand Up @@ -116,7 +118,17 @@ func (c *channelLifetime) Run() error {

// Build and add pipeline.
ds, err := pipeline.NewStreamingNodeDataSyncService(ctx, c.f.pipelineParams,
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan())
&datapb.ChannelWatchInfo{Vchan: resp.GetInfo(), Schema: resp.GetSchema()}, handler.Chan(), func(t syncmgr.Task, err error) {
if err != nil || t == nil {
return
}
if tt, ok := t.(*syncmgr.SyncTask); ok {
insertLogs, _, _ := tt.Binlogs()
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(tt.SegmentID(),
stats.SyncOperationMetrics{BinLogCounterIncr: uint64(len(insertLogs))},
)
}
})
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ func TestSegmentAllocManager(t *testing.T) {
assert.True(t, m.IsNoWaitSeal())

// Try to seal with a policy
resource.Resource().SegmentAssignStatsManager().UpdateOnFlush(6000, stats.FlushOperationMetrics{
BinLogCounter: 100,
resource.Resource().SegmentAssignStatsManager().UpdateOnSync(6000, stats.SyncOperationMetrics{
BinLogCounterIncr: 100,
})
// ask a unacknowledgement seal for partition 3 to avoid seal operation.
result, err = m.AssignSegment(ctx, &AssignSegmentRequest{
Expand Down Expand Up @@ -266,6 +266,7 @@ func initializeTestState(t *testing.T) {
// s 6000g

paramtable.Init()
paramtable.Get().DataCoordCfg.SegmentSealProportion.SwapTempValue("1.0")
paramtable.Get().DataCoordCfg.SegmentSealProportionJitter.SwapTempValue("0.0")
paramtable.Get().DataCoordCfg.SegmentMaxSize.SwapTempValue("1")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ func NewProtoFromSegmentStat(stat *SegmentStats) *streamingpb.SegmentAssignmentS
}
}

// FlushOperationMetrics is the metrics of flush operation.
type FlushOperationMetrics struct {
BinLogCounter uint64
// SyncOperationMetrics is the metrics of sync operation.
type SyncOperationMetrics struct {
BinLogCounterIncr uint64 // the counter increment of bin log.
}

// AllocRows alloc space of rows on current segment.
Expand All @@ -71,9 +71,9 @@ func (s *SegmentStats) BinaryCanBeAssign() uint64 {
return s.MaxBinarySize - s.Insert.BinarySize
}

// UpdateOnFlush updates the stats of segment on flush.
func (s *SegmentStats) UpdateOnFlush(f FlushOperationMetrics) {
s.BinLogCounter = f.BinLogCounter
// UpdateOnSync updates the stats of segment on sync.
func (s *SegmentStats) UpdateOnSync(f SyncOperationMetrics) {
s.BinLogCounter += f.BinLogCounterIncr
}

// Copy copies the segment stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,17 @@ func (m *StatsManager) GetStatsOfSegment(segmentID int64) *SegmentStats {
return m.segmentStats[segmentID].Copy()
}

// UpdateOnFlush updates the stats of segment on flush.
// UpdateOnSync updates the stats of segment on sync.
// It's an async update operation, so it's not necessary to do success.
func (m *StatsManager) UpdateOnFlush(segmentID int64, flush FlushOperationMetrics) {
func (m *StatsManager) UpdateOnSync(segmentID int64, syncMetric SyncOperationMetrics) {
m.mu.Lock()
defer m.mu.Unlock()

// Must be exist, otherwise it's a bug.
if _, ok := m.segmentIndex[segmentID]; !ok {
return
}
m.segmentStats[segmentID].UpdateOnFlush(flush)
m.segmentStats[segmentID].UpdateOnSync(syncMetric)

// binlog counter is updated, notify seal manager to do seal scanning.
m.sealNotifier.AddAndNotify(m.segmentIndex[segmentID])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func TestStatsManager(t *testing.T) {
assert.Equal(t, uint64(350), m.pchannelStats["pchannel"].BinarySize)
assert.Equal(t, uint64(250), m.pchannelStats["pchannel2"].BinarySize)

m.UpdateOnFlush(3, FlushOperationMetrics{BinLogCounter: 100})
m.UpdateOnSync(3, SyncOperationMetrics{BinLogCounterIncr: 100})
<-m.SealNotifier().WaitChan()
infos = m.SealNotifier().Get()
assert.Len(t, infos, 1)
m.UpdateOnFlush(1000, FlushOperationMetrics{BinLogCounter: 100})
m.UpdateOnSync(1000, SyncOperationMetrics{BinLogCounterIncr: 100})
shouldBlock(t, m.SealNotifier().WaitChan())

m.AllocRows(3, InsertMetrics{Rows: 400, BinarySize: 400})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func TestSegmentStats(t *testing.T) {
assert.Equal(t, stat.Insert.Rows, uint64(160))
assert.Equal(t, stat.Insert.BinarySize, uint64(320))

stat.UpdateOnFlush(FlushOperationMetrics{
BinLogCounter: 4,
stat.UpdateOnSync(SyncOperationMetrics{
BinLogCounterIncr: 4,
})
assert.Equal(t, uint64(4), stat.BinLogCounter)
assert.Equal(t, uint64(7), stat.BinLogCounter)
}

0 comments on commit 6fcde7f

Please sign in to comment.