feat: Use writebuffer refactor with level zero delta policy #28336

Closed

Changes from 1 commit
Fix LevelZero segment sync logic
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia authored and XuanYang-cn committed Nov 16, 2023
commit a0d643b2f9e6ca567f4d17868cb4e7e21ab21604
7 changes: 7 additions & 0 deletions internal/datanode/metacache/actions.go
@@ -19,6 +19,7 @@
import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -57,6 +58,12 @@
}
}

func WithLevel(level datapb.SegmentLevel) SegmentFilter {
return func(info *SegmentInfo) bool {
return info.level == level
}
}

type SegmentAction func(info *SegmentInfo)

func UpdateState(state commonpb.SegmentState) SegmentAction {
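The new filter composes with the package's existing SegmentFilter helpers. A minimal usage sketch (hypothetical call site; GetSegmentIDsBy is assumed to be the variadic filter lookup the MetaCache interface already exposes):

// cache is a metacache.MetaCache for one channel.
l0Segments := cache.GetSegmentIDsBy(metacache.WithLevel(datapb.SegmentLevel_L0))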
10 changes: 10 additions & 0 deletions internal/datanode/metacache/segment.go
@@ -36,6 +36,7 @@ type SegmentInfo struct {
bfs *BloomFilterSet
compactTo int64
importing bool
level datapb.SegmentLevel
}

func (s *SegmentInfo) SegmentID() int64 {
@@ -81,6 +82,10 @@ func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
return s.bfs
}

func (s *SegmentInfo) Level() datapb.SegmentLevel {
return s.level
}

func (s *SegmentInfo) Clone() *SegmentInfo {
return &SegmentInfo{
segmentID: s.segmentID,
@@ -98,6 +103,10 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
}

func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo {
level := info.GetLevel()
if level == datapb.SegmentLevel_Legacy {
level = datapb.SegmentLevel_L1
}
return &SegmentInfo{
segmentID: info.GetID(),
partitionID: info.GetPartitionID(),
@@ -106,6 +115,7 @@ func NewSegmentInfo(info *datapb.SegmentInfo, bfs *BloomFilterSet) *SegmentInfo
startPosition: info.GetStartPosition(),
checkpoint: info.GetDmlPosition(),
startPosRecorded: true,
level: level,
bfs: bfs,
}
}
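NewSegmentInfo collapses the Legacy level to L1, so every cached segment carries a concrete level and later checks only ever distinguish L0 from the rest. A test-style sketch of that defaulting, using only the constructors shown in this diff (the segment ID is made up):

info := metacache.NewSegmentInfo(&datapb.SegmentInfo{
	ID:    100, // hypothetical segment ID
	Level: datapb.SegmentLevel_Legacy,
}, metacache.NewBloomFilterSet())
ok := info.Level() == datapb.SegmentLevel_L1 // true: Legacy defaults to L1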
1 change: 1 addition & 0 deletions internal/datanode/syncmgr/meta_writer.go
@@ -100,6 +100,7 @@ func (b *brokerMetaWriter) UpdateSync(pack *SyncTask) error {
Flushed: pack.isFlush,
Dropped: pack.isDrop,
Channel: pack.channelName,
SegLevel: pack.level,
}
err := retry.Do(context.Background(), func() error {
err := b.broker.SaveBinlogPaths(context.Background(), req)
5 changes: 5 additions & 0 deletions internal/datanode/syncmgr/options.go
@@ -114,3 +114,8 @@
t.batchSize = batchSize
return t
}

func (t *SyncTask) WithLevel(level datapb.SegmentLevel) *SyncTask {
t.level = level
return t
}
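WithLevel follows the builder pattern of the other options in this file, letting the L0 write buffer tag sync tasks at construction time. A hypothetical assembly, assuming the usual NewSyncTask constructor and the sibling With* options:

task := syncmgr.NewSyncTask().
	WithCollectionID(collID).
	WithPartitionID(partID).
	WithSegmentID(segID).
	WithLevel(datapb.SegmentLevel_L0) // mark this task as an L0 delta sync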
1 change: 1 addition & 0 deletions internal/datanode/syncmgr/task.go
@@ -41,6 +41,7 @@ type SyncTask struct {
// batchSize is the row number of this sync task,
// not the total number of rows of the segment
batchSize int64
level datapb.SegmentLevel

tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
3 changes: 2 additions & 1 deletion internal/datanode/writebuffer/bf_write_buffer.go
@@ -71,5 +71,6 @@ func (wb *bfWriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsg
// update buffer last checkpoint
wb.checkpoint = endPos

return wb.triggerSync()
_, err = wb.triggerSync()
return err
}
33 changes: 29 additions & 4 deletions internal/datanode/writebuffer/l0_write_buffer.go
@@ -5,11 +5,13 @@

"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
@@ -20,7 +22,8 @@
type l0WriteBuffer struct {
*writeBufferBase

l0Segments map[int64]int64 // partitionID => l0 segment ID
l0Segments map[int64]int64 // partitionID => l0 segment ID
l0partition map[int64]int64 // l0 segment id => partition id

syncMgr syncmgr.SyncManager
idAllocator allocator.Interface
@@ -32,6 +35,7 @@
}
return &l0WriteBuffer{
l0Segments: make(map[int64]int64),
l0partition: make(map[int64]int64),
writeBufferBase: newWriteBufferBase(channel, sch, metacache, syncMgr, option),
syncMgr: syncMgr,
idAllocator: option.idAllocator,
@@ -50,7 +54,7 @@
}

for _, msg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID())
l0SegmentID := wb.getL0SegmentID(msg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(msg.GetPrimaryKeys())
err := wb.bufferDelete(l0SegmentID, pks, msg.GetTimestamps(), startPos, endPos)
if err != nil {
@@ -62,10 +66,21 @@
// update buffer last checkpoint
wb.checkpoint = endPos

return wb.triggerSync()
segmentsSync, err := wb.triggerSync()
if err != nil {
return err
}
for _, segment := range segmentsSync {
partition, ok := wb.l0partition[segment]
if ok {
delete(wb.l0partition, segment)
delete(wb.l0Segments, partition)
}
}
return nil
}

func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64) int64 {
func (wb *l0WriteBuffer) getL0SegmentID(partitionID int64, startPos *msgpb.MsgPosition) int64 {
segmentID, ok := wb.l0Segments[partitionID]
if !ok {
err := retry.Do(context.Background(), func() error {
Expand All @@ -78,6 +93,16 @@
panic(err)
}
wb.l0Segments[partitionID] = segmentID
wb.l0partition[segmentID] = partitionID
wb.metaCache.AddSegment(&datapb.SegmentInfo{
ID: segmentID,
PartitionID: partitionID,
CollectionID: wb.collectionID,
InsertChannel: wb.channelName,
StartPosition: startPos,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L0,
}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() })
}
return segmentID
}
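Taken together, BufferData now walks a full L0 lifecycle: the first delete for a partition allocates an L0 segment and registers it in the metacache with its start position and SegmentLevel_L0 (so the sync policy can see it), and once that segment syncs, both map entries are dropped so the next delete allocates a fresh one. A condensed restatement of the delete path, with locking and message parsing omitted:

// Simplified sketch, not the literal method body.
segID := wb.getL0SegmentID(partID, startPos) // allocates + AddSegment on first use
if err := wb.bufferDelete(segID, pks, tss, startPos, endPos); err != nil {
	return err
}
synced, err := wb.triggerSync()
if err != nil {
	return err
}
for _, seg := range synced {
	if part, ok := wb.l0partition[seg]; ok {
		delete(wb.l0partition, seg) // retire the reverse mapping
		delete(wb.l0Segments, part) // force a new L0 segment next time
	}
}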
9 changes: 6 additions & 3 deletions internal/datanode/writebuffer/manager_test.go
@@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/pkg/common"
@@ -25,6 +26,7 @@ type ManagerSuite struct {
collSchema *schemapb.CollectionSchema
syncMgr *syncmgr.MockSyncManager
metacache *metacache.MockMetaCache
allocator *allocator.MockAllocator

manager *bufferManager
}
@@ -52,6 +54,7 @@ func (s *ManagerSuite) SetupSuite() {
func (s *ManagerSuite) SetupTest() {
s.syncMgr = syncmgr.NewMockSyncManager(s.T())
s.metacache = metacache.NewMockMetaCache(s.T())
s.allocator = allocator.NewMockAllocator(s.T())

mgr := NewManager(s.syncMgr)
var ok bool
@@ -62,10 +65,10 @@
func (s *ManagerSuite) TestRegister() {
manager := s.manager

err := manager.Register(s.channelName, s.collSchema, s.metacache)
err := manager.Register(s.channelName, s.collSchema, s.metacache, WithIDAllocator(s.allocator))
s.NoError(err)

err = manager.Register(s.channelName, s.collSchema, s.metacache)
err = manager.Register(s.channelName, s.collSchema, s.metacache, WithIDAllocator(s.allocator))
s.Error(err)
s.ErrorIs(err, merr.ErrChannelReduplicate)
}
@@ -169,7 +172,7 @@ func (s *ManagerSuite) TestRemoveChannel() {
})

s.Run("remove_channel", func() {
err := manager.Register(s.channelName, s.collSchema, s.metacache)
err := manager.Register(s.channelName, s.collSchema, s.metacache, WithIDAllocator(s.allocator))
s.Require().NoError(err)

s.NotPanics(func() {
5 changes: 4 additions & 1 deletion internal/datanode/writebuffer/sync_policy.go
@@ -8,6 +8,7 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -48,7 +49,9 @@ func GetFlushTsPolicy(flushTimestamp *atomic.Uint64, meta metacache.MetaCache) S
if !ok {
return buf.segmentID, false
}
return buf.segmentID, seg.State() == commonpb.SegmentState_Flushed && buf.MinTimestamp() < flushTs
inRange := seg.State() == commonpb.SegmentState_Flushed ||
seg.Level() == datapb.SegmentLevel_L0
return buf.segmentID, inRange && buf.MinTimestamp() < flushTs
})
// set segment flushing
meta.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushing),
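L0 delta segments never reach SegmentState_Flushed through the insert path, so under the old predicate their buffered deletes could sit past the flush timestamp forever. Treating every L0 segment as flush-eligible closes that gap; restated with the intent spelled out:

// Sync a buffer once its data predates the flush timestamp, if the segment
// is either already Flushed or an L0 delta segment.
inRange := seg.State() == commonpb.SegmentState_Flushed ||
	seg.Level() == datapb.SegmentLevel_L0
shouldSync := inRange && buf.MinTimestamp() < flushTs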
6 changes: 3 additions & 3 deletions internal/datanode/writebuffer/write_buffer.go
@@ -143,18 +143,18 @@
return checkpoint
}

func (wb *writeBufferBase) triggerSync() error {
func (wb *writeBufferBase) triggerSync() (segmentIDs []int64, err error) {
segmentsToSync := wb.getSegmentsToSync(wb.checkpoint.GetTimestamp())
if len(segmentsToSync) > 0 {
log.Info("write buffer get segments to sync", zap.Int64s("segmentIDs", segmentsToSync))
err := wb.syncSegments(context.Background(), segmentsToSync)
if err != nil {
log.Warn("sync segments failed", zap.Int64s("segmentIDs", segmentsToSync), zap.Error(err))
return err
return segmentsToSync, err
}
}

return nil
return segmentsToSync, nil
}

func (wb *writeBufferBase) flushSegments(ctx context.Context, segmentIDs []int64) error {
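Returning the synced IDs lets each buffer react to what actually went out: bfWriteBuffer discards them, while l0WriteBuffer uses them to retire its partition mappings. A minimal caller sketch for the new signature:

synced, err := wb.triggerSync()
if err != nil {
	return err // synced still names the segments whose sync was attempted
}
for _, segmentID := range synced {
	// post-sync bookkeeping goes here (e.g. dropping per-segment state)
	_ = segmentID
}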