enhance: Log channel checkpoint source info in writebuffer #28993

Merged 1 commit on Dec 7, 2023

Log channel checkpoint source info in writebuffer
See also #27675
Printing the channel checkpoint source with a rate-limited (rated) log will help in debugging system behavior.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed Dec 6, 2023
commit b882b2afeaa6f08f2f1d759ed20286068bbd0b99
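
The change leans on rate-limited ("rated") logging so the checkpoint source can be reported on a hot path without flooding the output; the PR itself uses the Milvus log package's WithRateGroup and RatedInfo. The snippet below is a minimal, self-contained Go sketch of that idea (a per-group suppression window), not the actual Milvus logger implementation; all names in it are illustrative.

package main

import (
    "fmt"
    "sync"
    "time"
)

// ratedLogger emits a message for a given group at most once per interval,
// mirroring the "rate group" behavior the commit relies on.
type ratedLogger struct {
    mu       sync.Mutex
    lastEmit map[string]time.Time
}

func newRatedLogger() *ratedLogger {
    return &ratedLogger{lastEmit: make(map[string]time.Time)}
}

// RatedInfo prints msg for the group only if at least interval has passed
// since the previous emission for that group; otherwise it is suppressed.
func (l *ratedLogger) RatedInfo(group string, interval time.Duration, msg string) {
    l.mu.Lock()
    defer l.mu.Unlock()
    if time.Since(l.lastEmit[group]) < interval {
        return // suppressed by the rate group
    }
    l.lastEmit[group] = time.Now()
    fmt.Printf("[INFO] group=%s %s\n", group, msg)
}

func main() {
    logger := newRatedLogger()
    for i := 0; i < 5; i++ {
        // only the first call in the 60-second window actually prints
        logger.RatedInfo("writebuffer_cp_ch-1", 60*time.Second, "checkpoint evaluated")
    }
}
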
28 changes: 19 additions & 9 deletions internal/datanode/syncmgr/mock_sync_manager.go

(Generated file; diff not rendered by default.)
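
Because the diff of this generated file is hidden, here is a rough sketch, assuming the file is produced by mockery as the EXPECT() calls in the tests suggest, of how the stubbed method changes to match the new two-value signature. The actual generated code (including the expecter helpers) is not shown here and may differ.

package syncmgr

import (
    "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    mock "github.com/stretchr/testify/mock"
)

// MockSyncManager is a mockery-style stand-in for SyncManager (sketch only).
type MockSyncManager struct {
    mock.Mock
}

// GetEarliestPosition now forwards two values configured by the expectation:
// the owning segment ID and the earliest position.
func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
    ret := _m.Called(channel)

    r0, _ := ret.Get(0).(int64)
    r1, _ := ret.Get(1).(*msgpb.MsgPosition)
    return r0, r1
}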

8 changes: 5 additions & 3 deletions internal/datanode/syncmgr/sync_manager.go
@@ -40,7 +40,7 @@ type SyncManager interface {
    // SyncData is the method to submit sync task.
    SyncData(ctx context.Context, task Task) *conc.Future[error]
    // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel.
-   GetEarliestPosition(channel string) *msgpb.MsgPosition
+   GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition)
    // Block allows caller to block tasks of provided segment id.
    // normally used by compaction task.
    // if levelzero delta policy is enabled, this shall be an empty operation.
@@ -88,20 +88,22 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
    })
}

-func (mgr syncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition {
+func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
    var cp *msgpb.MsgPosition
+   var segmentID int64
    mgr.tasks.Range(func(_ string, task Task) bool {
        if task.StartPosition() == nil {
            return true
        }
        if task.ChannelName() == channel {
            if cp == nil || task.StartPosition().GetTimestamp() < cp.GetTimestamp() {
                cp = task.StartPosition()
+               segmentID = task.SegmentID()
            }
        }
        return true
    })
-   return cp
+   return segmentID, cp
}

func (mgr syncManager) Block(segmentID int64) {
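
As a usage-level illustration, here is a standalone sketch of the selection rule the updated GetEarliestPosition implements: scan a channel's in-flight sync tasks, keep the smallest start timestamp, and remember which segment it came from so callers can log the source. The task struct and function names below are illustrative stand-ins, not the real Task interface.

package main

import "fmt"

// task is a simplified stand-in for an in-flight sync task.
type task struct {
    segmentID int64
    channel   string
    startTs   uint64 // 0 means the task has no start position yet
}

// earliestPosition returns the smallest start timestamp for a channel and the
// segment that owns it, plus whether any candidate was found.
func earliestPosition(tasks []task, channel string) (int64, uint64, bool) {
    var (
        segmentID int64
        earliest  uint64
        found     bool
    )
    for _, t := range tasks {
        if t.channel != channel || t.startTs == 0 {
            continue
        }
        if !found || t.startTs < earliest {
            earliest = t.startTs
            segmentID = t.segmentID
            found = true
        }
    }
    return segmentID, earliest, found
}

func main() {
    tasks := []task{
        {segmentID: 100, channel: "ch-1", startTs: 500},
        {segmentID: 101, channel: "ch-1", startTs: 300},
        {segmentID: 102, channel: "ch-2", startTs: 100},
    }
    fmt.Println(earliestPosition(tasks, "ch-1")) // 101 300 true
}
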
57 changes: 50 additions & 7 deletions internal/datanode/writebuffer/write_buffer.go
@@ -135,21 +135,64 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 {
}

func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition {
+   log := log.Ctx(context.Background()).
+       With(zap.String("channel", wb.channelName)).
+       WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60)
    wb.mut.RLock()
    defer wb.mut.RUnlock()

-   syncingPos := wb.syncMgr.GetEarliestPosition(wb.channelName)
+   // syncCandidate from sync manager
+   syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName)

-   positions := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *msgpb.MsgPosition {
-       return buf.EarliestPosition()
+   type checkpointCandidate struct {
+       segmentID int64
+       position  *msgpb.MsgPosition
+   }
+   var bufferCandidate *checkpointCandidate
+
+   candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate {
+       return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()}
    })
+   candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool {
+       return candidate.position != nil
+   })
-   positions = append(positions, syncingPos)

-   checkpoint := getEarliestCheckpoint(positions...)
-   // all buffer are empty
-   if checkpoint == nil {
+   if len(candidates) > 0 {
+       bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool {
+           return a.position.GetTimestamp() < b.position.GetTimestamp()
+       })
+   }
+
+   var checkpoint *msgpb.MsgPosition
+   var segmentID int64
+   var cpSource string
+   switch {
+   case bufferCandidate == nil && syncCandidate == nil:
+       // all buffer are empty
+       log.RatedInfo(60, "checkpoint from latest consumed msg")
        return wb.checkpoint
+   case bufferCandidate == nil && syncCandidate != nil:
+       checkpoint = syncCandidate
+       segmentID = syncSegmentID
+       cpSource = "syncManager"
+   case syncCandidate == nil && bufferCandidate != nil:
+       checkpoint = bufferCandidate.position
+       segmentID = bufferCandidate.segmentID
+       cpSource = "segmentBuffer"
+   case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp():
+       checkpoint = bufferCandidate.position
+       segmentID = bufferCandidate.segmentID
+       cpSource = "segmentBuffer"
+   case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp():
+       checkpoint = syncCandidate
+       segmentID = syncSegmentID
+       cpSource = "syncManager"
    }

+   log.RatedInfo(20, "checkpoint evaluated",
+       zap.String("cpSource", cpSource),
+       zap.Int64("segmentID", segmentID),
+       zap.Uint64("cpTimestamp", checkpoint.GetTimestamp()))
    return checkpoint
}
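
For reference, a simplified, self-contained sketch of the decision table above: prefer whichever candidate (sync manager or segment buffer) carries the smaller timestamp, fall back to the latest consumed checkpoint when both are absent, and report the chosen source, which is the same string the new rated log records. The candidate type is an illustrative stand-in for *msgpb.MsgPosition plus a segment ID.

package main

import "fmt"

// candidate pairs a checkpoint timestamp with the segment that produced it.
type candidate struct {
    segmentID int64
    timestamp uint64
}

// pickCheckpoint mirrors the switch in GetCheckpoint in simplified form and
// returns the chosen timestamp, segment ID, and checkpoint source label.
func pickCheckpoint(syncCand, bufferCand *candidate, consumedTs uint64) (uint64, int64, string) {
    switch {
    case syncCand == nil && bufferCand == nil:
        return consumedTs, 0, "latest consumed msg"
    case bufferCand == nil:
        return syncCand.timestamp, syncCand.segmentID, "syncManager"
    case syncCand == nil:
        return bufferCand.timestamp, bufferCand.segmentID, "segmentBuffer"
    case syncCand.timestamp < bufferCand.timestamp:
        return syncCand.timestamp, syncCand.segmentID, "syncManager"
    default:
        return bufferCand.timestamp, bufferCand.segmentID, "segmentBuffer"
    }
}

func main() {
    ts, seg, src := pickCheckpoint(&candidate{segmentID: 1, timestamp: 300}, &candidate{segmentID: 2, timestamp: 400}, 1000)
    fmt.Println(ts, seg, src) // 300 1 syncManager
}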

150 changes: 149 additions & 1 deletion internal/datanode/writebuffer/write_buffer_test.go
@@ -8,6 +8,7 @@ import (
    "github.com/stretchr/testify/suite"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+   "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
    "github.com/milvus-io/milvus/internal/allocator"
    "github.com/milvus-io/milvus/internal/datanode/metacache"
@@ -54,7 +55,7 @@ func (s *WriteBufferSuite) SetupTest() {
    })
}

-func (s *WriteBufferSuite) TestDefaulOption() {
+func (s *WriteBufferSuite) TestDefaultOption() {
    s.Run("default BFPkOracle", func() {
        wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr)
        s.NoError(err)
@@ -110,6 +111,153 @@ func (s *WriteBufferSuite) TestFlushSegments() {
s.NoError(err)
}

func (s *WriteBufferSuite) TestGetCheckpoint() {
s.Run("use_consume_cp", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}

s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()

checkpoint := s.wb.GetCheckpoint()
s.EqualValues(1000, checkpoint.GetTimestamp())
})

s.Run("use_sync_mgr_cp", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}

s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 500,
}).Once()

checkpoint := s.wb.GetCheckpoint()
s.EqualValues(500, checkpoint.GetTimestamp())
})

s.Run("use_segment_buffer_min", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}

s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once()

buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
buf1.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 440,
}
buf1.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 400,
}
buf2, err := newSegmentBuffer(3, s.collSchema)
s.Require().NoError(err)
buf2.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 550,
}
buf2.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 600,
}

s.wb.mut.Lock()
s.wb.buffers[2] = buf1
s.wb.buffers[3] = buf2
s.wb.mut.Unlock()

defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
s.wb.buffers = make(map[int64]*segmentBuffer)
}()

checkpoint := s.wb.GetCheckpoint()
s.EqualValues(400, checkpoint.GetTimestamp())
})

s.Run("sync_mgr_smaller", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}

s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 300,
}).Once()

buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
buf1.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 440,
}
buf1.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 400,
}
buf2, err := newSegmentBuffer(3, s.collSchema)
s.Require().NoError(err)
buf2.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 550,
}
buf2.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 600,
}

s.wb.mut.Lock()
s.wb.buffers[2] = buf1
s.wb.buffers[3] = buf2
s.wb.mut.Unlock()

defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
s.wb.buffers = make(map[int64]*segmentBuffer)
}()

checkpoint := s.wb.GetCheckpoint()
s.EqualValues(300, checkpoint.GetTimestamp())
})

s.Run("segment_buffer_smaller", func() {
s.wb.checkpoint = &msgpb.MsgPosition{
Timestamp: 1000,
}

s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{
Timestamp: 800,
}).Once()

buf1, err := newSegmentBuffer(2, s.collSchema)
s.Require().NoError(err)
buf1.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 440,
}
buf1.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 400,
}
buf2, err := newSegmentBuffer(3, s.collSchema)
s.Require().NoError(err)
buf2.insertBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 550,
}
buf2.deltaBuffer.startPos = &msgpb.MsgPosition{
Timestamp: 600,
}

s.wb.mut.Lock()
s.wb.buffers[2] = buf1
s.wb.buffers[3] = buf2
s.wb.mut.Unlock()

defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
s.wb.buffers = make(map[int64]*segmentBuffer)
}()

checkpoint := s.wb.GetCheckpoint()
s.EqualValues(400, checkpoint.GetTimestamp())
})
}

func TestWriteBufferBase(t *testing.T) {
suite.Run(t, new(WriteBufferSuite))
}