Skip to content

Commit

Permalink
enhance: Log channel checkpoint source info in writebuffer (#28993)
Browse files Browse the repository at this point in the history
See also #27675
Printing the channel checkpoint source with a rate-limited log will help debug
system behavior

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Dec 7, 2023
1 parent 6736f65 commit cb43647
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 20 deletions.
28 changes: 19 additions & 9 deletions internal/datanode/syncmgr/mock_sync_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions internal/datanode/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task) *conc.Future[error]
// GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel.
GetEarliestPosition(channel string) *msgpb.MsgPosition
GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition)
// Block allows caller to block tasks of provided segment id.
// normally used by compaction task.
// if levelzero delta policy is enabled, this shall be an empty operation.
Expand Down Expand Up @@ -88,20 +88,22 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[err
})
}

// GetEarliestPosition walks every task tracked in mgr.tasks and picks, among the
// tasks whose ChannelName equals channel, the one whose start position has the
// smallest timestamp.
//
// It returns the segment ID of that task together with its start position.
// When no task of the channel carries a start position, the zero segment ID and
// a nil position are returned.
//
// NOTE(review): this span is a rendered diff without +/- markers; the two
// signature lines and the two return lines appear to be the previous and the
// updated version of the same statement — confirm against the repository.
func (mgr syncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition {
func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
var cp *msgpb.MsgPosition
var segmentID int64
mgr.tasks.Range(func(_ string, task Task) bool {
// tasks without a start position cannot contribute a candidate; keep iterating
if task.StartPosition() == nil {
return true
}
if task.ChannelName() == channel {
// the first match seeds the candidate; later matches replace it only when strictly earlier
if cp == nil || task.StartPosition().GetTimestamp() < cp.GetTimestamp() {
cp = task.StartPosition()
segmentID = task.SegmentID()
}
}
// always return true so that Range visits every task
return true
})
return cp
return segmentID, cp
}

func (mgr syncManager) Block(segmentID int64) {
Expand Down
57 changes: 50 additions & 7 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,64 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
}

// GetCheckpoint evaluates the checkpoint position of the write buffer's channel.
//
// Two kinds of candidates are considered: the earliest position reported by the
// sync manager for this channel, and the earliest non-nil position among the
// segment buffers held in wb.buffers. The candidate with the smaller timestamp
// is returned; on equal timestamps the segment buffer candidate is chosen.
// When neither candidate exists, wb.checkpoint is returned instead.
//
// The source of the chosen checkpoint (cpSource, segmentID, cpTimestamp) is
// emitted through rate-limited logs. wb.mut is read-locked for the whole call.
//
// NOTE(review): this span is a rendered diff without +/- markers, so statements
// of the previous implementation (syncingPos, positions, getEarliestCheckpoint)
// appear interleaved with the new one — confirm against the repository.
func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
// logger scoped to this channel, using a per-channel rate group
log := log.Ctx(context.Background()).
With(zap.String("channel", wb.channelName)).
WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
wb.mut.RLock()
defer wb.mut.RUnlock()

syncingPos := wb.syncMgr.GetEarliestPosition(wb.channelName)
// syncCandidate is the earliest position reported by the sync manager,
// syncSegmentID is the segment it belongs to
syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName)

positions := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *msgpb.MsgPosition {
return buf.EarliestPosition()
// checkpointCandidate pairs a position with the segment it comes from,
// so that the log below can name the segment
type checkpointCandidate struct {
segmentID int64
position *msgpb.MsgPosition
}
// bufferCandidate stays nil when no segment buffer has a position
var bufferCandidate *checkpointCandidate

// collect one candidate per segment buffer
candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()}
})
// drop buffers that report no earliest position
candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
return candidate.position != nil
})
positions = append(positions, syncingPos)

checkpoint := getEarliestCheckpoint(positions...)
// all buffers are empty
if checkpoint == nil {
// pick the buffer candidate with the smallest timestamp, if any remain
if len(candidates) > 0 {
bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
return a.position.GetTimestamp() < b.position.GetTimestamp()
})
}

