Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Make write buffer memory check repeat until memory usage is safe #32172

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions internal/datanode/writebuffer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,38 +94,40 @@ func (m *bufferManager) memoryCheck() {

m.mut.Lock()
defer m.mut.Unlock()
for {
var total int64
var candidate WriteBuffer
var candiSize int64
var candiChan string

var total int64
var candidate WriteBuffer
var candiSize int64
var candiChan string
for chanName, buf := range m.buffers {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
toMB := func(mem float64) float64 {
return mem / 1024 / 1024
}
}

toMB := func(mem float64) float64 {
return mem / 1024 / 1024
}
for chanName, buf := range m.buffers {
size := buf.MemorySize()
total += size
if size > candiSize {
candiSize = size
candidate = buf
candiChan = chanName
}
}

totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(20, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", toMB(float64(total))),
zap.Float64("current_memory_watermark", toMB(memoryWatermark)))
return
}
totalMemory := hardware.GetMemoryCount()
memoryWatermark := float64(totalMemory) * paramtable.Get().DataNodeCfg.MemoryForceSyncWatermark.GetAsFloat()
if float64(total) < memoryWatermark {
log.RatedDebug(20, "skip force sync because memory level is not high enough",
zap.Float64("current_total_memory_usage", toMB(float64(total))),
zap.Float64("current_memory_watermark", toMB(memoryWatermark)))
return
}

if candidate != nil {
candidate.EvictBuffer(GetOldestBufferPolicy(paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()))
log.Info("notify writebuffer to sync",
zap.String("channel", candiChan), zap.Float64("bufferSize(MB)", toMB(float64(candiSize))))
if candidate != nil {
candidate.EvictBuffer(GetOldestBufferPolicy(paramtable.Get().DataNodeCfg.MemoryForceSyncSegmentNum.GetAsInt()))
log.Info("notify writebuffer to sync",
zap.String("channel", candiChan), zap.Float64("bufferSize(MB)", toMB(float64(candiSize))))
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion internal/datanode/writebuffer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
Expand Down Expand Up @@ -209,14 +210,22 @@ func (s *ManagerSuite) TestMemoryCheck() {

wb := NewMockWriteBuffer(s.T())

flag := atomic.NewBool(false)
memoryLimit := hardware.GetMemoryCount()
signal := make(chan struct{}, 1)
wb.EXPECT().MemorySize().Return(int64(float64(memoryLimit) * 0.6))
wb.EXPECT().MemorySize().RunAndReturn(func() int64 {
if flag.Load() {
return int64(float64(memoryLimit) * 0.4)
}
return int64(float64(memoryLimit) * 0.6)
})
//.Return(int64(float64(memoryLimit) * 0.6))
wb.EXPECT().EvictBuffer(mock.Anything).Run(func(polices ...SyncPolicy) {
select {
case signal <- struct{}{}:
default:
}
flag.Store(true)
}).Return()
manager.mut.Lock()
manager.buffers[s.channelName] = wb
Expand Down
10 changes: 6 additions & 4 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (wb *writeBufferBase) EvictBuffer(policies ...SyncPolicy) {
segmentIDs := wb.getSegmentsToSync(ts, policies...)
if len(segmentIDs) > 0 {
log.Info("evict buffer find segments to sync", zap.Int64s("segmentIDs", segmentIDs))
wb.syncSegments(context.Background(), segmentIDs)
conc.AwaitAll(wb.syncSegments(context.Background(), segmentIDs)...)
}
}

Expand Down Expand Up @@ -266,6 +266,7 @@ func (wb *writeBufferBase) triggerSync() (segmentIDs []int64) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp(), wb.syncPolicies...)
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
// ignore future here, use callback to handle error
wb.syncSegments(context.Background(), segmentsToSync)
}

Expand Down Expand Up @@ -296,8 +297,9 @@ func (wb *writeBufferBase) sealSegments(ctx context.Context, segmentIDs []int64)
return nil
}

func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) {
func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) []*conc.Future[error] {
log := log.Ctx(ctx)
result := make([]*conc.Future[error], 0, len(segmentIDs))
for _, segmentID := range segmentIDs {
syncTask, err := wb.getSyncTask(ctx, segmentID)
if err != nil {
Expand All @@ -309,9 +311,9 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}

// discard Future here, handle error in callback
_ = wb.syncMgr.SyncData(ctx, syncTask)
result = append(result, wb.syncMgr.SyncData(ctx, syncTask))
}
return result
}

// getSegmentsToSync applies all policies to get segments list to sync.
Expand Down
5 changes: 4 additions & 1 deletion internal/datanode/writebuffer/write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
Expand Down Expand Up @@ -356,7 +357,9 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
s.metacache.EXPECT().GetSegmentByID(int64(2)).Return(segment, true)
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).Return(conc.Go[error](func() (error, error) {
return nil, nil
}))
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
Expand Down
Loading