diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index 9d05ee41a23af..1c1ea504b244f 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -59,19 +59,29 @@ func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncMan } // GetEarliestPosition provides a mock function with given fields: channel -func (_m *MockSyncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition { +func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { ret := _m.Called(channel) - var r0 *msgpb.MsgPosition - if rf, ok := ret.Get(0).(func(string) *msgpb.MsgPosition); ok { + var r0 int64 + var r1 *msgpb.MsgPosition + if rf, ok := ret.Get(0).(func(string) (int64, *msgpb.MsgPosition)); ok { + return rf(channel) + } + if rf, ok := ret.Get(0).(func(string) int64); ok { r0 = rf(channel) } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*msgpb.MsgPosition) + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(string) *msgpb.MsgPosition); ok { + r1 = rf(channel) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*msgpb.MsgPosition) } } - return r0 + return r0, r1 } // MockSyncManager_GetEarliestPosition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetEarliestPosition' @@ -92,12 +102,12 @@ func (_c *MockSyncManager_GetEarliestPosition_Call) Run(run func(channel string) return _c } -func (_c *MockSyncManager_GetEarliestPosition_Call) Return(_a0 *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { - _c.Call.Return(_a0) +func (_c *MockSyncManager_GetEarliestPosition_Call) Return(_a0 int64, _a1 *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string) *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { +func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string) (int64, *msgpb.MsgPosition)) *MockSyncManager_GetEarliestPosition_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 840398b9b943c..9358a1f6383b0 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -40,7 +40,7 @@ type SyncManager interface { // SyncData is the method to submit sync task. SyncData(ctx context.Context, task Task) *conc.Future[error] // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel. - GetEarliestPosition(channel string) *msgpb.MsgPosition + GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) // Block allows caller to block tasks of provided segment id. // normally used by compaction task. // if levelzero delta policy is enabled, this shall be an empty operation. @@ -88,8 +88,9 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[err }) } -func (mgr syncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition { +func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { var cp *msgpb.MsgPosition + var segmentID int64 mgr.tasks.Range(func(_ string, task Task) bool { if task.StartPosition() == nil { return true @@ -97,11 +98,12 @@ func (mgr syncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition { if task.ChannelName() == channel { if cp == nil || task.StartPosition().GetTimestamp() < cp.GetTimestamp() { cp = task.StartPosition() + segmentID = task.SegmentID() } } return true }) - return cp + return segmentID, cp } func (mgr syncManager) Block(segmentID int64) { diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 8969f6c868589..18f407bc4d789 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -135,21 +135,64 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 { } func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { + log := log.Ctx(context.Background()). + With(zap.String("channel", wb.channelName)). + WithRateGroup(fmt.Sprintf("writebuffer_cp_%s", wb.channelName), 1, 60) wb.mut.RLock() defer wb.mut.RUnlock() - syncingPos := wb.syncMgr.GetEarliestPosition(wb.channelName) + // syncCandidate from sync manager + syncSegmentID, syncCandidate := wb.syncMgr.GetEarliestPosition(wb.channelName) - positions := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *msgpb.MsgPosition { - return buf.EarliestPosition() + type checkpointCandidate struct { + segmentID int64 + position *msgpb.MsgPosition + } + var bufferCandidate *checkpointCandidate + + candidates := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *checkpointCandidate { + return &checkpointCandidate{buf.segmentID, buf.EarliestPosition()} + }) + candidates = lo.Filter(candidates, func(candidate *checkpointCandidate, _ int) bool { + return candidate.position != nil }) - positions = append(positions, syncingPos) - checkpoint := getEarliestCheckpoint(positions...) - // all buffer are empty - if checkpoint == nil { + if len(candidates) > 0 { + bufferCandidate = lo.MinBy(candidates, func(a, b *checkpointCandidate) bool { + return a.position.GetTimestamp() < b.position.GetTimestamp() + }) + } + + var checkpoint *msgpb.MsgPosition + var segmentID int64 + var cpSource string + switch { + case bufferCandidate == nil && syncCandidate == nil: + // all buffer are empty + log.RatedInfo(60, "checkpoint from latest consumed msg") return wb.checkpoint + case bufferCandidate == nil && syncCandidate != nil: + checkpoint = syncCandidate + segmentID = syncSegmentID + cpSource = "syncManager" + case syncCandidate == nil && bufferCandidate != nil: + checkpoint = bufferCandidate.position + segmentID = bufferCandidate.segmentID + cpSource = "segmentBuffer" + case syncCandidate.GetTimestamp() >= bufferCandidate.position.GetTimestamp(): + checkpoint = bufferCandidate.position + segmentID = bufferCandidate.segmentID + cpSource = "segmentBuffer" + case syncCandidate.GetTimestamp() < bufferCandidate.position.GetTimestamp(): + checkpoint = syncCandidate + segmentID = syncSegmentID + cpSource = "syncManager" } + + log.RatedInfo(20, "checkpoint evaluated", + zap.String("cpSource", cpSource), + zap.Int64("segmentID", segmentID), + zap.Uint64("cpTimestamp", checkpoint.GetTimestamp())) return checkpoint } diff --git a/internal/datanode/writebuffer/write_buffer_test.go b/internal/datanode/writebuffer/write_buffer_test.go index 12e22af354a7d..9ee97c8cce9d3 100644 --- a/internal/datanode/writebuffer/write_buffer_test.go +++ b/internal/datanode/writebuffer/write_buffer_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" @@ -54,7 +55,7 @@ func (s *WriteBufferSuite) SetupTest() { }) } -func (s *WriteBufferSuite) TestDefaulOption() { +func (s *WriteBufferSuite) TestDefaultOption() { s.Run("default BFPkOracle", func() { wb, err := NewWriteBuffer(s.channelName, s.metacache, nil, s.syncMgr) s.NoError(err) @@ -110,6 +111,153 @@ func (s *WriteBufferSuite) TestFlushSegments() { s.NoError(err) } +func (s *WriteBufferSuite) TestGetCheckpoint() { + s.Run("use_consume_cp", func() { + s.wb.checkpoint = &msgpb.MsgPosition{ + Timestamp: 1000, + } + + s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once() + + checkpoint := s.wb.GetCheckpoint() + s.EqualValues(1000, checkpoint.GetTimestamp()) + }) + + s.Run("use_sync_mgr_cp", func() { + s.wb.checkpoint = &msgpb.MsgPosition{ + Timestamp: 1000, + } + + s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ + Timestamp: 500, + }).Once() + + checkpoint := s.wb.GetCheckpoint() + s.EqualValues(500, checkpoint.GetTimestamp()) + }) + + s.Run("use_segment_buffer_min", func() { + s.wb.checkpoint = &msgpb.MsgPosition{ + Timestamp: 1000, + } + + s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(0, nil).Once() + + buf1, err := newSegmentBuffer(2, s.collSchema) + s.Require().NoError(err) + buf1.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 440, + } + buf1.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 400, + } + buf2, err := newSegmentBuffer(3, s.collSchema) + s.Require().NoError(err) + buf2.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 550, + } + buf2.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 600, + } + + s.wb.mut.Lock() + s.wb.buffers[2] = buf1 + s.wb.buffers[3] = buf2 + s.wb.mut.Unlock() + + defer func() { + s.wb.mut.Lock() + defer s.wb.mut.Unlock() + s.wb.buffers = make(map[int64]*segmentBuffer) + }() + + checkpoint := s.wb.GetCheckpoint() + s.EqualValues(400, checkpoint.GetTimestamp()) + }) + + s.Run("sync_mgr_smaller", func() { + s.wb.checkpoint = &msgpb.MsgPosition{ + Timestamp: 1000, + } + + s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ + Timestamp: 300, + }).Once() + + buf1, err := newSegmentBuffer(2, s.collSchema) + s.Require().NoError(err) + buf1.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 440, + } + buf1.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 400, + } + buf2, err := newSegmentBuffer(3, s.collSchema) + s.Require().NoError(err) + buf2.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 550, + } + buf2.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 600, + } + + s.wb.mut.Lock() + s.wb.buffers[2] = buf1 + s.wb.buffers[3] = buf2 + s.wb.mut.Unlock() + + defer func() { + s.wb.mut.Lock() + defer s.wb.mut.Unlock() + s.wb.buffers = make(map[int64]*segmentBuffer) + }() + + checkpoint := s.wb.GetCheckpoint() + s.EqualValues(300, checkpoint.GetTimestamp()) + }) + + s.Run("segment_buffer_smaller", func() { + s.wb.checkpoint = &msgpb.MsgPosition{ + Timestamp: 1000, + } + + s.syncMgr.EXPECT().GetEarliestPosition(s.channelName).Return(1, &msgpb.MsgPosition{ + Timestamp: 800, + }).Once() + + buf1, err := newSegmentBuffer(2, s.collSchema) + s.Require().NoError(err) + buf1.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 440, + } + buf1.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 400, + } + buf2, err := newSegmentBuffer(3, s.collSchema) + s.Require().NoError(err) + buf2.insertBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 550, + } + buf2.deltaBuffer.startPos = &msgpb.MsgPosition{ + Timestamp: 600, + } + + s.wb.mut.Lock() + s.wb.buffers[2] = buf1 + s.wb.buffers[3] = buf2 + s.wb.mut.Unlock() + + defer func() { + s.wb.mut.Lock() + defer s.wb.mut.Unlock() + s.wb.buffers = make(map[int64]*segmentBuffer) + }() + + checkpoint := s.wb.GetCheckpoint() + s.EqualValues(400, checkpoint.GetTimestamp()) + }) +} + func TestWriteBufferBase(t *testing.T) { suite.Run(t, new(WriteBufferSuite)) }