Skip to content

Commit

Permalink
Enhance: enable stream writer in compactions
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu committed Apr 25, 2024
1 parent 0ff7a46 commit 02bea94
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 169 deletions.
21 changes: 8 additions & 13 deletions internal/datanode/binlog_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,18 @@ func genDeltaBlobs(b io.BinlogIO, allocator allocator.Allocator, data *DeleteDat
}

// genInsertBlobs returns the insert paths and saves the blobs to kvs
func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
inlogs, err := iCodec.Serialize(partID, segID, data)
if err != nil {
return nil, err
}

func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data []*Blob, collectionID, partID, segID UniqueID, kvs map[string][]byte,
) (map[UniqueID]*datapb.FieldBinlog, error) {
inpaths := make(map[UniqueID]*datapb.FieldBinlog)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)

generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx)
generator, err := allocator.GetGenerator(len(data), notifyGenIdx)
if err != nil {
return nil, err
}

for _, blob := range inlogs {
for _, blob := range data {
// The blob key is generated by Serialize from the int64 fieldID in the collection schema, so ParseInt is not expected to return an error here
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator)
Expand Down Expand Up @@ -177,22 +173,21 @@ func uploadInsertLog(
collectionID UniqueID,
partID UniqueID,
segID UniqueID,
iData *InsertData,
iCodec *storage.InsertCodec,
data []*Blob,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
defer span.End()
kvs := make(map[string][]byte)

if iData.IsEmpty() {
if len(data) <= 0 || data[0].RowNum <= 0 {
log.Warn("binlog io uploading empty insert data",
zap.Int64("segmentID", segID),
zap.Int64("collectionID", iCodec.Schema.GetID()),
zap.Int64("collectionID", collectionID),
)
return nil, nil
}

inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs)
inpaths, err := genInsertBlobs(b, allocator, data, collectionID, partID, segID, kvs)
if err != nil {
return nil, err
}
Expand Down
51 changes: 24 additions & 27 deletions internal/datanode/binlog_io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,21 +124,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)

t.Run("empty insert", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec)
assert.NoError(t, err)
assert.Nil(t, paths)
})

t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
_, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec)
_, err = uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})

Expand All @@ -147,13 +143,18 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 1
var segId int64 = 10
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)

alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

_, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec)
_, err = uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})
})
Expand Down Expand Up @@ -256,9 +257,13 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
iCodec := storage.NewInsertCodecWithSchema(meta)

var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(10, 1, iData)
assert.NoError(t, err)
kvs := make(map[string][]byte)
pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)

assert.NoError(t, err)
assert.Equal(t, 12, len(pin))
Expand All @@ -277,30 +282,22 @@ func TestBinlogIOInnerMethods(t *testing.T) {
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())

t.Run("serialize error", func(t *testing.T) {
iCodec := storage.NewInsertCodecWithSchema(nil)

alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)
pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs)

assert.Error(t, err)
assert.Empty(t, kvs)
assert.Empty(t, pin)
})

t.Run("GetGenerator error", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
var partId int64 = 10
var segId int64 = 1
iData := genInsertData(2)
blobs, err := iCodec.Serialize(partId, segId, iData)
assert.NoError(t, err)

alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)

pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)

assert.Error(t, err)
assert.Empty(t, kvs)
Expand Down
157 changes: 71 additions & 86 deletions internal/datanode/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ var (
errContext = errors.New("context done or timeout")
)

type iterator = storage.Iterator

type compactor interface {
complete()
compact() (*datapb.CompactionPlanResult, error)
Expand Down Expand Up @@ -174,48 +172,15 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interf
return pk2ts, nil
}

func (t *compactionTask) uploadRemainLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
stats *storage.PrimaryKeyStats,
totRows int64,
writeBuffer *storage.InsertData,
) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
iCodec := storage.NewInsertCodecWithSchema(meta)
inPaths := make(map[int64]*datapb.FieldBinlog, 0)
var err error
if !writeBuffer.IsEmpty() {
inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, nil, err
}
}

statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec)
if err != nil {
return nil, nil, err
func newBinlogWriter(collectionId, partitionId, segmentId UniqueID, schema *schemapb.CollectionSchema,
) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*Blob, error), err error) {
fieldWriters := storage.NewBinlogStreamWriters(collectionId, partitionId, segmentId, schema.Fields)
closers = make([]func() (*Blob, error), 0, len(fieldWriters))
for _, w := range fieldWriters {
closers = append(closers, w.Finalize)
}

return inPaths, statPaths, nil
}

func (t *compactionTask) uploadSingleInsertLog(
ctxTimeout context.Context,
targetSegID UniqueID,
partID UniqueID,
meta *etcdpb.CollectionMeta,
writeBuffer *storage.InsertData,
) (map[UniqueID]*datapb.FieldBinlog, error) {
iCodec := storage.NewInsertCodecWithSchema(meta)

inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
if err != nil {
return nil, err
}

return inPaths, nil
writer, err = storage.NewBinlogSerializeWriter(schema, partitionId, segmentId, fieldWriters, 1024)
return
}

