From 02bea9402f2c7a58037e188acdb3e87e3266d995 Mon Sep 17 00:00:00 2001 From: Ted Xu Date: Thu, 25 Apr 2024 17:04:09 +0800 Subject: [PATCH] Enhance: enable stream writer in compactions Signed-off-by: Ted Xu --- internal/datanode/binlog_io.go | 21 ++-- internal/datanode/binlog_io_test.go | 51 +++++---- internal/datanode/compactor.go | 157 ++++++++++++--------------- internal/datanode/compactor_test.go | 162 +++++++++++++++++++++------- internal/datanode/io/binlog_io.go | 2 +- internal/storage/serde.go | 13 ++- 6 files changed, 237 insertions(+), 169 deletions(-) diff --git a/internal/datanode/binlog_io.go b/internal/datanode/binlog_io.go index a351f0f260300..1ea26729a7a29 100644 --- a/internal/datanode/binlog_io.go +++ b/internal/datanode/binlog_io.go @@ -80,22 +80,18 @@ func genDeltaBlobs(b io.BinlogIO, allocator allocator.Allocator, data *DeleteDat } // genInsertBlobs returns insert-paths and save blob to kvs -func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) { - inlogs, err := iCodec.Serialize(partID, segID, data) - if err != nil { - return nil, err - } - +func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data []*Blob, collectionID, partID, segID UniqueID, kvs map[string][]byte, +) (map[UniqueID]*datapb.FieldBinlog, error) { inpaths := make(map[UniqueID]*datapb.FieldBinlog) notifyGenIdx := make(chan struct{}) defer close(notifyGenIdx) - generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx) + generator, err := allocator.GetGenerator(len(data), notifyGenIdx) if err != nil { return nil, err } - for _, blob := range inlogs { + for _, blob := range data { // Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator) @@ -177,22 +173,21 @@ func uploadInsertLog( collectionID UniqueID, partID UniqueID, segID UniqueID, - iData *InsertData, - iCodec *storage.InsertCodec, + data []*Blob, ) (map[UniqueID]*datapb.FieldBinlog, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog") defer span.End() kvs := make(map[string][]byte) - if iData.IsEmpty() { + if len(data) <= 0 || data[0].RowNum <= 0 { log.Warn("binlog io uploading empty insert data", zap.Int64("segmentID", segID), - zap.Int64("collectionID", iCodec.Schema.GetID()), + zap.Int64("collectionID", collectionID), ) return nil, nil } - inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs) + inpaths, err := genInsertBlobs(b, allocator, data, collectionID, partID, segID, kvs) if err != nil { return nil, err } diff --git a/internal/datanode/binlog_io_test.go b/internal/datanode/binlog_io_test.go index eea1b18291e81..038978ac0464c 100644 --- a/internal/datanode/binlog_io_test.go +++ b/internal/datanode/binlog_io_test.go @@ -124,21 +124,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) - t.Run("empty insert", func(t *testing.T) { - alloc := allocator.NewMockAllocator(t) - binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) - iCodec := storage.NewInsertCodecWithSchema(meta) - paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec) - 
assert.NoError(t, err) - assert.Nil(t, paths) - }) - t.Run("gen insert blob failed", func(t *testing.T) { alloc := allocator.NewMockAllocator(t) binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 10 + var segId int64 = 1 + iData := genInsertData(2) + blobs, err := iCodec.Serialize(10, 1, iData) + assert.NoError(t, err) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err")) - _, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec) + _, err = uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), partId, segId, blobs) assert.Error(t, err) }) @@ -147,13 +143,18 @@ func TestBinlogIOInterfaceMethods(t *testing.T) { alloc := allocator.NewMockAllocator(t) binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool()) iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 1 + var segId int64 = 10 + iData := genInsertData(2) + blobs, err := iCodec.Serialize(10, 1, iData) + assert.NoError(t, err) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - _, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec) + _, err = uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partId, segId, blobs) assert.Error(t, err) }) }) @@ -256,9 +257,13 @@ func TestBinlogIOInnerMethods(t *testing.T) { t.Run(test.description, func(t *testing.T) { meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType) iCodec := storage.NewInsertCodecWithSchema(meta) - + var partId int64 = 10 + var segId int64 = 1 + iData := genInsertData(2) + blobs, err := iCodec.Serialize(10, 1, iData) + assert.NoError(t, err) kvs := make(map[string][]byte) - pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs) + pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs) assert.NoError(t, err) assert.Equal(t, 12, len(pin)) @@ -277,30 +282,22 @@ func TestBinlogIOInnerMethods(t *testing.T) { cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir)) defer cm.RemoveWithPrefix(ctx, cm.RootPath()) - t.Run("serialize error", func(t *testing.T) { - iCodec := storage.NewInsertCodecWithSchema(nil) - - alloc := allocator.NewMockAllocator(t) - binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) - kvs := make(map[string][]byte) - pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs) - - assert.Error(t, err) - assert.Empty(t, kvs) - assert.Empty(t, pin) - }) - t.Run("GetGenerator error", func(t *testing.T) { f := &MetaFactory{} meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64) iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 10 + var segId int64 = 1 + iData := genInsertData(2) + blobs, err := iCodec.Serialize(partId, segId, iData) + assert.NoError(t, err) alloc := allocator.NewMockAllocator(t) alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error")) binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool()) kvs := make(map[string][]byte) - pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs) + pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs) assert.Error(t, err) assert.Empty(t, kvs) diff --git 
a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 5b11341b730de..8ff2c28f74e82 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -55,8 +55,6 @@ var ( errContext = errors.New("context done or timeout") ) -type iterator = storage.Iterator - type compactor interface { complete() compact() (*datapb.CompactionPlanResult, error) @@ -174,48 +172,15 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interf return pk2ts, nil } -func (t *compactionTask) uploadRemainLog( - ctxTimeout context.Context, - targetSegID UniqueID, - partID UniqueID, - meta *etcdpb.CollectionMeta, - stats *storage.PrimaryKeyStats, - totRows int64, - writeBuffer *storage.InsertData, -) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) { - iCodec := storage.NewInsertCodecWithSchema(meta) - inPaths := make(map[int64]*datapb.FieldBinlog, 0) - var err error - if !writeBuffer.IsEmpty() { - inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec) - if err != nil { - return nil, nil, err - } - } - - statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec) - if err != nil { - return nil, nil, err +func newBinlogWriter(collectionId, partitionId, segmentId UniqueID, schema *schemapb.CollectionSchema, +) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*Blob, error), err error) { + fieldWriters := storage.NewBinlogStreamWriters(collectionId, partitionId, segmentId, schema.Fields) + closers = make([]func() (*Blob, error), 0, len(fieldWriters)) + for _, w := range fieldWriters { + closers = append(closers, w.Finalize) } - - return inPaths, statPaths, nil -} - -func (t *compactionTask) uploadSingleInsertLog( - ctxTimeout context.Context, - targetSegID UniqueID, - partID UniqueID, - meta *etcdpb.CollectionMeta, - writeBuffer *storage.InsertData, -) (map[UniqueID]*datapb.FieldBinlog, error) { - iCodec := storage.NewInsertCodecWithSchema(meta) - - inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec) - if err != nil { - return nil, err - } - - return inPaths, nil + writer, err = storage.NewBinlogSerializeWriter(schema, partitionId, segmentId, fieldWriters, 1024) + return } func (t *compactionTask) merge( @@ -231,10 +196,15 @@ func (t *compactionTask) merge( log := log.With(zap.Int64("planID", t.getPlanID())) mergeStart := time.Now() + writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema()) + if err != nil { + return nil, nil, 0, err + } + var ( - numBinlogs int // binlog number - numRows int64 // the number of rows uploaded - expired int64 // the number of expired entity + numBinlogs int // binlog number + numRows uint64 // the number of rows uploaded + expired int64 // the number of expired entity insertField2Path = make(map[UniqueID]*datapb.FieldBinlog) insertPaths = make([]*datapb.FieldBinlog, 0) @@ -242,10 +212,6 @@ func (t *compactionTask) merge( statField2Path = make(map[UniqueID]*datapb.FieldBinlog) statPaths = make([]*datapb.FieldBinlog, 0) ) - writeBuffer, err := storage.NewInsertData(meta.GetSchema()) - if err != nil { - return nil, nil, -1, err - } isDeletedValue := func(v *storage.Value) bool { ts, ok := delta[v.PK.GetValue()] @@ -306,7 +272,7 @@ func (t *compactionTask) merge( numRows = 0 numBinlogs = 0 currentTs := t.GetCurrentTime() - currentRows := 0 + 
unflushedRows := 0 downloadTimeCost := time.Duration(0) uploadInsertTimeCost := time.Duration(0) @@ -325,6 +291,30 @@ func (t *compactionTask) merge( timestampFrom int64 = -1 ) + flush := func() error { + uploadInsertStart := time.Now() + writer.Close() + fieldData := make([]*Blob, len(finalizers)) + + for i, f := range finalizers { + blob, err := f() + if err != nil { + return err + } + fieldData[i] = blob + } + inPaths, err := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.ID, partID, targetSegID, fieldData) + if err != nil { + log.Warn("failed to upload single insert log", zap.Error(err)) + return err + } + numBinlogs += len(inPaths) + uploadInsertTimeCost += time.Since(uploadInsertStart) + addInsertFieldPath(inPaths, timestampFrom, timestampTo) + unflushedRows = 0 + return nil + } + for _, path := range unMergedInsertlogs { downloadStart := time.Now() data, err := downloadBlobs(ctx, t.binlogIO, path) @@ -370,55 +360,50 @@ func (t *compactionTask) merge( timestampTo = v.Timestamp } - row, ok := v.Value.(map[UniqueID]interface{}) - if !ok { - log.Warn("transfer interface to map wrong", zap.Strings("path", path)) - return nil, nil, 0, errors.New("unexpected error") - } - - err = writeBuffer.Append(row) + err = writer.Write(v) + numRows++ + unflushedRows++ if err != nil { return nil, nil, 0, err } - currentRows++ stats.Update(v.PK) // check size every 100 rows in case of too many `GetMemorySize` call - if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() { - numRows += int64(writeBuffer.GetRowNum()) - uploadInsertStart := time.Now() - inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer) - if err != nil { - log.Warn("failed to upload single insert log", zap.Error(err)) - return nil, nil, 0, err + if (unflushedRows+1)%100 == 0 { + writer.Flush() // Flush to update memory size + + if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() { + if err := flush(); err != nil { + return nil, nil, 0, err + } + timestampFrom = -1 + timestampTo = -1 + + writer, finalizers, err = newBinlogWriter(meta.ID, targetSegID, partID, meta.Schema) + if err != nil { + return nil, nil, 0, err + } } - uploadInsertTimeCost += time.Since(uploadInsertStart) - addInsertFieldPath(inPaths, timestampFrom, timestampTo) - timestampFrom = -1 - timestampTo = -1 - - writeBuffer, _ = storage.NewInsertData(meta.GetSchema()) - currentRows = 0 - numBinlogs++ } } } - // upload stats log and remain insert rows - if writeBuffer.GetRowNum() > 0 || numRows > 0 { - numRows += int64(writeBuffer.GetRowNum()) - uploadStart := time.Now() - inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta, - stats, numRows+int64(currentRows), writeBuffer) - if err != nil { + // final flush if there is unflushed rows + if unflushedRows > 0 { + if err := flush(); err != nil { return nil, nil, 0, err } + } - uploadInsertTimeCost += time.Since(uploadStart) - addInsertFieldPath(inPaths, timestampFrom, timestampTo) + // upload stats log + if numRows > 0 { + iCodec := storage.NewInsertCodecWithSchema(meta) + statsPaths, err := uploadStatsLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, int64(numRows), iCodec) + if err != nil { + return nil, nil, 0, err + } addStatFieldPath(statsPaths) - numBinlogs += len(inPaths) } for _, path := range insertField2Path { @@ -430,14 +415,14 @@ func (t *compactionTask) merge( } log.Info("compact merge end", - zap.Int64("remaining insert numRows", 
numRows), + zap.Uint64("remaining insert numRows", numRows), zap.Int64("expired entities", expired), zap.Int("binlog file number", numBinlogs), zap.Duration("download insert log elapse", downloadTimeCost), zap.Duration("upload insert log elapse", uploadInsertTimeCost), zap.Duration("merge elapse", time.Since(mergeStart))) - return insertPaths, statPaths, numRows, nil + return insertPaths, statPaths, int64(numRows), nil } func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) { diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 331b9594d8e69..46f831702047e 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -21,7 +21,6 @@ import ( "fmt" "math" "testing" - "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -304,8 +303,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) { paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -344,18 +347,22 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) t.Run("Merge without expiration2", func(t *testing.T) { mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + iData := genInsertDataWithExpiredTS() iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize.GetValue() defer func() { Params.Save(Params.DataNodeCfg.BinLogMaxSize.Key, BinLogMaxSize) }() paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "64") - iData := genInsertDataWithExpiredTS() meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -402,9 +409,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }() paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "1") iData := genInsertData(101) + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, segmentId, iData) + assert.NoError(t, err) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -448,10 +459,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) { mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool()) iCodec := storage.NewInsertCodecWithSchema(meta) iData := genInsertDataWithExpiredTS() + var partId int64 = 0 + var segmentId int64 = 
1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -493,6 +508,10 @@ func TestCompactionTaskInnerMethods(t *testing.T) { mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool()) iData := genInsertDataWithExpiredTS() iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) metaCache := metacache.NewMockMetaCache(t) metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe() @@ -507,7 +526,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -547,10 +566,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) { iCodec := storage.NewInsertCodecWithSchema(meta) paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -594,10 +617,14 @@ func TestCompactionTaskInnerMethods(t *testing.T) { iCodec := storage.NewInsertCodecWithSchema(meta) paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") iData := genInsertDataWithExpiredTS() + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(t, err) meta := NewMetaFactory().GetCollectionMeta(1, "test", schemapb.DataType_Int64) var allPaths [][]string - inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), 0, 1, iData, iCodec) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) assert.NoError(t, err) assert.Equal(t, 12, len(inpath)) binlogNum := len(inpath[0].GetBinlogs()) @@ -722,32 +749,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) { _, err := ct.getNumRows() assert.Error(t, err, "segment not found") }) - - t.Run("Test uploadRemainLog error", func(t *testing.T) { - f := &MetaFactory{} - - t.Run("upload failed", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() - - alloc := allocator.NewMockAllocator(t) - alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil) - - meta := f.GetCollectionMeta(UniqueID(10001), "test_upload_remain_log", schemapb.DataType_Int64) - stats, 
err := storage.NewPrimaryKeyStats(106, int64(schemapb.DataType_Int64), 10) - - require.NoError(t, err) - - ct := &compactionTask{ - binlogIO: io.NewBinlogIO(&mockCm{errSave: true}, getOrCreateIOPool()), - Allocator: alloc, - done: make(chan struct{}, 1), - } - - _, _, err = ct.uploadRemainLog(ctx, 1, 2, meta, stats, 10, nil) - assert.Error(t, err) - }) - }) } func getInt64DeltaBlobs(segID UniqueID, pks []UniqueID, tss []Timestamp) ([]*Blob, error) { @@ -906,12 +907,16 @@ func TestCompactorInterfaceMethods(t *testing.T) { }) iData1 := genInsertDataWithPKs(c.pks1, c.pkType) + iblobs1, err := iCodec.Serialize(c.parID, 0, iData1) + assert.NoError(t, err) dData1 := &DeleteData{ Pks: []storage.PrimaryKey{c.pks1[0]}, Tss: []Timestamp{20000}, RowCount: 1, } iData2 := genInsertDataWithPKs(c.pks2, c.pkType) + iblobs2, err := iCodec.Serialize(c.parID, 3, iData2) + assert.NoError(t, err) dData2 := &DeleteData{ Pks: []storage.PrimaryKey{c.pks2[0]}, Tss: []Timestamp{30000}, @@ -920,7 +925,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { stats1, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) require.NoError(t, err) - iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iData1, iCodec) + iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, iblobs1) require.NoError(t, err) sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID1, stats1, 2, iCodec) require.NoError(t, err) @@ -930,7 +935,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { stats2, err := storage.NewPrimaryKeyStats(1, int64(c.pkType), 1) require.NoError(t, err) - iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iData2, iCodec) + iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, iblobs2) require.NoError(t, err) sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), c.parID, c.segID2, stats2, 2, iCodec) require.NoError(t, err) @@ -1046,7 +1051,11 @@ func TestCompactorInterfaceMethods(t *testing.T) { // the same pk for segmentI and segmentII pks := [2]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)} iData1 := genInsertDataWithPKs(pks, schemapb.DataType_Int64) + iblobs1, err := iCodec.Serialize(partID, 0, iData1) + assert.NoError(t, err) iData2 := genInsertDataWithPKs(pks, schemapb.DataType_Int64) + iblobs2, err := iCodec.Serialize(partID, 1, iData2) + assert.NoError(t, err) pk1 := storage.NewInt64PrimaryKey(1) dData1 := &DeleteData{ @@ -1063,7 +1072,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { stats1, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) require.NoError(t, err) - iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iData1, iCodec) + iPaths1, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, iblobs1) require.NoError(t, err) sPaths1, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID1, stats1, 1, iCodec) require.NoError(t, err) @@ -1073,7 +1082,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { stats2, err := storage.NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 1) require.NoError(t, err) - iPaths2, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iData2, iCodec) + iPaths2, err := 
uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, iblobs2) require.NoError(t, err) sPaths2, err := uploadStatsLog(context.Background(), mockbIO, alloc, meta.GetID(), partID, segID2, stats2, 1, iCodec) require.NoError(t, err) @@ -1139,3 +1148,78 @@ func TestInjectDone(t *testing.T) { task.injectDone() task.injectDone() } + +func BenchmarkDeserializeReader(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cm := storage.NewLocalChunkManager(storage.RootPath(compactTestDir)) + defer cm.RemoveWithPrefix(ctx, cm.RootPath()) + + collectionID := int64(1) + meta := NewMetaFactory().GetCollectionMeta(collectionID, "test", schemapb.DataType_Int64) + mockbIO := io.NewBinlogIO(cm, getOrCreateIOPool()) + paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") + iData := genInsertDataWithExpiredTS() + iCodec := storage.NewInsertCodecWithSchema(meta) + var partId int64 = 0 + var segmentId int64 = 1 + blobs, err := iCodec.Serialize(partId, 0, iData) + assert.NoError(b, err) + var allPaths [][]string + alloc := allocator.NewMockAllocator(b) + alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil) + alloc.EXPECT().AllocOne().Call.Return(int64(19530), nil) + inpath, err := uploadInsertLog(context.Background(), mockbIO, alloc, meta.GetID(), partId, segmentId, blobs) + assert.NoError(b, err) + assert.Equal(b, 12, len(inpath)) + binlogNum := len(inpath[0].GetBinlogs()) + assert.Equal(b, 1, binlogNum) + + for idx := 0; idx < binlogNum; idx++ { + var ps []string + for _, path := range inpath { + ps = append(ps, path.GetBinlogs()[idx].GetLogPath()) + } + allPaths = append(allPaths, ps) + } + + dm := map[interface{}]Timestamp{ + 1: 10000, + } + + metaCache := metacache.NewMockMetaCache(b) + metaCache.EXPECT().Schema().Return(meta.GetSchema()).Maybe() + metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { + segment := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + CollectionID: 1, + PartitionID: 0, + ID: id, + NumOfRows: 10, + }, nil) + return segment, true + }) + + ct := &compactionTask{ + metaCache: metaCache, + binlogIO: mockbIO, + Allocator: alloc, + done: make(chan struct{}, 1), + plan: &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + {SegmentID: 1}, + }, + }, + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm) + assert.NoError(b, err) + assert.Equal(b, int64(2), numOfRow) + assert.Equal(b, 1, len(inPaths[0].GetBinlogs())) + assert.Equal(b, 1, len(statsPaths)) + assert.NotEqual(b, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom()) + assert.NotEqual(b, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo()) + } +} diff --git a/internal/datanode/io/binlog_io.go b/internal/datanode/io/binlog_io.go index b0731d23d5bc1..268653cc2870f 100644 --- a/internal/datanode/io/binlog_io.go +++ b/internal/datanode/io/binlog_io.go @@ -86,7 +86,7 @@ func (b *BinlogIoImpl) Upload(ctx context.Context, kvs map[string][]byte) error ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "Upload") defer span.End() future := b.pool.Submit(func() (any, error) { - log.Debug("BinlogIO uplaod", zap.Strings("paths", lo.Keys(kvs))) + log.Debug("BinlogIO upload", zap.Strings("paths", lo.Keys(kvs))) err := retry.Do(ctx, func() error { return b.MultiWrite(ctx, kvs) }) diff --git 
a/internal/storage/serde.go b/internal/storage/serde.go index 7a64edf79b653..bd88b83b87ea0 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -750,6 +750,7 @@ type singleFieldRecordWriter struct { fw *pqarrow.FileWriter fieldId FieldID + numRows int grouped bool } @@ -758,6 +759,8 @@ func (sfw *singleFieldRecordWriter) Write(r Record) error { sfw.grouped = true sfw.fw.NewRowGroup() } + + sfw.numRows++ // TODO: adding row group support by calling fw.NewRowGroup() a := r.Column(sfw.fieldId) return sfw.fw.WriteColumnData(a) @@ -790,6 +793,9 @@ type SerializeWriter[T any] struct { } func (sw *SerializeWriter[T]) Flush() error { + if sw.pos == 0 { + return nil + } buf := sw.buffer[:sw.pos] r, size, err := sw.serializer(buf) if err != nil { @@ -881,7 +887,7 @@ type BinlogStreamWriter struct { memorySize int // To be updated on the fly buf bytes.Buffer - rw RecordWriter + rw *singleFieldRecordWriter } func (bsw *BinlogStreamWriter) GetRecordWriter() (RecordWriter, error) { @@ -916,8 +922,9 @@ func (bsw *BinlogStreamWriter) Finalize() (*Blob, error) { return nil, err } return &Blob{ - Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)), - Value: b.Bytes(), + Key: strconv.Itoa(int(bsw.fieldSchema.FieldID)), + Value: b.Bytes(), + RowNum: int64(bsw.rw.numRows), }, nil }
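
Note (not part of the patch): the heart of this change is the new streaming write path in merge(). Below is a condensed sketch of that flow, assembled from the compactor.go hunks above. It trims error paths, stats/timestamp bookkeeping and the numBinlogs counter, and the row source `values` is illustrative only; it is not a verbatim excerpt.

// Sketch of the streaming compaction write path introduced by this patch.
// Assumes the compactionTask context from compactor.go; only identifiers
// that appear in the patch are used, everything else is trimmed.
writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
if err != nil {
    return err
}
unflushedRows := 0

flush := func() error {
    writer.Close() // drain buffered rows into the per-field stream writers
    fieldData := make([]*Blob, len(finalizers))
    for i, f := range finalizers {
        blob, err := f() // Finalize() emits the binlog bytes plus RowNum
        if err != nil {
            return err
        }
        fieldData[i] = blob
    }
    // upload one binlog batch; path bookkeeping omitted in this sketch
    if _, err := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, fieldData); err != nil {
        return err
    }
    unflushedRows = 0
    return nil
}

for _, v := range values { // merged rows that are neither deleted nor expired
    if err := writer.Write(v); err != nil {
        return err
    }
    unflushedRows++
    // check size every 100 rows to limit WrittenMemorySize() calls
    if (unflushedRows+1)%100 == 0 {
        writer.Flush()
        if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
            if err := flush(); err != nil {
                return err
            }
            // start a fresh writer for the next binlog batch
            writer, finalizers, err = newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
            if err != nil {
                return err
            }
        }
    }
}
if unflushedRows > 0 { // final flush for the tail batch
    if err := flush(); err != nil {
        return err
    }
}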
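
Design note: callers now serialize rows incrementally through SerializeWriter instead of accumulating a full storage.InsertData buffer, and the size check moves from InsertData.GetMemorySize() to writer.WrittenMemorySize(). Because uploadInsertLog now receives []*Blob rather than an InsertData, empty input is detected from RowNum on the first finalized blob, which is why singleFieldRecordWriter tracks numRows and Finalize() propagates it; the early return added to SerializeWriter.Flush() when pos == 0 likewise avoids emitting empty row groups. Stats-log upload is decoupled from insert-log upload and happens once at the end, guarded by numRows > 0.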