From 4c440758f2f0c68b663b2f1da9aab259ca211912 Mon Sep 17 00:00:00 2001
From: XuanYang-cn
Date: Thu, 21 Nov 2024 16:08:33 +0800
Subject: [PATCH] fix: [2.4] Replace outer lock with concurrent map (#37817)

See also: #37493
pr: #37817

Signed-off-by: yangxuan
---
 internal/datanode/writebuffer/manager.go      | 86 +++++++------------
 internal/datanode/writebuffer/manager_test.go | 30 ++-----
 internal/datanode/writebuffer/write_buffer.go | 28 +++---
 3 files changed, 47 insertions(+), 97 deletions(-)

diff --git a/internal/datanode/writebuffer/manager.go b/internal/datanode/writebuffer/manager.go
index a90bbb4125bf3..d2f248a540718 100644
--- a/internal/datanode/writebuffer/manager.go
+++ b/internal/datanode/writebuffer/manager.go
@@ -16,6 +16,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/lifetime"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // BufferManager is the interface for WriteBuffer management.
@@ -49,7 +50,7 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
 	return &bufferManager{
 		syncMgr: syncMgr,
-		buffers: make(map[string]WriteBuffer),
+		buffers: typeutil.NewConcurrentMap[string, WriteBuffer](),
 
 		ch: lifetime.NewSafeChan(),
 	}
 }
@@ -57,8 +58,7 @@ func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
 type bufferManager struct {
 	syncMgr syncmgr.SyncManager
-	buffers map[string]WriteBuffer
-	mut     sync.RWMutex
+	buffers *typeutil.ConcurrentMap[string, WriteBuffer]
 
 	wg sync.WaitGroup
 	ch lifetime.SafeChan
@@ -93,13 +93,11 @@ func (m *bufferManager) memoryCheck() {
 		return
 	}
 	startTime := time.Now()
-	m.mut.RLock()
 	defer func() {
 		dur := time.Since(startTime)
 		if dur > 30*time.Second {
 			log.Warn("memory check takes too long", zap.Duration("time", dur))
 		}
-		m.mut.RUnlock()
 	}()
 
 	for {
@@ -112,7 +110,7 @@ func (m *bufferManager) memoryCheck() {
 			return mem / 1024 / 1024
 		}
 
-		for chanName, buf := range m.buffers {
+		m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
 			size := buf.MemorySize()
 			total += size
 			if size > candiSize {
@@ -120,7 +118,8 @@
 				candidate = buf
 				candiChan = chanName
 			}
-		}
+			return true
+		})
 
 		totalMemory := hardware.GetMemoryCount()
 		memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
@@ -146,28 +145,23 @@ func (m *bufferManager) Stop() {
 
 // Register a new WriteBuffer for channel.
 func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, opts ...WriteBufferOption) error {
-	m.mut.Lock()
-	defer m.mut.Unlock()
-
-	_, ok := m.buffers[channel]
-	if ok {
-		return merr.WrapErrChannelReduplicate(channel)
-	}
 	buf, err := NewWriteBuffer(channel, metacache, storageV2Cache, m.syncMgr, opts...)
 	if err != nil {
 		return err
 	}
-	m.buffers[channel] = buf
+
+	_, loaded := m.buffers.GetOrInsert(channel, buf)
+	if loaded {
+		buf.Close(context.Background(), false)
+		return merr.WrapErrChannelReduplicate(channel)
+	}
 	return nil
 }
 
 // SealSegments call sync segment and change segments state to Flushed.
 func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
-	m.mut.RLock()
-	buf, ok := m.buffers[channel]
-	m.mut.RUnlock()
-
-	if !ok {
+	buf, loaded := m.buffers.Get(channel)
+	if !loaded {
 		log.Ctx(ctx).Warn("write buffer not found when flush segments",
 			zap.String("channel", channel),
 			zap.Int64s("segmentIDs", segmentIDs))
@@ -178,11 +172,8 @@ func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmen
 }
 
 func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
-	m.mut.RLock()
-	buf, ok := m.buffers[channel]
-	m.mut.RUnlock()
-
-	if !ok {
+	buf, loaded := m.buffers.Get(channel)
+	if !loaded {
 		log.Ctx(ctx).Warn("write buffer not found when flush channel",
 			zap.String("channel", channel),
 			zap.Uint64("flushTs", flushTs))
@@ -193,12 +184,9 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT
 }
 
 // BufferData put data into channel write buffer.
-func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
-	m.mut.RLock()
-	buf, ok := m.buffers[channel]
-	m.mut.RUnlock()
-
-	if !ok {
+func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
+	buf, loaded := m.buffers.Get(channel)
+	if !loaded {
 		log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
 			zap.String("channel", channel))
 		return merr.WrapErrChannelNotFound(channel)
@@ -209,11 +197,8 @@
 
 // GetCheckpoint returns checkpoint for provided channel.
 func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
-	m.mut.RLock()
-	buf, ok := m.buffers[channel]
-	m.mut.RUnlock()
-
-	if !ok {
+	buf, loaded := m.buffers.Get(channel)
+	if !loaded {
 		return nil, false, merr.WrapErrChannelNotFound(channel)
 	}
 	cp := buf.GetCheckpoint()
@@ -223,10 +208,8 @@ func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool,
 }
 
 func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
-	m.mut.Lock()
-	defer m.mut.Unlock()
-	buf, ok := m.buffers[channel]
-	if !ok {
+	buf, loaded := m.buffers.Get(channel)
+	if !loaded {
 		return
 	}
 	flushTs := buf.GetFlushTimestamp()
@@ -239,12 +222,8 @@
 // RemoveChannel remove channel WriteBuffer from manager.
// this method discards all buffered data since datanode no longer has the ownership func (m *bufferManager) RemoveChannel(channel string) { - m.mut.Lock() - buf, ok := m.buffers[channel] - delete(m.buffers, channel) - m.mut.Unlock() - - if !ok { + buf, loaded := m.buffers.GetAndRemove(channel) + if !loaded { log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel)) return } @@ -255,12 +234,8 @@ func (m *bufferManager) RemoveChannel(channel string) { // DropChannel removes channel WriteBuffer and process `DropChannel` // this method will save all buffered data func (m *bufferManager) DropChannel(channel string) { - m.mut.Lock() - buf, ok := m.buffers[channel] - delete(m.buffers, channel) - m.mut.Unlock() - - if !ok { + buf, loaded := m.buffers.GetAndRemove(channel) + if !loaded { log.Warn("failed to drop channel, channel not maintained in manager", zap.String("channel", channel)) return } @@ -269,11 +244,8 @@ func (m *bufferManager) DropChannel(channel string) { } func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) { - m.mut.RLock() - buf, ok := m.buffers[channel] - m.mut.RUnlock() - - if !ok { + buf, loaded := m.buffers.Get(channel) + if !loaded { log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs)) return } diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 61b3b693a2e82..a9b91bea9292d 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -98,10 +98,7 @@ func (s *ManagerSuite) TestFlushSegments() { defer cancel() wb := NewMockWriteBuffer(s.T()) - - s.manager.mut.Lock() - s.manager.buffers[s.channelName] = wb - s.manager.mut.Unlock() + s.manager.buffers.Insert(s.channelName, wb) wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil) @@ -120,10 +117,7 @@ func (s *ManagerSuite) TestBufferData() { s.Run("normal_buffer_data", func() { wb := NewMockWriteBuffer(s.T()) - s.manager.mut.Lock() - s.manager.buffers[s.channelName] = wb - s.manager.mut.Unlock() - + s.manager.buffers.Insert(s.channelName, wb) wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) err := manager.BufferData(s.channelName, nil, nil, nil, nil) @@ -141,10 +135,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { s.Run("normal_checkpoint", func() { wb := NewMockWriteBuffer(s.T()) - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)} wb.EXPECT().GetCheckpoint().Return(pos) wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS) @@ -157,10 +148,7 @@ func (s *ManagerSuite) TestGetCheckpoint() { s.Run("checkpoint_need_update", func() { wb := NewMockWriteBuffer(s.T()) - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0) pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp} @@ -207,10 +195,7 @@ func (s *ManagerSuite) TestDropPartitions() { wb := NewMockWriteBuffer(s.T()) wb.EXPECT().DropPartitions(mock.Anything).Return() - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) 
manager.DropPartitions(s.channelName, []int64{1}) }) } @@ -248,10 +233,7 @@ func (s *ManagerSuite) TestMemoryCheck() { } flag.Store(true) }).Return() - manager.mut.Lock() - manager.buffers[s.channelName] = wb - manager.mut.Unlock() - + manager.buffers.Insert(s.channelName, wb) manager.Start() defer manager.Stop() diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index f4c162bd29696..870b7b604cdac 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -65,38 +65,35 @@ type checkpointCandidate struct { } type checkpointCandidates struct { - candidates map[string]*checkpointCandidate - mu sync.RWMutex + candidates *typeutil.ConcurrentMap[string, *checkpointCandidate] +} + +func getCandidatesKey(segmentID int64, timestamp uint64) string { + return fmt.Sprintf("%d-%d", segmentID, timestamp) } func newCheckpointCandiates() *checkpointCandidates { return &checkpointCandidates{ - candidates: make(map[string]*checkpointCandidate), + candidates: typeutil.NewConcurrentMap[string, *checkpointCandidate](), // segmentID-ts } } func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) { - c.mu.Lock() - defer c.mu.Unlock() - delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp)) + c.candidates.Remove(getCandidatesKey(segmentID, timestamp)) } func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) { - c.mu.Lock() - defer c.mu.Unlock() - c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source} + c.candidates.Insert(getCandidatesKey(segmentID, position.GetTimestamp()), &checkpointCandidate{segmentID, position, source}) } func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate { - c.mu.RLock() - defer c.mu.RUnlock() - var result *checkpointCandidate = def - for _, candidate := range c.candidates { + c.candidates.Range(func(_ string, candidate *checkpointCandidate) bool { if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() { result = candidate } - } + return true + }) return result } @@ -111,8 +108,6 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach // writeBufferBase is the common component for buffering data type writeBufferBase struct { - mut sync.RWMutex - collectionID int64 channelName string @@ -123,6 +118,7 @@ type writeBufferBase struct { estSizePerRecord int metaCache metacache.MetaCache + mut sync.RWMutex buffers map[int64]*segmentBuffer // segmentID => segmentBuffer syncPolicies []SyncPolicy
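
The change above follows one pattern throughout: instead of guarding a plain map[string]WriteBuffer with a manager-wide sync.RWMutex, every lookup goes through a concurrent map, and registration and removal become single atomic calls (GetOrInsert, GetAndRemove), so no outer lock is held while a buffer is synced or closed. Below is a minimal, self-contained sketch of that idea; it is not the Milvus implementation. It uses the standard library's sync.Map as a stand-in for typeutil.ConcurrentMap (which is generic), and the WriteBuffer interface, fakeBuffer type, and channel name are illustrative placeholders only.

package main

import (
	"fmt"
	"sync"
)

// WriteBuffer stands in for the real writebuffer.WriteBuffer interface.
type WriteBuffer interface {
	MemorySize() int64
}

type fakeBuffer struct{ size int64 }

func (f *fakeBuffer) MemorySize() int64 { return f.size }

type bufferManager struct {
	buffers sync.Map // channel name -> WriteBuffer, no outer mutex
}

// Register succeeds only for a channel that is not registered yet.
// LoadOrStore plays the role of ConcurrentMap.GetOrInsert in the patch.
func (m *bufferManager) Register(channel string, buf WriteBuffer) error {
	if _, loaded := m.buffers.LoadOrStore(channel, buf); loaded {
		return fmt.Errorf("channel %s already registered", channel)
	}
	return nil
}

// RemoveChannel atomically fetches and deletes the buffer for a channel,
// mirroring ConcurrentMap.GetAndRemove.
func (m *bufferManager) RemoveChannel(channel string) (WriteBuffer, bool) {
	v, loaded := m.buffers.LoadAndDelete(channel)
	if !loaded {
		return nil, false
	}
	return v.(WriteBuffer), true
}

// TotalMemory walks all buffers without holding any outer lock,
// mirroring the Range call in memoryCheck.
func (m *bufferManager) TotalMemory() int64 {
	var total int64
	m.buffers.Range(func(_, v any) bool {
		total += v.(WriteBuffer).MemorySize()
		return true
	})
	return total
}

func main() {
	m := &bufferManager{}
	_ = m.Register("dml-channel-0", &fakeBuffer{size: 64})
	fmt.Println(m.TotalMemory())
	if _, ok := m.RemoveChannel("dml-channel-0"); ok {
		fmt.Println("removed")
	}
}

The trade-off is the same one the patch accepts: per-key operations are atomic, but iterating with Range observes a live view rather than a consistent snapshot, which is acceptable for the memory-check heuristic and for log- or metric-style reads.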