clustering compactor use split_cluster_writer
Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink committed Oct 22, 2024
1 parent 31b08d7 commit 6780e52
Showing 4 changed files with 58 additions and 33 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_trigger_v2.go
@@ -345,7 +345,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
Begin: start,
End: end,
},
MaxSize: getExpandedSize(expectedSize),
MaxSize: getExpandedSize(expectedSegmentSize),
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
46 changes: 32 additions & 14 deletions internal/datanode/compaction/clustering_compactor.go
Expand Up @@ -402,28 +402,43 @@ func (t *clusteringCompactionTask) getVectorAnalyzeResult(ctx context.Context) e
zap.Int("centroidNum", len(centroids.GetCentroids())),
zap.Any("offsetMappingFiles", t.segmentIDOffsetMapping))

splitKeys := make([]string, 0)
splitFieldStats := make([]*storage.FieldStats, 0)
for id, centroid := range centroids.GetCentroids() {
fieldStats, err := storage.NewFieldStats(t.clusteringKeyField.FieldID, t.clusteringKeyField.DataType, 0)
if err != nil {
return err
}
fieldStats.SetVectorCentroids(storage.NewVectorFieldValue(t.clusteringKeyField.DataType, centroid))
clusterBuffer := &ClusterBuffer{
id: id,
flushedRowNum: map[typeutil.UniqueID]atomic.Int64{},
flushedBinlogs: make(map[typeutil.UniqueID]map[typeutil.UniqueID]*datapb.FieldBinlog, 0),
uploadedSegments: make([]*datapb.CompactionSegment, 0),
uploadedSegmentStats: make(map[typeutil.UniqueID]storage.SegmentStats, 0),
clusteringKeyFieldStats: fieldStats,
}
if _, err = t.refreshBufferWriterWithPack(clusterBuffer); err != nil {
return err
}
t.clusterBuffers = append(t.clusterBuffers, clusterBuffer)
splitFieldStats = append(splitFieldStats, fieldStats)
splitKeys = append(splitKeys, string(id))
}

t.offsetToBufferFunc = func(offset int64, idMapping []uint32) *ClusterBuffer {
return t.clusterBuffers[idMapping[offset]]
}

splitWriter, err := NewSplitClusterWriterBuilder().
SetCollectionID(t.GetCollection()).
SetPartitionID(t.partitionID).
SetSchema(t.plan.GetSchema()).
SetSegmentMaxSize(t.plan.GetMaxSize()).
SetSegmentMaxRowCount(t.plan.GetTotalRows()).
SetSplitKeys(splitKeys).
SetAllocator(&compactionAlloactor{
segmentAlloc: t.segIDAlloc,
logIDAlloc: t.logIDAlloc,
}).
SetBinlogIO(t.binlogIO).
SetMemoryBufferSize(math.MaxInt64).
SetWorkerPoolSize(1).
Build()
t.writer = splitWriter
log.Info("create split cluster writer",
zap.Int("splitKeys", len(splitKeys)),
zap.Int64("segmentMaxSize", t.plan.GetMaxSize()),
zap.Int64("segmentMaxRowCount", t.plan.GetTotalRows()))

return nil
}
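For orientation, here is a minimal, self-contained sketch of the split-writer idea the hunk above moves to: each centroid index is registered as a string split key, and rows are routed to the buffer registered under that key. The types and names below (row, splitWriter, writeToKey) are hypothetical illustrations, not the actual Milvus SplitClusterWriter API.

```go
package main

import "fmt"

// row is a stand-in for a storage.Value in this sketch.
type row struct {
	pk   int64
	data string
}

// splitWriter keeps one buffer per split key (one key per centroid).
type splitWriter struct {
	buffers map[string][]row
}

func newSplitWriter(splitKeys []string) *splitWriter {
	w := &splitWriter{buffers: make(map[string][]row)}
	for _, k := range splitKeys {
		w.buffers[k] = make([]row, 0)
	}
	return w
}

// writeToKey appends a row to the buffer registered for clusterKey,
// mirroring the "key must already exist" contract of WriteToKey.
func (w *splitWriter) writeToKey(r row, clusterKey string) error {
	buf, ok := w.buffers[clusterKey]
	if !ok {
		return fmt.Errorf("cluster key=%s not exist", clusterKey)
	}
	w.buffers[clusterKey] = append(buf, r)
	return nil
}

func main() {
	// One split key per centroid; the write path must key rows the same
	// way the keys were registered.
	keys := []string{"centroid-0", "centroid-1", "centroid-2"}
	w := newSplitWriter(keys)
	_ = w.writeToKey(row{pk: 1, data: "a"}, "centroid-1")
	fmt.Println(len(w.buffers["centroid-1"])) // 1
}
```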

@@ -480,7 +495,6 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
return nil, nil, err
}
for _, seg := range segments {
log.Info("wayblink", zap.String("seg", seg.String()), zap.Int("insertLogNum", len(seg.InsertLogs)))
resultSegments = append(resultSegments, seg)
segmentStats := storage.SegmentStats{
FieldStats: []storage.FieldStats{
@@ -610,7 +624,11 @@ func (t *clusteringCompactionTask) mappingSegment(
continue
}

err = t.writer.Write(v)
if t.isVectorClusteringKey {
err = t.writer.WriteToKey(v, string(mappingStats.GetCentroidIdMapping()[offset]))
} else {
err = t.writer.Write(v)
}
if err != nil {
log.Error("write fail", zap.Error(err))
return err
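The scalar path above still relies on the writer's mappingFunc (see split_cluster_writer.go below) to derive the cluster key from each value. That function is not part of this diff, so the following is only a sketch of what a hash-style split, as exercised by TestConcurrentSplitByHash, could look like; the bucketing scheme here is an assumption for illustration only.

```go
package main

import (
	"fmt"
	"strconv"
)

// hashMappingFunc returns a hypothetical mapping function that buckets an
// int64 clustering key into numSplits buckets and uses the bucket index
// (as a decimal string) as the cluster key.
func hashMappingFunc(numSplits int64) func(pk int64) (string, error) {
	return func(pk int64) (string, error) {
		if numSplits <= 0 {
			return "", fmt.Errorf("invalid split count: %d", numSplits)
		}
		bucket := pk % numSplits
		if bucket < 0 {
			bucket += numSplits
		}
		return strconv.FormatInt(bucket, 10), nil
	}
}

func main() {
	mapping := hashMappingFunc(4)
	key, _ := mapping(42)
	fmt.Println(key) // "2"
}
```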
41 changes: 24 additions & 17 deletions internal/datanode/compaction/split_cluster_writer.go
@@ -55,7 +55,7 @@ type SplitClusterWriter struct {
flushMutex sync.Mutex
}

func (c *SplitClusterWriter) Write(value *storage.Value) error {
func (c *SplitClusterWriter) Write(value *storage.Value, segmentID ...int64) error {
clusterKey, err := c.mappingFunc(value)
if err != nil {
return err
@@ -72,6 +72,19 @@ func (c *SplitClusterWriter) Write(value *storage.Value) error {
return nil
}

func (c *SplitClusterWriter) WriteToKey(value *storage.Value, clusterKey string) error {
_, exist := c.clusterWriters[clusterKey]
if !exist {
return errors.New(fmt.Sprintf("cluster key=%s not exist", clusterKey))
}
err := c.clusterWriters[clusterKey].Write(value)
if err != nil {
return err
}
c.writtenRowNum.Inc()
return nil
}
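As the clustering_compactor.go hunk above shows, the compactor takes the WriteToKey path when the clustering key is a vector field, passing the centroid id mapped for each offset directly, while the plain Write path lets mappingFunc derive the cluster key from the value itself.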

func (c *SplitClusterWriter) Finish() (map[string][]*datapb.CompactionSegment, error) {
resultSegments := make(map[string][]*datapb.CompactionSegment, 0)
for id, writer := range c.clusterWriters {
@@ -89,37 +102,31 @@ func (c *SplitClusterWriter) GetRowNum() int64 {
return c.writtenRowNum.Load()
}

func (c *SplitClusterWriter) FlushLargest() error {
// only one flushLargest or flushAll should do at the same time
getLock := c.flushMutex.TryLock()
if !getLock {
return nil
}
func (c *SplitClusterWriter) FlushToLowWaterMark() error {
c.flushMutex.Lock()
defer c.flushMutex.Unlock()
currentMemorySize := c.getTotalUsedMemorySize()
if currentMemorySize <= c.getMemoryBufferLowWatermark() {
log.Info("memory low water mark", zap.Int64("memoryBufferSize", c.getTotalUsedMemorySize()))
log.Info("memory is under low water mark", zap.Int64("memoryBufferSize", c.getTotalUsedMemorySize()))
return nil
}
bufferIDs := make([]string, 0)
clusterKeys := make([]string, 0)
bufferRowNums := make([]int64, 0)
for id, writer := range c.clusterWriters {
bufferIDs = append(bufferIDs, id)
// c.clusterLocks.RLock(id)
clusterKeys = append(clusterKeys, id)
bufferRowNums = append(bufferRowNums, writer.GetRowNum())
// c.clusterLocks.RUnlock(id)
}
sort.Slice(bufferIDs, func(i, j int) bool {
sort.Slice(clusterKeys, func(i, j int) bool {
return bufferRowNums[i] > bufferRowNums[j]
})
log.Info("start flushLargestBuffers", zap.Strings("bufferIDs", bufferIDs), zap.Int64("currentMemorySize", currentMemorySize))

log.Info("start flush largest buffers", zap.Strings("clusterKeys", clusterKeys), zap.Int64("currentMemorySize", currentMemorySize))
futures := make([]*conc.Future[any], 0)
for _, bufferId := range bufferIDs {
writer := c.clusterWriters[bufferId]
for _, clusterKey := range clusterKeys {
writer := c.clusterWriters[clusterKey]
log.Info("currentMemorySize after flush writer binlog",
zap.Int64("currentMemorySize", currentMemorySize),
zap.String("bufferID", bufferId),
zap.String("clusterKey", clusterKey),
zap.Uint64("writtenMemorySize", writer.WrittenMemorySize()),
zap.Int64("RowNum", writer.GetRowNum()))
future := c.flushPool.Submit(func() (any, error) {
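The hunk is truncated here, but the visible part sorts cluster keys by buffered row count and submits flushes to the worker pool. A simplified sketch of that largest-first flush policy under the stated low-watermark idea follows; bufferStat and flushOrder are hypothetical stand-ins, not the actual SplitClusterWriter internals.

```go
package main

import (
	"fmt"
	"sort"
)

// bufferStat is a stand-in for the per-cluster buffer bookkeeping.
type bufferStat struct {
	key    string
	rows   int64
	memory int64
}

// flushOrder returns the cluster keys to flush, largest buffers first,
// stopping once total memory use drops to or below the low watermark.
func flushOrder(stats []bufferStat, total, lowWatermark int64) []string {
	// Keep key, row count, and memory paired while sorting.
	sort.Slice(stats, func(i, j int) bool { return stats[i].rows > stats[j].rows })
	flushed := make([]string, 0)
	for _, s := range stats {
		if total <= lowWatermark {
			break
		}
		flushed = append(flushed, s.key)
		total -= s.memory
	}
	return flushed
}

func main() {
	stats := []bufferStat{
		{key: "0", rows: 100, memory: 10},
		{key: "1", rows: 900, memory: 90},
		{key: "2", rows: 400, memory: 40},
	}
	// [1] — flushing only the largest buffer is enough here.
	fmt.Println(flushOrder(stats, 140, 60))
}
```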
2 changes: 1 addition & 1 deletion internal/datanode/compaction/split_cluster_writer_test.go
@@ -215,7 +215,7 @@ func (s *SplitClusterWriterSuite) TestConcurrentSplitByHash() {
for i := j + int64(0); i < j+1000; i++ {
err := splitWriter.Write(generateInt64PKEntitiy(i))
if i == j+100 {
splitWriter.FlushLargest()
splitWriter.FlushToLowWaterMark()
}
s.NoError(err)
}
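For reference, a stripped-down sketch of the concurrency pattern this test exercises: several goroutines write disjoint key ranges into one shared writer and trigger a flush mid-stream. sharedWriter below is a hypothetical stand-in that only counts writes and flushes, not the real writer or test harness.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedWriter mimics the shared-writer shape of the test: concurrent-safe
// Write and FlushToLowWaterMark, with counters instead of real buffers.
type sharedWriter struct {
	mu      sync.Mutex
	rows    int64
	flushes int
}

func (w *sharedWriter) Write(pk int64) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.rows++
	return nil
}

func (w *sharedWriter) FlushToLowWaterMark() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.flushes++
	return nil
}

func main() {
	w := &sharedWriter{}
	var wg sync.WaitGroup
	for g := int64(0); g < 4; g++ {
		wg.Add(1)
		go func(base int64) {
			defer wg.Done()
			for i := base; i < base+1000; i++ {
				_ = w.Write(i)
				if i == base+100 {
					_ = w.FlushToLowWaterMark() // flush mid-stream, as in the test
				}
			}
		}(g * 1000)
	}
	wg.Wait()
	fmt.Println(w.rows, w.flushes) // 4000 4
}
```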
