Skip to content

Commit

Permalink
enhance: Add split_cluster_writer to support vshard/clustering compactor
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Oct 17, 2024
1 parent 4d08eec commit fb5145c
Show file tree
Hide file tree
Showing 10 changed files with 793 additions and 31 deletions.
13 changes: 1 addition & 12 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,17 +526,6 @@ func (t *clusteringCompactionTask) mappingSegment(
remained int64 = 0
)

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}

mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
if t.isVectorClusteringKey {
offSetPath := t.segmentIDOffsetMapping[segment.SegmentID]
Expand Down Expand Up @@ -603,7 +592,7 @@ func (t *clusteringCompactionTask) mappingSegment(
offset++

// Filtering deleted entity
if isDeletedValue(v) {
if isDeletedEntity(v, delta) {
deleted++
continue
}
Expand Down
11 changes: 11 additions & 0 deletions internal/datanode/compaction/compactor_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
return expireTime.Before(pnow)
}

// isDeletedEntity reports whether the row v is superseded by a delete record
// in delta, which is keyed by primary-key value and holds a timestamp per key.
// A row counts as deleted only when its primary key is present in delta and
// the row's own timestamp is strictly smaller than the recorded one.
func isDeletedEntity(v *storage.Value, delta map[interface{}]typeutil.Timestamp) bool {
	deleteTs, found := delta[v.PK.GetValue()]
	if !found {
		return false
	}
	// An upsert emits an insert task and a delete task carrying the same
	// timestamp, so the comparison has to be strict (< rather than <=);
	// otherwise the freshly upserted row would be dropped by compaction.
	return uint64(v.Timestamp) < deleteTs
}

func mergeDeltalogs(ctx context.Context, io io.BinlogIO, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) {
pk2ts := make(map[interface{}]typeutil.Timestamp)

Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/compaction/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func mergeSortMultipleSegments(ctx context.Context,
segIDAlloc := allocator.NewLocalAllocator(plan.GetPreAllocatedSegmentIDs().GetBegin(), plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(plan.GetBeginLogID(), math.MaxInt64)
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds)
mWriter := NewMultiSegmentWriter(binlogIO, compAlloc, plan.GetSchema(), plan.GetChannel(), plan.GetMaxSize(), maxRows, partitionID, collectionID, bm25FieldIds, false)

var (
expiredRowCount int64 // the number of expired entities
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (t *mixCompactionTask) mergeSplit(
segIDAlloc := allocator.NewLocalAllocator(t.plan.GetPreAllocatedSegmentIDs().GetBegin(), t.plan.GetPreAllocatedSegmentIDs().GetEnd())
logIDAlloc := allocator.NewLocalAllocator(t.plan.GetBeginLogID(), math.MaxInt64)
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan, t.maxRows, t.partitionID, t.collectionID, t.bm25FieldIDs)
mWriter := NewMultiSegmentWriter(t.binlogIO, compAlloc, t.plan.GetSchema(), t.plan.GetChannel(), t.plan.GetMaxSize(), t.maxRows, t.partitionID, t.collectionID, t.bm25FieldIDs, false)

deletedRowCount := int64(0)
expiredRowCount := int64(0)
Expand Down
91 changes: 75 additions & 16 deletions internal/datanode/compaction/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"math"
"sync"

"github.com/samber/lo"
"go.uber.org/atomic"
Expand All @@ -25,7 +26,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// Not concurrent safe.
// Concurrent safe only when created with supportConcurrent = true; Write and Flush then serialize on writeLock.
type MultiSegmentWriter struct {
binlogIO io.BinlogIO
allocator *compactionAlloactor
Expand All @@ -48,6 +49,12 @@ type MultiSegmentWriter struct {
// segID -> fieldID -> binlogs

res []*datapb.CompactionSegment

supportConcurrent bool
writeLock sync.Mutex

rowCount *atomic.Int64

// DO NOT leave it empty if all segments are deleted; just return a segment with zero meta for datacoord
bm25Fields []int64
}
Expand All @@ -72,7 +79,8 @@ func (alloc *compactionAlloactor) getLogIDAllocator() allocator.Interface {
return alloc.logIDAlloc
}

func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator *compactionAlloactor, plan *datapb.CompactionPlan, maxRows int64, partitionID, collectionID int64, bm25Fields []int64) *MultiSegmentWriter {
func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator *compactionAlloactor, schema *schemapb.CollectionSchema, channel string, segmentSize int64,
maxRows int64, partitionID, collectionID int64, bm25Fields []int64, supportConcurrent bool) *MultiSegmentWriter {
return &MultiSegmentWriter{
binlogIO: binlogIO,
allocator: allocator,
Expand All @@ -81,16 +89,18 @@ func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator *compactionAlloactor,
current: -1,

maxRows: maxRows, // For bloomfilter only
segmentSize: plan.GetMaxSize(),
segmentSize: segmentSize,

schema: plan.GetSchema(),
schema: schema,
partitionID: partitionID,
collectionID: collectionID,
channel: plan.GetChannel(),
channel: channel,

cachedMeta: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog),
res: make([]*datapb.CompactionSegment, 0),
bm25Fields: bm25Fields,
cachedMeta: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog),
res: make([]*datapb.CompactionSegment, 0),
bm25Fields: bm25Fields,
supportConcurrent: supportConcurrent,
rowCount: atomic.NewInt64(0),
}
}

Expand Down Expand Up @@ -184,6 +194,10 @@ func (w *MultiSegmentWriter) getWriter() (*SegmentWriter, error) {
}

func (w *MultiSegmentWriter) Write(v *storage.Value) error {
if w.supportConcurrent {
w.writeLock.Lock()
defer w.writeLock.Unlock()
}
writer, err := w.getWriter()
if err != nil {
return err
Expand All @@ -194,28 +208,39 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {
if _, ok := w.cachedMeta[writer.segmentID]; !ok {
w.cachedMeta[writer.segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
}

kvs, partialBinlogs, err := serializeWrite(context.TODO(), w.allocator.getLogIDAllocator(), writer)
err = w.flushBinlog(writer)
if err != nil {
return err
}
}

if err := w.binlogIO.Upload(context.TODO(), kvs); err != nil {
return err
}
err = writer.Write(v)
if err != nil {
return err
}
w.rowCount.Inc()
return nil
}

mergeFieldBinlogs(w.cachedMeta[writer.segmentID], partialBinlogs)
// flushBinlog serializes the data held by writer via serializeWrite (log IDs
// come from the writer's log-ID allocator), uploads the resulting key/value
// blobs through binlogIO, and merges the produced field binlogs into
// cachedMeta[writer.segmentID].
//
// The visible callers (Write and Flush) create the cachedMeta entry for the
// segment before calling this method. It returns the first error reported by
// serialization or upload, and nil otherwise.
func (w *MultiSegmentWriter) flushBinlog(writer *SegmentWriter) error {
kvs, partialBinlogs, err := serializeWrite(context.TODO(), w.allocator.getLogIDAllocator(), writer)
if err != nil {
return err
}

// NOTE(review): `v` is not declared in this function; the next line looks
// like a leftover of the pre-change Write body shown by the diff view — confirm.
return writer.Write(v)
if err := w.binlogIO.Upload(context.TODO(), kvs); err != nil {
return err
}

// Record the freshly uploaded binlogs under the segment's cached metadata.
mergeFieldBinlogs(w.cachedMeta[writer.segmentID], partialBinlogs)
return nil
}

func (w *MultiSegmentWriter) appendEmptySegment() error {
writer, err := w.getWriter()
if err != nil {
return err
}

w.res = append(w.res, &datapb.CompactionSegment{
SegmentID: writer.GetSegmentID(),
NumOfRows: 0,
Expand Down Expand Up @@ -243,6 +268,40 @@ func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
return w.res, nil
}

// Flush persists whatever the current segment writer has buffered: it obtains
// the writer through getWriter, makes sure a cachedMeta entry exists for its
// segment, and delegates to flushBinlog.
//
// It returns nil without doing anything when no segment writer has been
// opened yet (current == -1), and otherwise returns the error from getWriter
// or flushBinlog.
//
// When the writer was built with supportConcurrent, the whole operation runs
// under writeLock. The current == -1 check is deliberately performed after
// the lock is taken: reading current before locking would be an unsynchronized
// read racing with a concurrent Write, which holds the same lock while it
// calls getWriter.
func (w *MultiSegmentWriter) Flush() error {
	if w.supportConcurrent {
		w.writeLock.Lock()
		defer w.writeLock.Unlock()
	}
	// Nothing has been written yet, so there is nothing to flush.
	if w.current == -1 {
		return nil
	}
	writer, err := w.getWriter()
	if err != nil {
		return err
	}
	// Initialize the segment's fieldBinlogs map if it does not exist yet, so
	// flushBinlog has somewhere to merge the uploaded binlogs into.
	if _, ok := w.cachedMeta[writer.segmentID]; !ok {
		w.cachedMeta[writer.segmentID] = make(map[typeutil.UniqueID]*datapb.FieldBinlog)
	}
	return w.flushBinlog(writer)
}

// GetRowNum returns the current value of the atomic rowCount counter, which
// Write increments once for every row it writes successfully.
func (w *MultiSegmentWriter) GetRowNum() int64 {
	written := w.rowCount.Load()
	return written
}

// WrittenMemorySize returns the WrittenMemorySize of the segment writer that
// getWriter hands back. It yields 0 when no segment writer has been opened
// yet (current == -1) or when getWriter fails.
//
// NOTE(review): unlike Write and Flush, this method does not take writeLock —
// confirm callers do not invoke it concurrently with Write.
func (w *MultiSegmentWriter) WrittenMemorySize() uint64 {
	if w.current != -1 {
		if segWriter, err := w.getWriter(); err == nil {
			return segWriter.WrittenMemorySize()
		}
	}
	return 0
}

func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
return &SegmentDeltaWriter{
deleteData: &storage.DeleteData{},
Expand Down
Loading

0 comments on commit fb5145c

Please sign in to comment.