From 0c7d802dcf3a10d2afa7068bee3b618a3bf0fb05 Mon Sep 17 00:00:00 2001 From: Congqi Xia Date: Wed, 15 Nov 2023 16:03:53 +0800 Subject: [PATCH] Remove commented code and fix naming issue This PR removes all the commented code and files from PR #28320 For naming issue: - Renaming `MinCheckpoint` to `EarliestPosition`, see #28320 comment - Renaming `writebuffer.Mananger` to `BufferMananger`, see #27874 comment Signed-off-by: Congqi Xia --- Makefile | 2 +- internal/datanode/compactor.go | 2 +- internal/datanode/data_node.go | 2 +- internal/datanode/data_sync_service.go | 15 +- internal/datanode/data_sync_service_test.go | 4 +- .../datanode/flow_graph_time_tick_node.go | 4 +- internal/datanode/flow_graph_write_node.go | 4 +- internal/datanode/segment.go | 279 ------------------ internal/datanode/segment_sync_policy.go | 96 ------ internal/datanode/segment_sync_policy_test.go | 151 ---------- internal/datanode/segment_test.go | 59 ---- .../datanode/syncmgr/mock_sync_manager.go | 20 +- internal/datanode/syncmgr/sync_manager.go | 12 +- .../datanode/writebuffer/bf_write_buffer.go | 2 +- .../datanode/writebuffer/l0_write_buffer.go | 2 +- internal/datanode/writebuffer/manager.go | 28 +- internal/datanode/writebuffer/manager_test.go | 8 +- .../datanode/writebuffer/mock_mananger.go | 146 ++++----- .../datanode/writebuffer/mock_write_buffer.go | 86 +++--- .../datanode/writebuffer/segment_buffer.go | 2 +- internal/datanode/writebuffer/write_buffer.go | 15 +- 21 files changed, 179 insertions(+), 760 deletions(-) delete mode 100644 internal/datanode/segment.go delete mode 100644 internal/datanode/segment_sync_policy.go delete mode 100644 internal/datanode/segment_sync_policy_test.go delete mode 100644 internal/datanode/segment_test.go diff --git a/Makefile b/Makefile index 408115f5c5169..5c8234248ea32 100644 --- a/Makefile +++ b/Makefile @@ -433,7 +433,7 @@ generate-mockery-datanode: getdeps $(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage $(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage $(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage - $(INSTALL_PATH)/mockery --name=Manager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockManager --outpkg=writebuffer --inpackage + $(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage generate-mockery-metastore: getdeps $(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index ec382986c84a9..7fa4453103051 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -133,7 +133,7 @@ func (t *compactionTask) getChannelName() string { func (t *compactionTask) getNumRows() (int64, error) { numRows := int64(0) for _, binlog := range t.plan.SegmentBinlogs { - seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID()) // Channel.getSegment(binlog.GetSegmentID()) + seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID()) if !ok { return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed") } diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index c9d9c1cf49bc1..3d2abab2f8635 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -89,7 +89,7 @@ type DataNode struct { eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager] syncMgr syncmgr.SyncManager - writeBufferManager writebuffer.Manager + writeBufferManager writebuffer.BufferManager clearSignal chan string // vchannel name segmentCache *Cache diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 85579a7ca6073..90dcecf445292 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -45,9 +45,8 @@ import ( // dataSyncService controls a flowgraph for a specific collection type dataSyncService struct { - ctx context.Context - cancelFn context.CancelFunc - // channel Channel // channel stores meta of channel + ctx context.Context + cancelFn context.CancelFunc metacache metacache.MetaCache opID int64 collectionID UniqueID // collection id of vchan for which this data sync service serves @@ -74,9 +73,6 @@ type dataSyncService struct { dispClient msgdispatcher.Client chunkManager storage.ChunkManager - // test only - // flushListener chan *segmentFlushPack // chan to listen flush event - stopOnce sync.Once } @@ -84,10 +80,9 @@ type nodeConfig struct { msFactory msgstream.Factory // msgStream factory collectionID UniqueID vChannelName string - // channel Channel - metacache metacache.MetaCache - allocator allocator.Allocator - serverID UniqueID + metacache metacache.MetaCache + allocator allocator.Allocator + serverID UniqueID } // start the flow graph in dataSyncService diff --git a/internal/datanode/data_sync_service_test.go b/internal/datanode/data_sync_service_test.go index 8a167ee616316..55a43da1e3449 100644 --- a/internal/datanode/data_sync_service_test.go +++ b/internal/datanode/data_sync_service_test.go @@ -326,7 +326,7 @@ type DataSyncServiceSuite struct { chunkManager *mocks.ChunkManager broker *broker.MockBroker allocator *allocator.MockAllocator - wbManager *writebuffer.MockManager + wbManager *writebuffer.MockBufferManager factory *dependency.MockFactory ms *msgstream.MockMsgStream @@ -344,7 +344,7 @@ func (s *DataSyncServiceSuite) SetupTest() { s.chunkManager = mocks.NewChunkManager(s.T()) s.broker = broker.NewMockBroker(s.T()) s.allocator = allocator.NewMockAllocator(s.T()) - s.wbManager = writebuffer.NewMockManager(s.T()) + s.wbManager = writebuffer.NewMockBufferManager(s.T()) s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil).Maybe() diff --git a/internal/datanode/flow_graph_time_tick_node.go b/internal/datanode/flow_graph_time_tick_node.go index fdf6cc144258a..1b0c06ad9f7eb 100644 --- a/internal/datanode/flow_graph_time_tick_node.go +++ b/internal/datanode/flow_graph_time_tick_node.go @@ -47,7 +47,7 @@ type ttNode struct { BaseNode vChannelName string metacache metacache.MetaCache - writeBufferManager writebuffer.Manager + writeBufferManager writebuffer.BufferManager lastUpdateTime *atomic.Time broker broker.Broker @@ -152,7 +152,7 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim return nil } -func newTTNode(config *nodeConfig, broker broker.Broker, wbManager writebuffer.Manager) (*ttNode, error) { +func newTTNode(config *nodeConfig, broker broker.Broker, wbManager writebuffer.BufferManager) (*ttNode, error) { baseNode := BaseNode{} baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32()) baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32()) diff --git a/internal/datanode/flow_graph_write_node.go b/internal/datanode/flow_graph_write_node.go index a105df93c0e68..7f30cb26571bb 100644 --- a/internal/datanode/flow_graph_write_node.go +++ b/internal/datanode/flow_graph_write_node.go @@ -21,7 +21,7 @@ type writeNode struct { BaseNode channelName string - wbManager writebuffer.Manager + wbManager writebuffer.BufferManager updater statsUpdater metacache metacache.MetaCache } @@ -110,7 +110,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg { func newWriteNode( ctx context.Context, - writeBufferManager writebuffer.Manager, + writeBufferManager writebuffer.BufferManager, updater statsUpdater, config *nodeConfig, ) *writeNode { diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go deleted file mode 100644 index 7a51730c2b544..0000000000000 --- a/internal/datanode/segment.go +++ /dev/null @@ -1,279 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -// import ( -// "fmt" -// "math" -// "strconv" -// "sync" -// "sync/atomic" - -// "github.com/bits-and-blooms/bloom/v3" -// "go.uber.org/zap" - -// "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" -// "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" -// "github.com/milvus-io/milvus/internal/proto/datapb" -// "github.com/milvus-io/milvus/internal/storage" -// "github.com/milvus-io/milvus/pkg/log" -// "github.com/milvus-io/milvus/pkg/metrics" -// "github.com/milvus-io/milvus/pkg/util/paramtable" -// "github.com/milvus-io/milvus/pkg/util/tsoutil" -// ) - -// // Segment contains the latest segment infos from channel. -// type Segment struct { -// collectionID UniqueID -// partitionID UniqueID -// segmentID UniqueID -// sType atomic.Value // datapb.SegmentType - -// numRows int64 -// memorySize int64 -// compactedTo UniqueID - -// curInsertBuf *BufferData -// curDeleteBuf *DelDataBuf -// historyInsertBuf []*BufferData -// historyDeleteBuf []*DelDataBuf - -// statLock sync.RWMutex -// currentStat *storage.PkStatistics -// historyStats []*storage.PkStatistics - -// startPos *msgpb.MsgPosition // TODO readonly -// lazyLoading atomic.Value -// syncing atomic.Value -// released atomic.Value -// } - -// func (s *Segment) isSyncing() bool { -// if s != nil { -// b, ok := s.syncing.Load().(bool) -// if ok { -// return b -// } -// } -// return false -// } - -// func (s *Segment) setSyncing(syncing bool) { -// if s != nil { -// s.syncing.Store(syncing) -// } -// } - -// func (s *Segment) isLoadingLazy() bool { -// b, ok := s.lazyLoading.Load().(bool) -// if !ok { -// return false -// } -// return b -// } - -// func (s *Segment) setLoadingLazy(b bool) { -// s.lazyLoading.Store(b) -// } - -// func (s *Segment) isReleased() bool { -// b, ok := s.released.Load().(bool) -// if !ok { -// return false -// } -// return b -// } - -// func (s *Segment) setReleased(b bool) { -// s.released.Store(b) -// } - -// func (s *Segment) isValid() bool { -// return s != nil && s.getType() != datapb.SegmentType_Compacted -// } - -// func (s *Segment) notFlushed() bool { -// return s.isValid() && s.getType() != datapb.SegmentType_Flushed -// } - -// func (s *Segment) getType() datapb.SegmentType { -// return s.sType.Load().(datapb.SegmentType) -// } - -// func (s *Segment) setType(t datapb.SegmentType) { -// s.sType.Store(t) -// } - -// func (s *Segment) updatePKRange(ids storage.FieldData) { -// s.statLock.Lock() -// defer s.statLock.Unlock() -// s.InitCurrentStat() -// err := s.currentStat.UpdatePKRange(ids) -// if err != nil { -// panic(err) -// } -// } - -// func (s *Segment) getHistoricalStats(pkField *schemapb.FieldSchema) ([]*storage.PrimaryKeyStats, int64) { -// statsList := []*storage.PrimaryKeyStats{} -// for _, stats := range s.historyStats { -// statsList = append(statsList, &storage.PrimaryKeyStats{ -// FieldID: pkField.FieldID, -// PkType: int64(pkField.DataType), -// BF: stats.PkFilter, -// MaxPk: stats.MaxPK, -// MinPk: stats.MinPK, -// }) -// } - -// if s.currentStat != nil { -// statsList = append(statsList, &storage.PrimaryKeyStats{ -// FieldID: pkField.FieldID, -// PkType: int64(pkField.DataType), -// BF: s.currentStat.PkFilter, -// MaxPk: s.currentStat.MaxPK, -// MinPk: s.currentStat.MinPK, -// }) -// } -// return statsList, s.numRows -// } - -// func (s *Segment) InitCurrentStat() { -// if s.currentStat == nil { -// s.currentStat = &storage.PkStatistics{ -// PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive), -// } -// } -// } - -// // check if PK exists is current -// func (s *Segment) isPKExist(pk primaryKey) bool { -// // for integrity, report false positive while lazy loading -// if s.isLoadingLazy() { -// return true -// } -// s.statLock.Lock() -// defer s.statLock.Unlock() -// if s.currentStat != nil && s.currentStat.PkExist(pk) { -// return true -// } - -// for _, historyStats := range s.historyStats { -// if historyStats.PkExist(pk) { -// return true -// } -// } -// return false -// } - -// // setInsertBuffer set curInsertBuf. -// func (s *Segment) setInsertBuffer(buf *BufferData) { -// s.curInsertBuf = buf - -// if buf != nil && buf.buffer != nil { -// dataSize := 0 -// for _, data := range buf.buffer.Data { -// dataSize += data.GetMemorySize() -// } -// metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), -// strconv.FormatInt(s.collectionID, 10)).Add(float64(dataSize)) -// } -// } - -// // rollInsertBuffer moves curInsertBuf to historyInsertBuf, and then sets curInsertBuf to nil. -// func (s *Segment) rollInsertBuffer() { -// if s.curInsertBuf == nil { -// return -// } - -// if s.curInsertBuf.buffer != nil { -// dataSize := 0 -// for _, data := range s.curInsertBuf.buffer.Data { -// dataSize += data.GetMemorySize() -// } -// metrics.DataNodeFlowGraphBufferDataSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), -// strconv.FormatInt(s.collectionID, 10)).Sub(float64(dataSize)) -// } - -// s.curInsertBuf.buffer = nil // free buffer memory, only keep meta infos in historyInsertBuf -// s.historyInsertBuf = append(s.historyInsertBuf, s.curInsertBuf) -// s.curInsertBuf = nil -// } - -// // evictHistoryInsertBuffer removes flushed buffer from historyInsertBuf after saveBinlogPath. -// func (s *Segment) evictHistoryInsertBuffer(endPos *msgpb.MsgPosition) { -// tmpBuffers := make([]*BufferData, 0) -// for _, buf := range s.historyInsertBuf { -// if buf.endPos.Timestamp > endPos.Timestamp { -// tmpBuffers = append(tmpBuffers, buf) -// } -// } -// s.historyInsertBuf = tmpBuffers -// ts, _ := tsoutil.ParseTS(endPos.Timestamp) -// log.Info("evictHistoryInsertBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName)) -// } - -// // rollDeleteBuffer moves curDeleteBuf to historyDeleteBuf, and then sets curDeleteBuf to nil. -// func (s *Segment) rollDeleteBuffer() { -// if s.curDeleteBuf == nil { -// return -// } -// s.curDeleteBuf.delData = nil // free buffer memory, only keep meta infos in historyDeleteBuf -// s.historyDeleteBuf = append(s.historyDeleteBuf, s.curDeleteBuf) -// s.curDeleteBuf = nil -// } - -// // evictHistoryDeleteBuffer removes flushed buffer from historyDeleteBuf after saveBinlogPath. -// func (s *Segment) evictHistoryDeleteBuffer(endPos *msgpb.MsgPosition) { -// tmpBuffers := make([]*DelDataBuf, 0) -// for _, buf := range s.historyDeleteBuf { -// if buf.endPos.Timestamp > endPos.Timestamp { -// tmpBuffers = append(tmpBuffers, buf) -// } -// } -// s.historyDeleteBuf = tmpBuffers -// ts, _ := tsoutil.ParseTS(endPos.Timestamp) -// log.Info("evictHistoryDeleteBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName)) -// } - -// func (s *Segment) isBufferEmpty() bool { -// return s.curInsertBuf == nil && -// s.curDeleteBuf == nil && -// len(s.historyInsertBuf) == 0 && -// len(s.historyDeleteBuf) == 0 -// } - -// func (s *Segment) minBufferTs() uint64 { -// var minTs uint64 = math.MaxUint64 -// if s.curInsertBuf != nil && s.curInsertBuf.startPos != nil && s.curInsertBuf.startPos.Timestamp < minTs { -// minTs = s.curInsertBuf.startPos.Timestamp -// } -// if s.curDeleteBuf != nil && s.curDeleteBuf.startPos != nil && s.curDeleteBuf.startPos.Timestamp < minTs { -// minTs = s.curDeleteBuf.startPos.Timestamp -// } -// for _, ib := range s.historyInsertBuf { -// if ib != nil && ib.startPos != nil && ib.startPos.Timestamp < minTs { -// minTs = ib.startPos.Timestamp -// } -// } -// for _, db := range s.historyDeleteBuf { -// if db != nil && db.startPos != nil && db.startPos.Timestamp < minTs { -// minTs = db.startPos.Timestamp -// } -// } -// return minTs -// } diff --git a/internal/datanode/segment_sync_policy.go b/internal/datanode/segment_sync_policy.go deleted file mode 100644 index 06891d1c06451..0000000000000 --- a/internal/datanode/segment_sync_policy.go +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -// import ( -// "math" -// "sort" -// "time" - -// "github.com/samber/lo" -// "go.uber.org/zap" - -// "github.com/milvus-io/milvus/pkg/log" -// "github.com/milvus-io/milvus/pkg/util/tsoutil" -// ) - -// const minSyncSize = 0.5 * 1024 * 1024 - -// // segmentsSyncPolicy sync policy applies to segments -// type segmentSyncPolicy func(segments []*Segment, c Channel, ts Timestamp) []UniqueID - -// // syncPeriodically get segmentSyncPolicy with segments sync periodically. -// func syncPeriodically() segmentSyncPolicy { -// return func(segments []*Segment, c Channel, ts Timestamp) []UniqueID { -// segmentsToSync := make([]UniqueID, 0) -// for _, seg := range segments { -// endPosTime := tsoutil.PhysicalTime(ts) -// minBufferTime := tsoutil.PhysicalTime(seg.minBufferTs()) -// shouldSync := endPosTime.Sub(minBufferTime) >= Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) -// if shouldSync { -// segmentsToSync = append(segmentsToSync, seg.segmentID) -// } -// } -// if len(segmentsToSync) > 0 { -// log.Info("sync segment periodically", zap.Int64s("segmentIDs", segmentsToSync)) -// } -// return segmentsToSync -// } -// } - -// // syncMemoryTooHigh force sync the largest segment. -// func syncMemoryTooHigh() segmentSyncPolicy { -// return func(segments []*Segment, c Channel, _ Timestamp) []UniqueID { -// if len(segments) == 0 || !c.getIsHighMemory() { -// return nil -// } -// sort.Slice(segments, func(i, j int) bool { -// return segments[i].memorySize > segments[j].memorySize -// }) -// syncSegments := make([]UniqueID, 0) -// syncSegmentsNum := math.Min(float64(Params.DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()), float64(len(segments))) -// for i := 0; i < int(syncSegmentsNum); i++ { -// if segments[i].memorySize < minSyncSize { // prevent generating too many small binlogs -// break -// } -// syncSegments = append(syncSegments, segments[i].segmentID) -// log.Info("sync segment due to memory usage is too high", -// zap.Int64("segmentID", segments[i].segmentID), -// zap.Int64("memorySize", segments[i].memorySize)) -// } -// return syncSegments -// } -// } - -// // syncSegmentsAtTs returns a new segmentSyncPolicy, sync segments when ts exceeds ChannelMeta.flushTs -// func syncSegmentsAtTs() segmentSyncPolicy { -// return func(segments []*Segment, c Channel, ts Timestamp) []UniqueID { -// flushTs := c.getFlushTs() -// if flushTs != 0 && ts >= flushTs { -// segmentsWithBuffer := lo.Filter(segments, func(segment *Segment, _ int) bool { -// return !segment.isBufferEmpty() -// }) -// segmentIDs := lo.Map(segmentsWithBuffer, func(segment *Segment, _ int) UniqueID { -// return segment.segmentID -// }) -// log.Info("sync segment at ts", zap.Int64s("segmentIDs", segmentIDs), -// zap.Time("ts", tsoutil.PhysicalTime(ts)), zap.Time("flushTs", tsoutil.PhysicalTime(flushTs))) -// return segmentIDs -// } -// return nil -// } -// } diff --git a/internal/datanode/segment_sync_policy_test.go b/internal/datanode/segment_sync_policy_test.go deleted file mode 100644 index a679d2d76189f..0000000000000 --- a/internal/datanode/segment_sync_policy_test.go +++ /dev/null @@ -1,151 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -// import ( -// "fmt" -// "math" -// "testing" -// "time" - -// "github.com/stretchr/testify/assert" - -// "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" -// "github.com/milvus-io/milvus/pkg/util/tsoutil" -// ) - -// func TestSyncPeriodically(t *testing.T) { -// t0 := time.Now() - -// tests := []struct { -// testName string -// bufferTs time.Time -// endPosTs time.Time -// isBufferEmpty bool -// shouldSyncNum int -// }{ -// {"test buffer empty", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), true, 0}, -// {"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second)), false, 1}, -// {"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod.GetAsDuration(time.Second) / 2), false, 0}, -// } - -// for _, test := range tests { -// t.Run(test.testName, func(t *testing.T) { -// policy := syncPeriodically() -// segment := &Segment{} -// segment.setInsertBuffer(&BufferData{ -// startPos: &msgpb.MsgPosition{ -// Timestamp: tsoutil.ComposeTSByTime(test.bufferTs, 0), -// }, -// }) -// if test.isBufferEmpty { -// segment.curInsertBuf = nil -// } -// res := policy([]*Segment{segment}, nil, tsoutil.ComposeTSByTime(test.endPosTs, 0)) -// assert.Equal(t, test.shouldSyncNum, len(res)) -// }) -// } -// } - -// func TestSyncMemoryTooHigh(t *testing.T) { -// tests := []struct { -// testName string -// syncSegmentNum int -// isHighMemory bool -// memorySizesInMB []float64 -// shouldSyncSegs []UniqueID -// }{ -// { -// "test normal 1", 3, true, -// []float64{1, 2, 3, 4, 5}, -// []UniqueID{5, 4, 3}, -// }, -// { -// "test normal 2", 2, true, -// []float64{1, 2, 3, 4, 5}, -// []UniqueID{5, 4}, -// }, -// { -// "test normal 3", 5, true, -// []float64{1, 2, 3, 4, 5}, -// []UniqueID{5, 4, 3, 2, 1}, -// }, -// { -// "test isHighMemory false", 3, false, -// []float64{1, 2, 3, 4, 5}, -// []UniqueID{}, -// }, -// { -// "test syncSegmentNum 1", 1, true, -// []float64{1, 2, 3, 4, 5}, -// []UniqueID{5}, -// }, -// { -// "test with small segment", 3, true, -// []float64{0.1, 0.1, 0.1, 4, 5}, -// []UniqueID{5, 4}, -// }, -// } - -// for _, test := range tests { -// t.Run(test.testName, func(t *testing.T) { -// channel := newChannel("channel", 0, nil, nil, nil) -// channel.setIsHighMemory(test.isHighMemory) -// Params.Save(Params.DataNodeCfg.MemoryForceSyncSegmentNum.Key, fmt.Sprintf("%d", test.syncSegmentNum)) -// policy := syncMemoryTooHigh() -// segments := make([]*Segment, len(test.memorySizesInMB)) -// for i := range segments { -// segments[i] = &Segment{ -// segmentID: UniqueID(i + 1), memorySize: int64(test.memorySizesInMB[i] * 1024 * 1024), -// } -// } -// segs := policy(segments, channel, 0) -// assert.ElementsMatch(t, segs, test.shouldSyncSegs) -// }) -// } -// } - -// func TestSyncSegmentsAtTs(t *testing.T) { -// tests := []struct { -// testName string -// ts Timestamp -// flushTs Timestamp -// shouldSyncNum int -// }{ -// {"test ts < flushTs", 100, 200, 0}, -// {"test ts > flushTs", 300, 200, 1}, -// {"test ts = flushTs", 100, 100, 1}, -// {"test flushTs = 0", 100, 0, 0}, -// {"test flushTs = maxUint64", 100, math.MaxUint64, 0}, -// } - -// for _, test := range tests { -// t.Run(test.testName, func(t *testing.T) { -// channel := newChannel("channel", 0, nil, nil, nil) -// channel.setFlushTs(test.flushTs) - -// segment := &Segment{} -// segment.setInsertBuffer(&BufferData{ -// startPos: &msgpb.MsgPosition{}, -// }) - -// policy := syncSegmentsAtTs() -// res := policy([]*Segment{segment}, channel, test.ts) -// assert.Equal(t, test.shouldSyncNum, len(res)) -// }) -// } -// } diff --git a/internal/datanode/segment_test.go b/internal/datanode/segment_test.go deleted file mode 100644 index ef0f5fedad388..0000000000000 --- a/internal/datanode/segment_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -// import ( -// "math/rand" -// "testing" - -// "github.com/stretchr/testify/assert" - -// "github.com/milvus-io/milvus/internal/storage" -// "github.com/milvus-io/milvus/pkg/common" -// ) - -// func TestSegment_UpdatePKRange(t *testing.T) { -// seg := &Segment{} - -// cases := make([]int64, 0, 100) -// for i := 0; i < 100; i++ { -// cases = append(cases, rand.Int63()) -// } -// buf := make([]byte, 8) -// for _, c := range cases { -// seg.updatePKRange(&storage.Int64FieldData{ -// Data: []int64{c}, -// }) - -// pk := newInt64PrimaryKey(c) - -// assert.Equal(t, true, seg.currentStat.MinPK.LE(pk)) -// assert.Equal(t, true, seg.currentStat.MaxPK.GE(pk)) - -// common.Endian.PutUint64(buf, uint64(c)) -// assert.True(t, seg.currentStat.PkFilter.Test(buf)) - -// assert.True(t, seg.isPKExist(pk)) -// } -// } - -// func TestEmptySegment(t *testing.T) { -// seg := &Segment{} - -// pk := newInt64PrimaryKey(1000) -// assert.False(t, seg.isPKExist(pk)) -// } diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index 2d103ce0a4673..ebf01293d58ca 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -58,8 +58,8 @@ func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncMan return _c } -// GetMinCheckpoints provides a mock function with given fields: channel -func (_m *MockSyncManager) GetMinCheckpoints(channel string) *msgpb.MsgPosition { +// GetEarliestPosition provides a mock function with given fields: channel +func (_m *MockSyncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition { ret := _m.Called(channel) var r0 *msgpb.MsgPosition @@ -74,30 +74,30 @@ func (_m *MockSyncManager) GetMinCheckpoints(channel string) *msgpb.MsgPosition return r0 } -// MockSyncManager_GetMinCheckpoints_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinCheckpoints' -type MockSyncManager_GetMinCheckpoints_Call struct { +// MockSyncManager_GetEarliestPosition_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetEarliestPosition' +type MockSyncManager_GetEarliestPosition_Call struct { *mock.Call } -// GetMinCheckpoints is a helper method to define mock.On call +// GetEarliestPosition is a helper method to define mock.On call // - channel string -func (_e *MockSyncManager_Expecter) GetMinCheckpoints(channel interface{}) *MockSyncManager_GetMinCheckpoints_Call { - return &MockSyncManager_GetMinCheckpoints_Call{Call: _e.mock.On("GetMinCheckpoints", channel)} +func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call { + return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)} } -func (_c *MockSyncManager_GetMinCheckpoints_Call) Run(run func(channel string)) *MockSyncManager_GetMinCheckpoints_Call { +func (_c *MockSyncManager_GetEarliestPosition_Call) Run(run func(channel string)) *MockSyncManager_GetEarliestPosition_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string)) }) return _c } -func (_c *MockSyncManager_GetMinCheckpoints_Call) Return(_a0 *msgpb.MsgPosition) *MockSyncManager_GetMinCheckpoints_Call { +func (_c *MockSyncManager_GetEarliestPosition_Call) Return(_a0 *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { _c.Call.Return(_a0) return _c } -func (_c *MockSyncManager_GetMinCheckpoints_Call) RunAndReturn(run func(string) *msgpb.MsgPosition) *MockSyncManager_GetMinCheckpoints_Call { +func (_c *MockSyncManager_GetEarliestPosition_Call) RunAndReturn(run func(string) *msgpb.MsgPosition) *MockSyncManager_GetEarliestPosition_Call { _c.Call.Return(run) return _c } diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index f29797cba2beb..53a75c392e16c 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -34,10 +34,18 @@ type SyncMeta struct { metacache metacache.MetaCache } +// SyncMangger is the interface for sync manager. +// it processes the sync tasks inside and changes the meta. type SyncManager interface { + // SyncData is the method to submit sync task. SyncData(ctx context.Context, task *SyncTask) *conc.Future[error] - GetMinCheckpoints(channel string) *msgpb.MsgPosition + // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel. + GetEarliestPosition(channel string) *msgpb.MsgPosition + // Block allows caller to block tasks of provided segment id. + // normally used by compaction task. + // if levelzero delta policy is enabled, this shall be an empty operation. Block(segmentID int64) + // Unblock is the reverse method for `Block`. Unblock(segmentID int64) } @@ -75,7 +83,7 @@ func (mgr syncManager) SyncData(ctx context.Context, task *SyncTask) *conc.Futur }) } -func (mgr syncManager) GetMinCheckpoints(channel string) *msgpb.MsgPosition { +func (mgr syncManager) GetEarliestPosition(channel string) *msgpb.MsgPosition { var cp *msgpb.MsgPosition mgr.tasks.Range(func(_ string, task *SyncTask) bool { if task.startPosition == nil { diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index 3dbce1df33100..d2b0eb68b0323 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -71,5 +71,5 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg // update buffer last checkpoint wb.checkpoint = endPos - return wb.triggerAutoSync() + return wb.triggerSync() } diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 80f20b0a550c1..d036dece17ee6 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -62,7 +62,7 @@ func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg // update buffer last checkpoint wb.checkpoint = endPos - return wb.triggerAutoSync() + return wb.triggerSync() } func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 { diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go index 2a4bbfc4ca080..be7e095b4e44c 100644 --- a/internal/datanode/writebuffer/manager.go +++ b/internal/datanode/writebuffer/manager.go @@ -15,8 +15,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" ) -// Manager is the interface for WriteBuffer management. -type Manager interface { +// BufferManager is the interface for WriteBuffer management. +type BufferManager interface { // Register adds a WriteBuffer with provided schema & options. Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error // FlushSegments notifies writeBuffer corresponding to provided channel to flush segments. @@ -36,21 +36,21 @@ type Manager interface { } // NewManager returns initialized manager as `Manager` -func NewManager(syncMgr syncmgr.SyncManager) Manager { - return &manager{ +func NewManager(syncMgr syncmgr.SyncManager) BufferManager { + return &bufferManager{ syncMgr: syncMgr, buffers: make(map[string]WriteBuffer), } } -type manager struct { +type bufferManager struct { syncMgr syncmgr.SyncManager buffers map[string]WriteBuffer mut sync.RWMutex } // Register a new WriteBuffer for channel. -func (m *manager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error { +func (m *bufferManager) Register(channel string, schema *schemapb.CollectionSchema, metacache metacache.MetaCache, opts ...WriteBufferOption) error { m.mut.Lock() defer m.mut.Unlock() @@ -67,7 +67,7 @@ func (m *manager) Register(channel string, schema *schemapb.CollectionSchema, me } // FlushSegments call sync segment and change segments state to Flushed. -func (m *manager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { +func (m *bufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { m.mut.RLock() buf, ok := m.buffers[channel] m.mut.RUnlock() @@ -82,7 +82,7 @@ func (m *manager) FlushSegments(ctx context.Context, channel string, segmentIDs return buf.FlushSegments(ctx, segmentIDs) } -func (m *manager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { +func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { m.mut.RLock() buf, ok := m.buffers[channel] m.mut.RUnlock() @@ -98,7 +98,7 @@ func (m *manager) FlushChannel(ctx context.Context, channel string, flushTs uint } // BufferData put data into channel write buffer. -func (m *manager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { +func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error { m.mut.RLock() buf, ok := m.buffers[channel] m.mut.RUnlock() @@ -113,7 +113,7 @@ func (m *manager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, } // GetCheckpoint returns checkpoint for provided channel. -func (m *manager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { +func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { m.mut.RLock() buf, ok := m.buffers[channel] m.mut.RUnlock() @@ -121,13 +121,13 @@ func (m *manager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error if !ok { return nil, false, merr.WrapErrChannelNotFound(channel) } - cp := buf.MinCheckpoint() + cp := buf.GetCheckpoint() flushTs := buf.GetFlushTimestamp() return cp, flushTs != nonFlushTS && cp.GetTimestamp() >= flushTs, nil } -func (m *manager) NotifyCheckpointUpdated(channel string, ts uint64) { +func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { m.mut.Lock() defer m.mut.Unlock() buf, ok := m.buffers[channel] @@ -143,7 +143,7 @@ func (m *manager) NotifyCheckpointUpdated(channel string, ts uint64) { // RemoveChannel remove channel WriteBuffer from manager. // this method discards all buffered data since datanode no longer has the ownership -func (m *manager) RemoveChannel(channel string) { +func (m *bufferManager) RemoveChannel(channel string) { m.mut.Lock() buf, ok := m.buffers[channel] delete(m.buffers, channel) @@ -159,7 +159,7 @@ func (m *manager) RemoveChannel(channel string) { // DropChannel removes channel WriteBuffer and process `DropChannel` // this method will save all buffered data -func (m *manager) DropChannel(channel string) { +func (m *bufferManager) DropChannel(channel string) { m.mut.Lock() buf, ok := m.buffers[channel] delete(m.buffers, channel) diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 42bbd94c2189f..7a437012fb537 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -26,7 +26,7 @@ type ManagerSuite struct { syncMgr *syncmgr.MockSyncManager metacache *metacache.MockMetaCache - manager *manager + manager *bufferManager } func (s *ManagerSuite) SetupSuite() { @@ -55,7 +55,7 @@ func (s *ManagerSuite) SetupTest() { mgr := NewManager(s.syncMgr) var ok bool - s.manager, ok = mgr.(*manager) + s.manager, ok = mgr.(*bufferManager) s.Require().True(ok) } @@ -132,7 +132,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { manager.mut.Unlock() pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)} - wb.EXPECT().MinCheckpoint().Return(pos) + wb.EXPECT().GetCheckpoint().Return(pos) wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS) result, needUpdate, err := manager.GetCheckpoint(s.channelName) s.NoError(err) @@ -150,7 +150,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0) pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp} - wb.EXPECT().MinCheckpoint().Return(pos) + wb.EXPECT().GetCheckpoint().Return(pos) wb.EXPECT().GetFlushTimestamp().Return(cpTimestamp - 1) result, needUpdate, err := manager.GetCheckpoint(s.channelName) s.NoError(err) diff --git a/internal/datanode/writebuffer/mock_mananger.go b/internal/datanode/writebuffer/mock_mananger.go index 2701b8e306e87..a42efcd32a299 100644 --- a/internal/datanode/writebuffer/mock_mananger.go +++ b/internal/datanode/writebuffer/mock_mananger.go @@ -15,21 +15,21 @@ import ( schemapb "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) -// MockManager is an autogenerated mock type for the Manager type -type MockManager struct { +// MockBufferManager is an autogenerated mock type for the BufferManager type +type MockBufferManager struct { mock.Mock } -type MockManager_Expecter struct { +type MockBufferManager_Expecter struct { mock *mock.Mock } -func (_m *MockManager) EXPECT() *MockManager_Expecter { - return &MockManager_Expecter{mock: &_m.Mock} +func (_m *MockBufferManager) EXPECT() *MockBufferManager_Expecter { + return &MockBufferManager_Expecter{mock: &_m.Mock} } // BufferData provides a mock function with given fields: channel, insertMsgs, deleteMsgs, startPos, endPos -func (_m *MockManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { +func (_m *MockBufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition) error { ret := _m.Called(channel, insertMsgs, deleteMsgs, startPos, endPos) var r0 error @@ -42,8 +42,8 @@ func (_m *MockManager) BufferData(channel string, insertMsgs []*msgstream.Insert return r0 } -// MockManager_BufferData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BufferData' -type MockManager_BufferData_Call struct { +// MockBufferManager_BufferData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BufferData' +type MockBufferManager_BufferData_Call struct { *mock.Call } @@ -53,62 +53,62 @@ type MockManager_BufferData_Call struct { // - deleteMsgs []*msgstream.DeleteMsg // - startPos *msgpb.MsgPosition // - endPos *msgpb.MsgPosition -func (_e *MockManager_Expecter) BufferData(channel interface{}, insertMsgs interface{}, deleteMsgs interface{}, startPos interface{}, endPos interface{}) *MockManager_BufferData_Call { - return &MockManager_BufferData_Call{Call: _e.mock.On("BufferData", channel, insertMsgs, deleteMsgs, startPos, endPos)} +func (_e *MockBufferManager_Expecter) BufferData(channel interface{}, insertMsgs interface{}, deleteMsgs interface{}, startPos interface{}, endPos interface{}) *MockBufferManager_BufferData_Call { + return &MockBufferManager_BufferData_Call{Call: _e.mock.On("BufferData", channel, insertMsgs, deleteMsgs, startPos, endPos)} } -func (_c *MockManager_BufferData_Call) Run(run func(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition)) *MockManager_BufferData_Call { +func (_c *MockBufferManager_BufferData_Call) Run(run func(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos *msgpb.MsgPosition, endPos *msgpb.MsgPosition)) *MockBufferManager_BufferData_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string), args[1].([]*msgstream.InsertMsg), args[2].([]*msgstream.DeleteMsg), args[3].(*msgpb.MsgPosition), args[4].(*msgpb.MsgPosition)) }) return _c } -func (_c *MockManager_BufferData_Call) Return(_a0 error) *MockManager_BufferData_Call { +func (_c *MockBufferManager_BufferData_Call) Return(_a0 error) *MockBufferManager_BufferData_Call { _c.Call.Return(_a0) return _c } -func (_c *MockManager_BufferData_Call) RunAndReturn(run func(string, []*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error) *MockManager_BufferData_Call { +func (_c *MockBufferManager_BufferData_Call) RunAndReturn(run func(string, []*msgstream.InsertMsg, []*msgstream.DeleteMsg, *msgpb.MsgPosition, *msgpb.MsgPosition) error) *MockBufferManager_BufferData_Call { _c.Call.Return(run) return _c } // DropChannel provides a mock function with given fields: channel -func (_m *MockManager) DropChannel(channel string) { +func (_m *MockBufferManager) DropChannel(channel string) { _m.Called(channel) } -// MockManager_DropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannel' -type MockManager_DropChannel_Call struct { +// MockBufferManager_DropChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropChannel' +type MockBufferManager_DropChannel_Call struct { *mock.Call } // DropChannel is a helper method to define mock.On call // - channel string -func (_e *MockManager_Expecter) DropChannel(channel interface{}) *MockManager_DropChannel_Call { - return &MockManager_DropChannel_Call{Call: _e.mock.On("DropChannel", channel)} +func (_e *MockBufferManager_Expecter) DropChannel(channel interface{}) *MockBufferManager_DropChannel_Call { + return &MockBufferManager_DropChannel_Call{Call: _e.mock.On("DropChannel", channel)} } -func (_c *MockManager_DropChannel_Call) Run(run func(channel string)) *MockManager_DropChannel_Call { +func (_c *MockBufferManager_DropChannel_Call) Run(run func(channel string)) *MockBufferManager_DropChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string)) }) return _c } -func (_c *MockManager_DropChannel_Call) Return() *MockManager_DropChannel_Call { +func (_c *MockBufferManager_DropChannel_Call) Return() *MockBufferManager_DropChannel_Call { _c.Call.Return() return _c } -func (_c *MockManager_DropChannel_Call) RunAndReturn(run func(string)) *MockManager_DropChannel_Call { +func (_c *MockBufferManager_DropChannel_Call) RunAndReturn(run func(string)) *MockBufferManager_DropChannel_Call { _c.Call.Return(run) return _c } // FlushChannel provides a mock function with given fields: ctx, channel, flushTs -func (_m *MockManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { +func (_m *MockBufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error { ret := _m.Called(ctx, channel, flushTs) var r0 error @@ -121,8 +121,8 @@ func (_m *MockManager) FlushChannel(ctx context.Context, channel string, flushTs return r0 } -// MockManager_FlushChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushChannel' -type MockManager_FlushChannel_Call struct { +// MockBufferManager_FlushChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushChannel' +type MockBufferManager_FlushChannel_Call struct { *mock.Call } @@ -130,29 +130,29 @@ type MockManager_FlushChannel_Call struct { // - ctx context.Context // - channel string // - flushTs uint64 -func (_e *MockManager_Expecter) FlushChannel(ctx interface{}, channel interface{}, flushTs interface{}) *MockManager_FlushChannel_Call { - return &MockManager_FlushChannel_Call{Call: _e.mock.On("FlushChannel", ctx, channel, flushTs)} +func (_e *MockBufferManager_Expecter) FlushChannel(ctx interface{}, channel interface{}, flushTs interface{}) *MockBufferManager_FlushChannel_Call { + return &MockBufferManager_FlushChannel_Call{Call: _e.mock.On("FlushChannel", ctx, channel, flushTs)} } -func (_c *MockManager_FlushChannel_Call) Run(run func(ctx context.Context, channel string, flushTs uint64)) *MockManager_FlushChannel_Call { +func (_c *MockBufferManager_FlushChannel_Call) Run(run func(ctx context.Context, channel string, flushTs uint64)) *MockBufferManager_FlushChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].(uint64)) }) return _c } -func (_c *MockManager_FlushChannel_Call) Return(_a0 error) *MockManager_FlushChannel_Call { +func (_c *MockBufferManager_FlushChannel_Call) Return(_a0 error) *MockBufferManager_FlushChannel_Call { _c.Call.Return(_a0) return _c } -func (_c *MockManager_FlushChannel_Call) RunAndReturn(run func(context.Context, string, uint64) error) *MockManager_FlushChannel_Call { +func (_c *MockBufferManager_FlushChannel_Call) RunAndReturn(run func(context.Context, string, uint64) error) *MockBufferManager_FlushChannel_Call { _c.Call.Return(run) return _c } // FlushSegments provides a mock function with given fields: ctx, channel, segmentIDs -func (_m *MockManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { +func (_m *MockBufferManager) FlushSegments(ctx context.Context, channel string, segmentIDs []int64) error { ret := _m.Called(ctx, channel, segmentIDs) var r0 error @@ -165,8 +165,8 @@ func (_m *MockManager) FlushSegments(ctx context.Context, channel string, segmen return r0 } -// MockManager_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments' -type MockManager_FlushSegments_Call struct { +// MockBufferManager_FlushSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FlushSegments' +type MockBufferManager_FlushSegments_Call struct { *mock.Call } @@ -174,29 +174,29 @@ type MockManager_FlushSegments_Call struct { // - ctx context.Context // - channel string // - segmentIDs []int64 -func (_e *MockManager_Expecter) FlushSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockManager_FlushSegments_Call { - return &MockManager_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, channel, segmentIDs)} +func (_e *MockBufferManager_Expecter) FlushSegments(ctx interface{}, channel interface{}, segmentIDs interface{}) *MockBufferManager_FlushSegments_Call { + return &MockBufferManager_FlushSegments_Call{Call: _e.mock.On("FlushSegments", ctx, channel, segmentIDs)} } -func (_c *MockManager_FlushSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockManager_FlushSegments_Call { +func (_c *MockBufferManager_FlushSegments_Call) Run(run func(ctx context.Context, channel string, segmentIDs []int64)) *MockBufferManager_FlushSegments_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context), args[1].(string), args[2].([]int64)) }) return _c } -func (_c *MockManager_FlushSegments_Call) Return(_a0 error) *MockManager_FlushSegments_Call { +func (_c *MockBufferManager_FlushSegments_Call) Return(_a0 error) *MockBufferManager_FlushSegments_Call { _c.Call.Return(_a0) return _c } -func (_c *MockManager_FlushSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockManager_FlushSegments_Call { +func (_c *MockBufferManager_FlushSegments_Call) RunAndReturn(run func(context.Context, string, []int64) error) *MockBufferManager_FlushSegments_Call { _c.Call.Return(run) return _c } // GetCheckpoint provides a mock function with given fields: channel -func (_m *MockManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { +func (_m *MockBufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) { ret := _m.Called(channel) var r0 *msgpb.MsgPosition @@ -228,70 +228,70 @@ func (_m *MockManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, return r0, r1, r2 } -// MockManager_GetCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCheckpoint' -type MockManager_GetCheckpoint_Call struct { +// MockBufferManager_GetCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCheckpoint' +type MockBufferManager_GetCheckpoint_Call struct { *mock.Call } // GetCheckpoint is a helper method to define mock.On call // - channel string -func (_e *MockManager_Expecter) GetCheckpoint(channel interface{}) *MockManager_GetCheckpoint_Call { - return &MockManager_GetCheckpoint_Call{Call: _e.mock.On("GetCheckpoint", channel)} +func (_e *MockBufferManager_Expecter) GetCheckpoint(channel interface{}) *MockBufferManager_GetCheckpoint_Call { + return &MockBufferManager_GetCheckpoint_Call{Call: _e.mock.On("GetCheckpoint", channel)} } -func (_c *MockManager_GetCheckpoint_Call) Run(run func(channel string)) *MockManager_GetCheckpoint_Call { +func (_c *MockBufferManager_GetCheckpoint_Call) Run(run func(channel string)) *MockBufferManager_GetCheckpoint_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string)) }) return _c } -func (_c *MockManager_GetCheckpoint_Call) Return(_a0 *msgpb.MsgPosition, _a1 bool, _a2 error) *MockManager_GetCheckpoint_Call { +func (_c *MockBufferManager_GetCheckpoint_Call) Return(_a0 *msgpb.MsgPosition, _a1 bool, _a2 error) *MockBufferManager_GetCheckpoint_Call { _c.Call.Return(_a0, _a1, _a2) return _c } -func (_c *MockManager_GetCheckpoint_Call) RunAndReturn(run func(string) (*msgpb.MsgPosition, bool, error)) *MockManager_GetCheckpoint_Call { +func (_c *MockBufferManager_GetCheckpoint_Call) RunAndReturn(run func(string) (*msgpb.MsgPosition, bool, error)) *MockBufferManager_GetCheckpoint_Call { _c.Call.Return(run) return _c } // NotifyCheckpointUpdated provides a mock function with given fields: channel, ts -func (_m *MockManager) NotifyCheckpointUpdated(channel string, ts uint64) { +func (_m *MockBufferManager) NotifyCheckpointUpdated(channel string, ts uint64) { _m.Called(channel, ts) } -// MockManager_NotifyCheckpointUpdated_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyCheckpointUpdated' -type MockManager_NotifyCheckpointUpdated_Call struct { +// MockBufferManager_NotifyCheckpointUpdated_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'NotifyCheckpointUpdated' +type MockBufferManager_NotifyCheckpointUpdated_Call struct { *mock.Call } // NotifyCheckpointUpdated is a helper method to define mock.On call // - channel string // - ts uint64 -func (_e *MockManager_Expecter) NotifyCheckpointUpdated(channel interface{}, ts interface{}) *MockManager_NotifyCheckpointUpdated_Call { - return &MockManager_NotifyCheckpointUpdated_Call{Call: _e.mock.On("NotifyCheckpointUpdated", channel, ts)} +func (_e *MockBufferManager_Expecter) NotifyCheckpointUpdated(channel interface{}, ts interface{}) *MockBufferManager_NotifyCheckpointUpdated_Call { + return &MockBufferManager_NotifyCheckpointUpdated_Call{Call: _e.mock.On("NotifyCheckpointUpdated", channel, ts)} } -func (_c *MockManager_NotifyCheckpointUpdated_Call) Run(run func(channel string, ts uint64)) *MockManager_NotifyCheckpointUpdated_Call { +func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) Run(run func(channel string, ts uint64)) *MockBufferManager_NotifyCheckpointUpdated_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string), args[1].(uint64)) }) return _c } -func (_c *MockManager_NotifyCheckpointUpdated_Call) Return() *MockManager_NotifyCheckpointUpdated_Call { +func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) Return() *MockBufferManager_NotifyCheckpointUpdated_Call { _c.Call.Return() return _c } -func (_c *MockManager_NotifyCheckpointUpdated_Call) RunAndReturn(run func(string, uint64)) *MockManager_NotifyCheckpointUpdated_Call { +func (_c *MockBufferManager_NotifyCheckpointUpdated_Call) RunAndReturn(run func(string, uint64)) *MockBufferManager_NotifyCheckpointUpdated_Call { _c.Call.Return(run) return _c } // Register provides a mock function with given fields: channel, schema, _a2, opts -func (_m *MockManager) Register(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption) error { +func (_m *MockBufferManager) Register(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption) error { _va := make([]interface{}, len(opts)) for _i := range opts { _va[_i] = opts[_i] @@ -311,8 +311,8 @@ func (_m *MockManager) Register(channel string, schema *schemapb.CollectionSchem return r0 } -// MockManager_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register' -type MockManager_Register_Call struct { +// MockBufferManager_Register_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Register' +type MockBufferManager_Register_Call struct { *mock.Call } @@ -321,12 +321,12 @@ type MockManager_Register_Call struct { // - schema *schemapb.CollectionSchema // - _a2 metacache.MetaCache // - opts ...WriteBufferOption -func (_e *MockManager_Expecter) Register(channel interface{}, schema interface{}, _a2 interface{}, opts ...interface{}) *MockManager_Register_Call { - return &MockManager_Register_Call{Call: _e.mock.On("Register", +func (_e *MockBufferManager_Expecter) Register(channel interface{}, schema interface{}, _a2 interface{}, opts ...interface{}) *MockBufferManager_Register_Call { + return &MockBufferManager_Register_Call{Call: _e.mock.On("Register", append([]interface{}{channel, schema, _a2}, opts...)...)} } -func (_c *MockManager_Register_Call) Run(run func(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption)) *MockManager_Register_Call { +func (_c *MockBufferManager_Register_Call) Run(run func(channel string, schema *schemapb.CollectionSchema, _a2 metacache.MetaCache, opts ...WriteBufferOption)) *MockBufferManager_Register_Call { _c.Call.Run(func(args mock.Arguments) { variadicArgs := make([]WriteBufferOption, len(args)-3) for i, a := range args[3:] { @@ -339,56 +339,56 @@ func (_c *MockManager_Register_Call) Run(run func(channel string, schema *schema return _c } -func (_c *MockManager_Register_Call) Return(_a0 error) *MockManager_Register_Call { +func (_c *MockBufferManager_Register_Call) Return(_a0 error) *MockBufferManager_Register_Call { _c.Call.Return(_a0) return _c } -func (_c *MockManager_Register_Call) RunAndReturn(run func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error) *MockManager_Register_Call { +func (_c *MockBufferManager_Register_Call) RunAndReturn(run func(string, *schemapb.CollectionSchema, metacache.MetaCache, ...WriteBufferOption) error) *MockBufferManager_Register_Call { _c.Call.Return(run) return _c } // RemoveChannel provides a mock function with given fields: channel -func (_m *MockManager) RemoveChannel(channel string) { +func (_m *MockBufferManager) RemoveChannel(channel string) { _m.Called(channel) } -// MockManager_RemoveChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveChannel' -type MockManager_RemoveChannel_Call struct { +// MockBufferManager_RemoveChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveChannel' +type MockBufferManager_RemoveChannel_Call struct { *mock.Call } // RemoveChannel is a helper method to define mock.On call // - channel string -func (_e *MockManager_Expecter) RemoveChannel(channel interface{}) *MockManager_RemoveChannel_Call { - return &MockManager_RemoveChannel_Call{Call: _e.mock.On("RemoveChannel", channel)} +func (_e *MockBufferManager_Expecter) RemoveChannel(channel interface{}) *MockBufferManager_RemoveChannel_Call { + return &MockBufferManager_RemoveChannel_Call{Call: _e.mock.On("RemoveChannel", channel)} } -func (_c *MockManager_RemoveChannel_Call) Run(run func(channel string)) *MockManager_RemoveChannel_Call { +func (_c *MockBufferManager_RemoveChannel_Call) Run(run func(channel string)) *MockBufferManager_RemoveChannel_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string)) }) return _c } -func (_c *MockManager_RemoveChannel_Call) Return() *MockManager_RemoveChannel_Call { +func (_c *MockBufferManager_RemoveChannel_Call) Return() *MockBufferManager_RemoveChannel_Call { _c.Call.Return() return _c } -func (_c *MockManager_RemoveChannel_Call) RunAndReturn(run func(string)) *MockManager_RemoveChannel_Call { +func (_c *MockBufferManager_RemoveChannel_Call) RunAndReturn(run func(string)) *MockBufferManager_RemoveChannel_Call { _c.Call.Return(run) return _c } -// NewMockManager creates a new instance of MockManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// NewMockBufferManager creates a new instance of MockBufferManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. -func NewMockManager(t interface { +func NewMockBufferManager(t interface { mock.TestingT Cleanup(func()) -}) *MockManager { - mock := &MockManager{} +}) *MockBufferManager { + mock := &MockBufferManager{} mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/internal/datanode/writebuffer/mock_write_buffer.go b/internal/datanode/writebuffer/mock_write_buffer.go index d1f1a7f8b06d3..88e1201876473 100644 --- a/internal/datanode/writebuffer/mock_write_buffer.go +++ b/internal/datanode/writebuffer/mock_write_buffer.go @@ -145,6 +145,49 @@ func (_c *MockWriteBuffer_FlushSegments_Call) RunAndReturn(run func(context.Cont return _c } +// GetCheckpoint provides a mock function with given fields: +func (_m *MockWriteBuffer) GetCheckpoint() *msgpb.MsgPosition { + ret := _m.Called() + + var r0 *msgpb.MsgPosition + if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*msgpb.MsgPosition) + } + } + + return r0 +} + +// MockWriteBuffer_GetCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCheckpoint' +type MockWriteBuffer_GetCheckpoint_Call struct { + *mock.Call +} + +// GetCheckpoint is a helper method to define mock.On call +func (_e *MockWriteBuffer_Expecter) GetCheckpoint() *MockWriteBuffer_GetCheckpoint_Call { + return &MockWriteBuffer_GetCheckpoint_Call{Call: _e.mock.On("GetCheckpoint")} +} + +func (_c *MockWriteBuffer_GetCheckpoint_Call) Run(run func()) *MockWriteBuffer_GetCheckpoint_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWriteBuffer_GetCheckpoint_Call) Return(_a0 *msgpb.MsgPosition) *MockWriteBuffer_GetCheckpoint_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWriteBuffer_GetCheckpoint_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockWriteBuffer_GetCheckpoint_Call { + _c.Call.Return(run) + return _c +} + // GetFlushTimestamp provides a mock function with given fields: func (_m *MockWriteBuffer) GetFlushTimestamp() uint64 { ret := _m.Called() @@ -228,49 +271,6 @@ func (_c *MockWriteBuffer_HasSegment_Call) RunAndReturn(run func(int64) bool) *M return _c } -// MinCheckpoint provides a mock function with given fields: -func (_m *MockWriteBuffer) MinCheckpoint() *msgpb.MsgPosition { - ret := _m.Called() - - var r0 *msgpb.MsgPosition - if rf, ok := ret.Get(0).(func() *msgpb.MsgPosition); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*msgpb.MsgPosition) - } - } - - return r0 -} - -// MockWriteBuffer_MinCheckpoint_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MinCheckpoint' -type MockWriteBuffer_MinCheckpoint_Call struct { - *mock.Call -} - -// MinCheckpoint is a helper method to define mock.On call -func (_e *MockWriteBuffer_Expecter) MinCheckpoint() *MockWriteBuffer_MinCheckpoint_Call { - return &MockWriteBuffer_MinCheckpoint_Call{Call: _e.mock.On("MinCheckpoint")} -} - -func (_c *MockWriteBuffer_MinCheckpoint_Call) Run(run func()) *MockWriteBuffer_MinCheckpoint_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockWriteBuffer_MinCheckpoint_Call) Return(_a0 *msgpb.MsgPosition) *MockWriteBuffer_MinCheckpoint_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockWriteBuffer_MinCheckpoint_Call) RunAndReturn(run func() *msgpb.MsgPosition) *MockWriteBuffer_MinCheckpoint_Call { - _c.Call.Return(run) - return _c -} - // SetFlushTimestamp provides a mock function with given fields: flushTs func (_m *MockWriteBuffer) SetFlushTimestamp(flushTs uint64) { _m.Called(flushTs) diff --git a/internal/datanode/writebuffer/segment_buffer.go b/internal/datanode/writebuffer/segment_buffer.go index 718c9ac491185..bee6cd244c65e 100644 --- a/internal/datanode/writebuffer/segment_buffer.go +++ b/internal/datanode/writebuffer/segment_buffer.go @@ -44,7 +44,7 @@ func (buf *segmentBuffer) MinTimestamp() typeutil.Timestamp { return deltaTs } -func (buf *segmentBuffer) MinCheckpoint() *msgpb.MsgPosition { +func (buf *segmentBuffer) EarliestPosition() *msgpb.MsgPosition { return getEarliestCheckpoint(buf.insertBuffer.startPos, buf.deltaBuffer.startPos) } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index a50b4a0117bf8..d8e1b752f486e 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -40,10 +40,10 @@ type WriteBuffer interface { GetFlushTimestamp() uint64 // FlushSegments is the method to perform `Sync` operation with provided options. FlushSegments(ctx context.Context, segmentIDs []int64) error - // MinCheckpoint returns current channel checkpoint. + // GetCheckpoint returns current channel checkpoint. // If there are any non-empty segment buffer, returns the earliest buffer start position. // Otherwise, returns latest buffered checkpoint. - MinCheckpoint() *msgpb.MsgPosition + GetCheckpoint() *msgpb.MsgPosition // Close is the method to close and sink current buffer data. Close(drop bool) } @@ -124,14 +124,14 @@ func (wb *writeBufferBase) GetFlushTimestamp() uint64 { return wb.flushTimestamp.Load() } -func (wb *writeBufferBase) MinCheckpoint() *msgpb.MsgPosition { +func (wb *writeBufferBase) GetCheckpoint() *msgpb.MsgPosition { wb.mut.RLock() defer wb.mut.RUnlock() - syncingPos := wb.syncMgr.GetMinCheckpoints(wb.channelName) + syncingPos := wb.syncMgr.GetEarliestPosition(wb.channelName) positions := lo.MapToSlice(wb.buffers, func(_ int64, buf *segmentBuffer) *msgpb.MsgPosition { - return buf.MinCheckpoint() + return buf.EarliestPosition() }) positions = append(positions, syncingPos) @@ -143,7 +143,7 @@ func (wb *writeBufferBase) MinCheckpoint() *msgpb.MsgPosition { return checkpoint } -func (wb *writeBufferBase) triggerAutoSync() error { +func (wb *writeBufferBase) triggerSync() error { segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp()) if len(segmentsToSync) > 0 { log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync)) @@ -174,6 +174,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) syncTask := wb.getSyncTask(ctx, segmentID) if syncTask == nil { // segment info not found + log.Ctx(ctx).Warn("segment not found in meta", zap.Int64("segmentID", segmentID)) continue } @@ -218,7 +219,7 @@ func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *s // remove buffer and move it to sync manager delete(wb.buffers, segmentID) - start := buffer.MinCheckpoint() + start := buffer.EarliestPosition() insert, delta := buffer.Yield() return insert, delta, start