var checkpoint *msgpb.MsgPosition
var segmentID int64
var cpSource string
// the nil checks come first, so both candidates are non-nil in the last two cases
switch {
case bufferCandidate == nil && syncCandidate == nil:
// all buffers are empty
log.RatedInfo(60, "checkpoint from latest consumed msg")
return wb.checkpoint
case bufferCandidate == nil && syncCandidate != nil:
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
case syncCandidate == nil && bufferCandidate != nil:
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
// ties go to the segment buffer candidate
case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp():
checkpoint = bufferCandidate.position
segmentID = bufferCandidate.segmentID
cpSource = "segmentBuffer"
case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp():
checkpoint = syncCandidate
segmentID = syncSegmentID
cpSource = "syncManager"
}

// report which source and segment produced the checkpoint
log.RatedInfo(20, "checkpoint evaluated",
zap.String("cpSource", cpSource),
zap.Int64("segmentID", segmentID),
zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
return checkpoint
}

Expand Down
150 changes: 149 additions & 1 deletion internal/datanode/writebuffer/write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
Expand Down Expand Up @@ -54,7 +55,7 @@ func (s *WriteBufferSuite) SetupTest() {
})
}

func (s *WriteBufferSuite) TestDefaulOption() {
func (s *WriteBufferSuite) TestDefaultOption() {
s.Run("default BFPkOracle", func() {
wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr)
s.NoError(err)
Expand Down Expand Up @@ -110,6 +111,153 @@ func (s *WriteBufferSuite) TestFlushSegments() {
s.NoError(err)
}

// TestGetCheckpoint checks which position GetCheckpoint returns for each
// combination of sync manager candidate and segment buffer candidates:
// none at all, sync manager only, segment buffers only, and both present
// with either side holding the smaller timestamp.
func (s *WriteBufferSuite) TestGetCheckpoint() {
	// resetConsumed sets the write buffer's own checkpoint to timestamp 1000,
	// which is larger than every candidate timestamp used below.
	resetConsumed := func() {
		s.wb.checkpoint = &msgpb.MsgPosition{Timestamp: 1000}
	}

	// makeBuffer creates a segment buffer and assigns the start positions of
	// its insert and delta buffers.
	makeBuffer := func(segmentID int64, insertTs, deltaTs uint64) *segmentBuffer {
		buf, err := newSegmentBuffer(segmentID, s.collSchema)
		s.Require().NoError(err)
		buf.insertBuffer.startPos = &msgpb.MsgPosition{Timestamp: insertTs}
		buf.deltaBuffer.startPos = &msgpb.MsgPosition{Timestamp: deltaTs}
		return buf
	}

	// installBuffers registers segment buffers 2 and 3 in the write buffer
	// (smallest start timestamp among them: 400) and returns a function that
	// replaces the buffer map with an empty one again.
	installBuffers := func() func() {
		first := makeBuffer(2, 440, 400)
		second := makeBuffer(3, 550, 600)

		s.wb.mut.Lock()
		s.wb.buffers[2] = first
		s.wb.buffers[3] = second
		s.wb.mut.Unlock()

		return func() {
			s.wb.mut.Lock()
			defer s.wb.mut.Unlock()
			s.wb.buffers = make(map[int64]*segmentBuffer)
		}
	}

	s.Run("use_consume_cp", func() {
		resetConsumed()
		s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()

		got := s.wb.GetCheckpoint()
		s.EqualValues(1000, got.GetTimestamp())
	})

	s.Run("use_sync_mgr_cp", func() {
		resetConsumed()
		s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
			Timestamp: 500,
		}).Once()

		got := s.wb.GetCheckpoint()
		s.EqualValues(500, got.GetTimestamp())
	})

	s.Run("use_segment_buffer_min", func() {
		resetConsumed()
		s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()

		restore := installBuffers()
		defer restore()

		got := s.wb.GetCheckpoint()
		s.EqualValues(400, got.GetTimestamp())
	})

	s.Run("sync_mgr_smaller", func() {
		resetConsumed()
		s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
			Timestamp: 300,
		}).Once()

		restore := installBuffers()
		defer restore()

		got := s.wb.GetCheckpoint()
		s.EqualValues(300, got.GetTimestamp())
	})

	s.Run("segment_buffer_smaller", func() {
		resetConsumed()
		s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
			Timestamp: 800,
		}).Once()

		restore := installBuffers()
		defer restore()

		got := s.wb.GetCheckpoint()
		s.EqualValues(400, got.GetTimestamp())
	})
}

func TestWriteBufferBase(t *testing.T) {
suite.Run(t, new(WriteBufferSuite))
}

0 comments on commit cb43647

Please sign in to comment.