Skip to content

Commit

Permalink
Remove commented code and fix naming issue
Browse files Browse the repository at this point in the history
This PR removes all the commented code and files from PR #28320

For naming issue:
- Renaming `MinCheckpoint` to `EarliestPosition`, see #28320 comment
- Renaming `writebuffer.Mananger` to `BufferMananger`, see #27874
  comment

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Nov 15, 2023
1 parent 0b90507 commit 0c7d802
Show file tree
Hide file tree
Showing 21 changed files with 179 additions and 760 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ generate-mockery-datanode: getdeps
$(INSTALL_PATH)/mockery --name=MetaCache --dir=$(PWD)/internal/datanode/metacache --output=$(PWD)/internal/datanode/metacache --filename=mock_meta_cache.go --with-expecter --structname=MockMetaCache --outpkg=metacache --inpackage
$(INSTALL_PATH)/mockery --name=SyncManager --dir=$(PWD)/internal/datanode/syncmgr --output=$(PWD)/internal/datanode/syncmgr --filename=mock_sync_manager.go --with-expecter --structname=MockSyncManager --outpkg=syncmgr --inpackage
$(INSTALL_PATH)/mockery --name=WriteBuffer --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_write_buffer.go --with-expecter --structname=MockWriteBuffer --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=Manager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockManager --outpkg=writebuffer --inpackage
$(INSTALL_PATH)/mockery --name=BufferManager --dir=$(PWD)/internal/datanode/writebuffer --output=$(PWD)/internal/datanode/writebuffer --filename=mock_mananger.go --with-expecter --structname=MockBufferManager --outpkg=writebuffer --inpackage

generate-mockery-metastore: getdeps
$(INSTALL_PATH)/mockery --name=RootCoordCatalog --dir=$(PWD)/internal/metastore --output=$(PWD)/internal/metastore/mocks --filename=mock_rootcoord_catalog.go --with-expecter --structname=RootCoordCatalog --outpkg=mocks
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (t *compactionTask) getChannelName() string {
func (t *compactionTask) getNumRows() (int64, error) {
numRows := int64(0)
for _, binlog := range t.plan.SegmentBinlogs {
seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID()) // Channel.getSegment(binlog.GetSegmentID())
seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID())
if !ok {
return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type DataNode struct {
eventManagerMap *typeutil.ConcurrentMap[string, *channelEventManager]

syncMgr syncmgr.SyncManager
writeBufferManager writebuffer.Manager
writeBufferManager writebuffer.BufferManager

clearSignal chan string // vchannel name
segmentCache *Cache
Expand Down
15 changes: 5 additions & 10 deletions internal/datanode/data_sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@ import (

// dataSyncService controls a flowgraph for a specific collection
type dataSyncService struct {
ctx context.Context
cancelFn context.CancelFunc
// channel Channel // channel stores meta of channel
ctx context.Context
cancelFn context.CancelFunc
metacache metacache.MetaCache
opID int64
collectionID UniqueID // collection id of vchan for which this data sync service serves
Expand All @@ -74,20 +73,16 @@ type dataSyncService struct {
dispClient msgdispatcher.Client
chunkManager storage.ChunkManager

// test only
// flushListener chan *segmentFlushPack // chan to listen flush event

stopOnce sync.Once
}

type nodeConfig struct {
msFactory msgstream.Factory // msgStream factory
collectionID UniqueID
vChannelName string
// channel Channel
metacache metacache.MetaCache
allocator allocator.Allocator
serverID UniqueID
metacache metacache.MetaCache
allocator allocator.Allocator
serverID UniqueID
}

// start the flow graph in dataSyncService
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/data_sync_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ type DataSyncServiceSuite struct {
chunkManager *mocks.ChunkManager
broker *broker.MockBroker
allocator *allocator.MockAllocator
wbManager *writebuffer.MockManager
wbManager *writebuffer.MockBufferManager

factory *dependency.MockFactory
ms *msgstream.MockMsgStream
Expand All @@ -344,7 +344,7 @@ func (s *DataSyncServiceSuite) SetupTest() {
s.chunkManager = mocks.NewChunkManager(s.T())
s.broker = broker.NewMockBroker(s.T())
s.allocator = allocator.NewMockAllocator(s.T())
s.wbManager = writebuffer.NewMockManager(s.T())
s.wbManager = writebuffer.NewMockBufferManager(s.T())

s.broker.EXPECT().UpdateSegmentStatistics(mock.Anything, mock.Anything).Return(nil).Maybe()

Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/flow_graph_time_tick_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type ttNode struct {
BaseNode
vChannelName string
metacache metacache.MetaCache
writeBufferManager writebuffer.Manager
writeBufferManager writebuffer.BufferManager
lastUpdateTime *atomic.Time
broker broker.Broker

Expand Down Expand Up @@ -152,7 +152,7 @@ func (ttn *ttNode) updateChannelCP(channelPos *msgpb.MsgPosition, curTs time.Tim
return nil
}

func newTTNode(config *nodeConfig, broker broker.Broker, wbManager writebuffer.Manager) (*ttNode, error) {
func newTTNode(config *nodeConfig, broker broker.Broker, wbManager writebuffer.BufferManager) (*ttNode, error) {
baseNode := BaseNode{}
baseNode.SetMaxQueueLength(Params.DataNodeCfg.FlowGraphMaxQueueLength.GetAsInt32())
baseNode.SetMaxParallelism(Params.DataNodeCfg.FlowGraphMaxParallelism.GetAsInt32())
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/flow_graph_write_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type writeNode struct {
BaseNode

channelName string
wbManager writebuffer.Manager
wbManager writebuffer.BufferManager
updater statsUpdater
metacache metacache.MetaCache
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (wNode *writeNode) Operate(in []Msg) []Msg {

func newWriteNode(
ctx context.Context,
writeBufferManager writebuffer.Manager,
writeBufferManager writebuffer.BufferManager,
updater statsUpdater,
config *nodeConfig,
) *writeNode {
Expand Down
279 changes: 0 additions & 279 deletions internal/datanode/segment.go

This file was deleted.

Loading

0 comments on commit 0c7d802

Please sign in to comment.