fix: [2.4] Replace outer lock with concurrent map (#37817) #37897

Merged
merged 3 commits on Nov 29, 2024

Changes from all commits
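For orientation before the diff: the core change replaces bufferManager's `map[string]WriteBuffer` guarded by an outer `sync.RWMutex` with `typeutil.ConcurrentMap`, so each accessor becomes a single call on the map instead of a lock/lookup/unlock sequence. A minimal sketch of the before/after shape — the ConcurrentMap methods are the ones used in this diff, while the registry types, values, and channel name here are illustrative, not the real WriteBuffer:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// lockedRegistry is the pattern being removed: a plain map guarded by an outer RWMutex.
type lockedRegistry struct {
	mut     sync.RWMutex
	buffers map[string]string
}

func (r *lockedRegistry) get(channel string) (string, bool) {
	r.mut.RLock()
	defer r.mut.RUnlock()
	buf, ok := r.buffers[channel]
	return buf, ok
}

// concurrentRegistry is the pattern being adopted: the ConcurrentMap owns its own synchronization.
type concurrentRegistry struct {
	buffers *typeutil.ConcurrentMap[string, string]
}

func (r *concurrentRegistry) get(channel string) (string, bool) {
	return r.buffers.Get(channel) // no outer lock needed
}

func main() {
	r := &concurrentRegistry{buffers: typeutil.NewConcurrentMap[string, string]()}
	r.buffers.Insert("by-dev-rootcoord-dml_0", "wb-0")
	fmt.Println(r.get("by-dev-rootcoord-dml_0"))
}
```
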
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 28 additions & 56 deletions internal/datanode/writebuffer/manager.go
@@ -16,6 +16,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// BufferManager is the interface for WriteBuffer management.
@@ -49,16 +50,15 @@ type BufferManager interface {
func NewManager(syncMgr syncmgr.SyncManager) BufferManager {
return &bufferManager{
syncMgr: syncMgr,
buffers: make(map[string]WriteBuffer),
buffers: typeutil.NewConcurrentMap[string, WriteBuffer](),

ch: lifetime.NewSafeChan(),
}
}

type bufferManager struct {
syncMgr syncmgr.SyncManager
buffers map[string]WriteBuffer
mut sync.RWMutex
buffers *typeutil.ConcurrentMap[string, WriteBuffer]

wg sync.WaitGroup
ch lifetime.SafeChan
@@ -93,13 +93,11 @@ func (m *bufferManager) memoryCheck() {
return
}
startTime := time.Now()
m.mut.RLock()
defer func() {
dur := time.Since(startTime)
if dur > 30*time.Second {
log.Warn("memory check takes too long", zap.Duration("time", dur))
}
m.mut.RUnlock()
}()

for {
@@ -112,15 +110,16 @@
return mem / 1024 / 1024
}

for chanName, buf := range m.buffers {
m.buffers.Range(func(chanName string, buf WriteBuffer) bool {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
}
}
return true
})

totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
@@ -146,28 +145,23 @@ func (m *bufferManager) Stop() {

// Register a new WriteBuffer for channel.
func (m *bufferManager) Register(channel string, metacache metacache.MetaCache, storageV2Cache *metacache.StorageV2Cache, opts ...WriteBufferOption) error {
m.mut.Lock()
defer m.mut.Unlock()

_, ok := m.buffers[channel]
if ok {
return merr.WrapErrChannelReduplicate(channel)
}
buf, err := NewWriteBuffer(channel, metacache, storageV2Cache, m.syncMgr, opts...)
if err != nil {
return err
}
m.buffers[channel] = buf

_, loaded := m.buffers.GetOrInsert(channel, buf)
if loaded {
buf.Close(false)
return merr.WrapErrChannelReduplicate(channel)
}
return nil
}

// SealSegments call sync segment and change segments state to Flushed.
func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmentIDs []int64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(ctx).Warn("write buffer not found when flush segments",
zap.String("channel", channel),
zap.Int64s("segmentIDs", segmentIDs))
@@ -178,11 +172,8 @@ func (m *bufferManager) SealSegments(ctx context.Context, channel string, segmen
}

func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushTs uint64) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(ctx).Warn("write buffer not found when flush channel",
zap.String("channel", channel),
zap.Uint64("flushTs", flushTs))
@@ -194,11 +185,8 @@ func (m *bufferManager) FlushChannel(ctx context.Context, channel string, flushT

// BufferData put data into channel write buffer.
func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Ctx(context.Background()).Warn("write buffer not found when buffer data",
zap.String("channel", channel))
return merr.WrapErrChannelNotFound(channel)
@@ -209,11 +197,8 @@ func (m *bufferManager) BufferData(channel string, insertMsgs []*msgstream.Inser

// GetCheckpoint returns checkpoint for provided channel.
func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool, error) {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return nil, false, merr.WrapErrChannelNotFound(channel)
}
cp := buf.GetCheckpoint()
@@ -223,10 +208,8 @@ func (m *bufferManager) GetCheckpoint(channel string) (*msgpb.MsgPosition, bool,
}

func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
m.mut.Lock()
defer m.mut.Unlock()
buf, ok := m.buffers[channel]
if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
return
}
flushTs := buf.GetFlushTimestamp()
@@ -239,12 +222,8 @@ func (m *bufferManager) NotifyCheckpointUpdated(channel string, ts uint64) {
// RemoveChannel remove channel WriteBuffer from manager.
// this method discards all buffered data since datanode no longer has the ownership
func (m *bufferManager) RemoveChannel(channel string) {
m.mut.Lock()
buf, ok := m.buffers[channel]
delete(m.buffers, channel)
m.mut.Unlock()

if !ok {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
log.Warn("failed to remove channel, channel not maintained in manager", zap.String("channel", channel))
return
}
@@ -255,12 +234,8 @@ func (m *bufferManager) RemoveChannel(channel string) {
// DropChannel removes channel WriteBuffer and process `DropChannel`
// this method will save all buffered data
func (m *bufferManager) DropChannel(channel string) {
m.mut.Lock()
buf, ok := m.buffers[channel]
delete(m.buffers, channel)
m.mut.Unlock()

if !ok {
buf, loaded := m.buffers.GetAndRemove(channel)
if !loaded {
log.Warn("failed to drop channel, channel not maintained in manager", zap.String("channel", channel))
return
}
@@ -269,11 +244,8 @@ func (m *bufferManager) DropChannel(channel string) {
}

func (m *bufferManager) DropPartitions(channel string, partitionIDs []int64) {
m.mut.RLock()
buf, ok := m.buffers[channel]
m.mut.RUnlock()

if !ok {
buf, loaded := m.buffers.Get(channel)
if !loaded {
log.Warn("failed to drop partition, channel not maintained in manager", zap.String("channel", channel), zap.Int64s("partitionIDs", partitionIDs))
return
}
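
The Register change above is the one place where the old code relied on the outer lock for a check-then-insert sequence. With the lock gone, the buffer is constructed first and then published with a single atomic `GetOrInsert`; if another goroutine registered the channel concurrently, the loser closes the buffer it just built and reports the duplicate. A minimal sketch of that pattern — the toy buffer type and error are illustrative, and only the ConcurrentMap calls mirror the diff:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type toyBuffer struct{ channel string }

func (b *toyBuffer) Close(drop bool) {}

var errDuplicate = errors.New("channel already registered")

type manager struct {
	buffers *typeutil.ConcurrentMap[string, *toyBuffer]
}

// register builds the buffer first, then publishes it with one atomic GetOrInsert.
// If another goroutine registered the channel in the meantime, the loser closes
// its own buffer and reports the duplicate, so no lock is needed around the check.
func (m *manager) register(channel string) error {
	buf := &toyBuffer{channel: channel}
	_, loaded := m.buffers.GetOrInsert(channel, buf)
	if loaded {
		buf.Close(false)
		return errDuplicate
	}
	return nil
}

func main() {
	m := &manager{buffers: typeutil.NewConcurrentMap[string, *toyBuffer]()}
	fmt.Println(m.register("ch-1")) // <nil>
	fmt.Println(m.register("ch-1")) // channel already registered
}
```

The trade-off is that a losing goroutine may construct a WriteBuffer it immediately discards, but duplicate registration should be rare, while every other operation (Get, GetAndRemove, Range) no longer contends on a manager-wide mutex.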
30 changes: 6 additions & 24 deletions internal/datanode/writebuffer/manager_test.go
@@ -98,10 +98,7 @@ func (s *ManagerSuite) TestFlushSegments() {
defer cancel()

wb := NewMockWriteBuffer(s.T())

s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()
s.manager.buffers.Insert(s.channelName, wb)

wb.EXPECT().SealSegments(mock.Anything, mock.Anything).Return(nil)

@@ -120,10 +117,7 @@ func (s *ManagerSuite) TestBufferData() {
s.Run("normal_buffer_data", func() {
wb := NewMockWriteBuffer(s.T())

s.manager.mut.Lock()
s.manager.buffers[s.channelName] = wb
s.manager.mut.Unlock()

s.manager.buffers.Insert(s.channelName, wb)
wb.EXPECT().BufferData(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

err := manager.BufferData(s.channelName, nil, nil, nil, nil)
@@ -141,10 +135,7 @@ func (s *ManagerSuite) TestGetCheckpoint() {
s.Run("normal_checkpoint", func() {
wb := NewMockWriteBuffer(s.T())

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0)}
wb.EXPECT().GetCheckpoint().Return(pos)
wb.EXPECT().GetFlushTimestamp().Return(nonFlushTS)
@@ -157,10 +148,7 @@
s.Run("checkpoint_need_update", func() {
wb := NewMockWriteBuffer(s.T())

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
cpTimestamp := tsoutil.ComposeTSByTime(time.Now(), 0)

pos := &msgpb.MsgPosition{ChannelName: s.channelName, Timestamp: cpTimestamp}
@@ -207,10 +195,7 @@ func (s *ManagerSuite) TestDropPartitions() {
wb := NewMockWriteBuffer(s.T())
wb.EXPECT().DropPartitions(mock.Anything).Return()

manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
manager.DropPartitions(s.channelName, []int64{1})
})
}
@@ -248,10 +233,7 @@ func (s *ManagerSuite) TestMemoryCheck() {
}
flag.Store(true)
}).Return()
manager.mut.Lock()
manager.buffers[s.channelName] = wb
manager.mut.Unlock()

manager.buffers.Insert(s.channelName, wb)
manager.Start()
defer manager.Stop()
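
The test changes are mechanical: instead of taking `manager.mut` around a map write, the suite now seeds the manager with `buffers.Insert`. A standalone sketch of that seeding style, assuming only the ConcurrentMap API shown in the diff — the fake buffer and test are illustrative, not the suite's mocks:

```go
package example

import (
	"testing"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type fakeBuffer struct{ sealed bool }

type manager struct {
	buffers *typeutil.ConcurrentMap[string, *fakeBuffer]
}

func (m *manager) seal(channel string) bool {
	buf, ok := m.buffers.Get(channel)
	if !ok {
		return false
	}
	buf.sealed = true
	return true
}

// TestSeal seeds the concurrent map directly with Insert, the same way the
// updated suite seeds mock WriteBuffers, then exercises the lookup path.
func TestSeal(t *testing.T) {
	m := &manager{buffers: typeutil.NewConcurrentMap[string, *fakeBuffer]()}
	m.buffers.Insert("ch-1", &fakeBuffer{})

	if !m.seal("ch-1") {
		t.Fatal("expected channel ch-1 to be registered")
	}
	if m.seal("ch-missing") {
		t.Fatal("did not expect unknown channel to seal")
	}
}
```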

28 changes: 12 additions & 16 deletions internal/datanode/writebuffer/write_buffer.go
@@ -65,38 +65,35 @@ type checkpointCandidate struct {
}

type checkpointCandidates struct {
candidates map[string]*checkpointCandidate
mu sync.RWMutex
candidates *typeutil.ConcurrentMap[string, *checkpointCandidate]
}

func getCandidatesKey(segmentID int64, timestamp uint64) string {
return fmt.Sprintf("%d-%d", segmentID, timestamp)
}

func newCheckpointCandiates() *checkpointCandidates {
return &checkpointCandidates{
candidates: make(map[string]*checkpointCandidate),
candidates: typeutil.NewConcurrentMap[string, *checkpointCandidate](), // segmentID-ts
}
}

func (c *checkpointCandidates) Remove(segmentID int64, timestamp uint64) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.candidates, fmt.Sprintf("%d-%d", segmentID, timestamp))
c.candidates.Remove(getCandidatesKey(segmentID, timestamp))
}

func (c *checkpointCandidates) Add(segmentID int64, position *msgpb.MsgPosition, source string) {
c.mu.Lock()
defer c.mu.Unlock()
c.candidates[fmt.Sprintf("%d-%d", segmentID, position.GetTimestamp())] = &checkpointCandidate{segmentID, position, source}
c.candidates.Insert(getCandidatesKey(segmentID, position.GetTimestamp()), &checkpointCandidate{segmentID, position, source})
}

func (c *checkpointCandidates) GetEarliestWithDefault(def *checkpointCandidate) *checkpointCandidate {
c.mu.RLock()
defer c.mu.RUnlock()

var result *checkpointCandidate = def
for _, candidate := range c.candidates {
c.candidates.Range(func(_ string, candidate *checkpointCandidate) bool {
if result == nil || candidate.position.GetTimestamp() < result.position.GetTimestamp() {
result = candidate
}
}
return true
})
return result
}
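
The checkpointCandidates change above follows the same recipe: the map plus `sync.RWMutex` pair becomes a ConcurrentMap keyed by the `"%d-%d"` segmentID-timestamp string from `getCandidatesKey`, and `GetEarliestWithDefault` scans it with `Range`, where returning `true` from the callback keeps iteration going. A minimal sketch of that Range-based minimum search — the candidate type is simplified to a bare timestamp instead of `*msgpb.MsgPosition`:

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type candidate struct {
	segmentID int64
	timestamp uint64
}

// key mirrors the "segmentID-timestamp" shape of getCandidatesKey.
func key(segmentID int64, ts uint64) string {
	return fmt.Sprintf("%d-%d", segmentID, ts)
}

// earliest walks every entry once and keeps the smallest timestamp seen,
// starting from an optional default; returning true from the callback tells
// Range to continue with the next entry.
func earliest(m *typeutil.ConcurrentMap[string, *candidate], def *candidate) *candidate {
	result := def
	m.Range(func(_ string, c *candidate) bool {
		if result == nil || c.timestamp < result.timestamp {
			result = c
		}
		return true
	})
	return result
}

func main() {
	m := typeutil.NewConcurrentMap[string, *candidate]()
	m.Insert(key(100, 30), &candidate{segmentID: 100, timestamp: 30})
	m.Insert(key(101, 10), &candidate{segmentID: 101, timestamp: 10})
	fmt.Println(earliest(m, nil).segmentID) // 101
}
```

The remaining hunk below only moves writeBufferBase's own `mut sync.RWMutex` declaration next to the per-segment `buffers` map it guards; that inner lock is not removed by this change.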

@@ -111,8 +108,6 @@ func NewWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Cach

// writeBufferBase is the common component for buffering data
type writeBufferBase struct {
mut sync.RWMutex

collectionID int64
channelName string

Expand All @@ -123,6 +118,7 @@ type writeBufferBase struct {
estSizePerRecord int
metaCache metacache.MetaCache

mut sync.RWMutex
buffers map[int64]*segmentBuffer // segmentID => segmentBuffer

syncPolicies []SyncPolicy