func (t *compactionTask) merge(
Expand All @@ -231,21 +196,22 @@ func (t *compactionTask) merge(
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()

writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
if err != nil {
return nil, nil, 0, err
}

var (
numBinlogs int // binlog number
numRows int64 // the number of rows uploaded
expired int64 // the number of expired entity
numBinlogs int // binlog number
numRows uint64 // the number of rows uploaded
expired int64 // the number of expired entity

insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)

statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statPaths = make([]*datapb.FieldBinlog, 0)
)
writeBuffer, err := storage.NewInsertData(meta.GetSchema())
if err != nil {
return nil, nil, -1, err
}

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
Expand Down Expand Up @@ -306,7 +272,7 @@ func (t *compactionTask) merge(
numRows = 0
numBinlogs = 0
currentTs := t.GetCurrentTime()
currentRows := 0
unflushedRows := 0
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)

Expand All @@ -325,6 +291,30 @@ func (t *compactionTask) merge(
timestampFrom int64 = -1
)

flush := func() error {
uploadInsertStart := time.Now()
writer.Close()
fieldData := make([]*Blob, len(finalizers))

for i, f := range finalizers {
blob, err := f()
if err != nil {
return err
}
fieldData[i] = blob
}
inPaths, err := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.ID, partID, targetSegID, fieldData)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return err
}
numBinlogs += len(inPaths)
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
unflushedRows = 0
return nil
}

for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := downloadBlobs(ctx, t.binlogIO, path)
Expand Down Expand Up @@ -370,55 +360,50 @@ func (t *compactionTask) merge(
timestampTo = v.Timestamp
}

row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong", zap.Strings("path", path))
return nil, nil, 0, errors.New("unexpected error")
}

err = writeBuffer.Append(row)
err = writer.Write(v)
numRows++
unflushedRows++
if err != nil {
return nil, nil, 0, err
}

currentRows++
stats.Update(v.PK)

// check the size only every 100 rows to avoid querying the memory size too often
if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
numRows += int64(writeBuffer.GetRowNum())
uploadInsertStart := time.Now()
inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer)
if err != nil {
log.Warn("failed to upload single insert log", zap.Error(err))
return nil, nil, 0, err
if (unflushedRows+1)%100 == 0 {
writer.Flush() // Flush to update memory size

if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
if err := flush(); err != nil {
return nil, nil, 0, err
}
timestampFrom = -1
timestampTo = -1

// Recreate the writer for the next binlog batch. The arguments must follow
// newBinlogWriter's (collectionId, partitionId, segmentId, schema) order,
// matching the initial call at the top of merge; passing targetSegID before
// partID would swap the partition and segment IDs for every batch written
// after the first size-triggered flush.
writer, finalizers, err = newBinlogWriter(meta.ID, partID, targetSegID, meta.Schema)
if err != nil {
return nil, nil, 0, err
}
}
uploadInsertTimeCost += time.Since(uploadInsertStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
timestampFrom = -1
timestampTo = -1

writeBuffer, _ = storage.NewInsertData(meta.GetSchema())
currentRows = 0
numBinlogs++
}
}
}

// upload stats log and remain insert rows
if writeBuffer.GetRowNum() > 0 || numRows > 0 {
numRows += int64(writeBuffer.GetRowNum())
uploadStart := time.Now()
inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
stats, numRows+int64(currentRows), writeBuffer)
if err != nil {
// final flush if there are unflushed rows
if unflushedRows > 0 {
if err := flush(); err != nil {
return nil, nil, 0, err
}
}

uploadInsertTimeCost += time.Since(uploadStart)
addInsertFieldPath(inPaths, timestampFrom, timestampTo)
// upload stats log
if numRows > 0 {
iCodec := storage.NewInsertCodecWithSchema(meta)
statsPaths, err := uploadStatsLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, int64(numRows), iCodec)
if err != nil {
return nil, nil, 0, err
}
addStatFieldPath(statsPaths)
numBinlogs += len(inPaths)
}

for _, path := range insertField2Path {
Expand All @@ -430,14 +415,14 @@ func (t *compactionTask) merge(
}

log.Info("compact merge end",
zap.Int64("remaining insert numRows", numRows),
zap.Uint64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Int("binlog file number", numBinlogs),
zap.Duration("download insert log elapse", downloadTimeCost),
zap.Duration("upload insert log elapse", uploadInsertTimeCost),
zap.Duration("merge elapse", time.Since(mergeStart)))

return insertPaths, statPaths, numRows, nil
return insertPaths, statPaths, int64(numRows), nil
}

func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
Expand Down
Loading

0 comments on commit 02bea94

Please sign in to comment.