enhance: enable stream writer in compactions #32612

Merged May 17, 2024 (3 commits)
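What this PR changes: DataNode compaction previously accumulated merged rows in a storage.InsertData buffer and serialized the whole buffer with an InsertCodec at upload time. It now streams each storage.Value through a storage.SerializeWriter backed by per-field binlog stream writers, and uploadInsertLog/genInsertBlobs take the pre-serialized []*Blob instead of raw insert data. Below is a minimal sketch of the new write path, condensed from the merge() diff in this PR; error handling and the size-triggered flush/rotation are elided, and ctx, t, meta, partID, targetSegID, and the row source (here called values) are assumed to be in scope as in the real function:

    writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
    // ... handle err ...
    for _, v := range values { // v is a *storage.Value decoded from the downloaded binlogs
        _ = writer.Write(v) // rows are streamed instead of buffered in an InsertData
    }
    writer.Close() // flush remaining buffered rows into the field writers

    fieldData := make([]*Blob, 0, len(finalizers))
    for _, finalize := range finalizers {
        blob, _ := finalize() // one finalized binlog Blob per field
        fieldData = append(fieldData, blob)
    }

    // uploadInsertLog now consumes the pre-serialized blobs directly (no InsertCodec).
    inPaths, _ := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, fieldData)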
21 changes: 8 additions & 13 deletions internal/datanode/binlog_io.go
@@ -80,22 +80,18 @@
}

// genInsertBlobs returns insert-paths and save blob to kvs
-func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data *InsertData, collectionID, partID, segID UniqueID, iCodec *storage.InsertCodec, kvs map[string][]byte) (map[UniqueID]*datapb.FieldBinlog, error) {
-inlogs, err := iCodec.Serialize(partID, segID, data)
-if err != nil {
-return nil, err
-}
-
+func genInsertBlobs(b io.BinlogIO, allocator allocator.Allocator, data []*Blob, collectionID, partID, segID UniqueID, kvs map[string][]byte,
+) (map[UniqueID]*datapb.FieldBinlog, error) {
inpaths := make(map[UniqueID]*datapb.FieldBinlog)
notifyGenIdx := make(chan struct{})
defer close(notifyGenIdx)

-generator, err := allocator.GetGenerator(len(inlogs), notifyGenIdx)
+generator, err := allocator.GetGenerator(len(data), notifyGenIdx)
if err != nil {
return nil, err
}

-for _, blob := range inlogs {
+for _, blob := range data {
// Blob Key is generated by Serialize from int64 fieldID in collection schema, which won't raise error in ParseInt
fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64)
k := metautil.JoinIDPath(collectionID, partID, segID, fID, <-generator)
@@ -177,22 +173,21 @@
collectionID UniqueID,
partID UniqueID,
segID UniqueID,
-iData *InsertData,
-iCodec *storage.InsertCodec,
+data []*Blob,
) (map[UniqueID]*datapb.FieldBinlog, error) {
ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "UploadInsertLog")
defer span.End()
kvs := make(map[string][]byte)

-if iData.IsEmpty() {
+if len(data) <= 0 || data[0].RowNum <= 0 {
log.Warn("binlog io uploading empty insert data",
zap.Int64("segmentID", segID),
-zap.Int64("collectionID", iCodec.Schema.GetID()),
+zap.Int64("collectionID", collectionID),

)
return nil, nil
}

-inpaths, err := genInsertBlobs(b, allocator, iData, collectionID, partID, segID, iCodec, kvs)
+inpaths, err := genInsertBlobs(b, allocator, data, collectionID, partID, segID, kvs)
if err != nil {
return nil, err
}
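With the new signature, serialization is the caller's job: build the []*Blob first (the tests below use an InsertCodec; merge() uses the stream writer) and pass it to uploadInsertLog. A hedged sketch of the updated caller contract, using identifiers from the tests that follow:

    iCodec := storage.NewInsertCodecWithSchema(meta)     // schema-aware codec, as before
    blobs, err := iCodec.Serialize(partID, segID, iData) // caller produces the []*Blob up front
    assert.NoError(t, err)
    paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), partID, segID, blobs)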
51 changes: 24 additions & 27 deletions internal/datanode/binlog_io_test.go
@@ -124,21 +124,17 @@ func TestBinlogIOInterfaceMethods(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)

-t.Run("empty insert", func(t *testing.T) {
-alloc := allocator.NewMockAllocator(t)
-binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
-iCodec := storage.NewInsertCodecWithSchema(meta)
-paths, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genEmptyInsertData(), iCodec)
-assert.NoError(t, err)
-assert.Nil(t, paths)
-})

t.Run("gen insert blob failed", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
+var partId int64 = 10
+var segId int64 = 1
+iData := genInsertData(2)
+blobs, err := iCodec.Serialize(10, 1, iData)
+assert.NoError(t, err)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(nil, fmt.Errorf("mock err"))
-_, err := uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), 10, 1, genInsertData(2), iCodec)
+_, err = uploadInsertLog(context.Background(), binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})

@@ -147,13 +143,18 @@
alloc := allocator.NewMockAllocator(t)
binlogIO := io.NewBinlogIO(mkc, getOrCreateIOPool())
iCodec := storage.NewInsertCodecWithSchema(meta)
+var partId int64 = 1
+var segId int64 = 10
+iData := genInsertData(2)
+blobs, err := iCodec.Serialize(10, 1, iData)
+assert.NoError(t, err)

alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Call.Return(validGeneratorFn, nil)

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

-_, err := uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), 1, 10, genInsertData(2), iCodec)
+_, err = uploadInsertLog(ctx, binlogIO, alloc, meta.GetID(), partId, segId, blobs)
assert.Error(t, err)
})
})
@@ -256,9 +257,13 @@ func TestBinlogIOInnerMethods(t *testing.T) {
t.Run(test.description, func(t *testing.T) {
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", test.pkType)
iCodec := storage.NewInsertCodecWithSchema(meta)

+var partId int64 = 10
+var segId int64 = 1
+iData := genInsertData(2)
+blobs, err := iCodec.Serialize(10, 1, iData)
+assert.NoError(t, err)
kvs := make(map[string][]byte)
-pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
+pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)

assert.NoError(t, err)
assert.Equal(t, 12, len(pin))
@@ -277,30 +282,22 @@
cm := storage.NewLocalChunkManager(storage.RootPath(binlogTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())

-t.Run("serialize error", func(t *testing.T) {
-iCodec := storage.NewInsertCodecWithSchema(nil)
-
-alloc := allocator.NewMockAllocator(t)
-binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
-kvs := make(map[string][]byte)
-pin, err := genInsertBlobs(binlogIO, alloc, genEmptyInsertData(), 0, 10, 1, iCodec, kvs)
-
-assert.Error(t, err)
-assert.Empty(t, kvs)
-assert.Empty(t, pin)
-})

t.Run("GetGenerator error", func(t *testing.T) {
f := &MetaFactory{}
meta := f.GetCollectionMeta(UniqueID(10001), "test_gen_blobs", schemapb.DataType_Int64)
iCodec := storage.NewInsertCodecWithSchema(meta)
+var partId int64 = 10
+var segId int64 = 1
+iData := genInsertData(2)
+blobs, err := iCodec.Serialize(partId, segId, iData)
+assert.NoError(t, err)

alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().GetGenerator(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock GetGenerator error"))
binlogIO := io.NewBinlogIO(cm, getOrCreateIOPool())
kvs := make(map[string][]byte)

-pin, err := genInsertBlobs(binlogIO, alloc, genInsertData(2), meta.GetID(), 10, 1, iCodec, kvs)
+pin, err := genInsertBlobs(binlogIO, alloc, blobs, meta.GetID(), partId, segId, kvs)

assert.Error(t, err)
assert.Empty(t, kvs)
157 changes: 71 additions & 86 deletions internal/datanode/compactor.go
@@ -55,8 +55,6 @@
errContext = errors.New("context done or timeout")
)

-type iterator = storage.Iterator
-
type compactor interface {
complete()
compact() (*datapb.CompactionPlanResult, error)
@@ -174,48 +172,15 @@
return pk2ts, nil
}

-func (t *compactionTask) uploadRemainLog(
-ctxTimeout context.Context,
-targetSegID UniqueID,
-partID UniqueID,
-meta *etcdpb.CollectionMeta,
-stats *storage.PrimaryKeyStats,
-totRows int64,
-writeBuffer *storage.InsertData,
-) (map[UniqueID]*datapb.FieldBinlog, map[UniqueID]*datapb.FieldBinlog, error) {
-iCodec := storage.NewInsertCodecWithSchema(meta)
-inPaths := make(map[int64]*datapb.FieldBinlog, 0)
-var err error
-if !writeBuffer.IsEmpty() {
-inPaths, err = uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
-if err != nil {
-return nil, nil, err
-}
-}
-
-statPaths, err := uploadStatsLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, totRows, iCodec)
-if err != nil {
-return nil, nil, err
+func newBinlogWriter(collectionId, partitionId, segmentId UniqueID, schema *schemapb.CollectionSchema,
+) (writer *storage.SerializeWriter[*storage.Value], closers []func() (*Blob, error), err error) {
+fieldWriters := storage.NewBinlogStreamWriters(collectionId, partitionId, segmentId, schema.Fields)
+closers = make([]func() (*Blob, error), 0, len(fieldWriters))
+for _, w := range fieldWriters {
+closers = append(closers, w.Finalize)
+}

-return inPaths, statPaths, nil
-}

-func (t *compactionTask) uploadSingleInsertLog(
-ctxTimeout context.Context,
-targetSegID UniqueID,
-partID UniqueID,
-meta *etcdpb.CollectionMeta,
-writeBuffer *storage.InsertData,
-) (map[UniqueID]*datapb.FieldBinlog, error) {
-iCodec := storage.NewInsertCodecWithSchema(meta)
-
-inPaths, err := uploadInsertLog(ctxTimeout, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, writeBuffer, iCodec)
-if err != nil {
-return nil, err
-}
-
-return inPaths, nil
+writer, err = storage.NewBinlogSerializeWriter(schema, partitionId, segmentId, fieldWriters, 1024)
+return
}

func (t *compactionTask) merge(
@@ -231,21 +196,22 @@
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()

+writer, finalizers, err := newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
+if err != nil {
+return nil, nil, 0, err
+}

var (
-numBinlogs int // binlog number
-numRows int64 // the number of rows uploaded
-expired int64 // the number of expired entity
+numBinlogs int // binlog number
+numRows uint64 // the number of rows uploaded
+expired int64 // the number of expired entity

insertField2Path = make(map[UniqueID]*datapb.FieldBinlog)
insertPaths = make([]*datapb.FieldBinlog, 0)

statField2Path = make(map[UniqueID]*datapb.FieldBinlog)
statPaths = make([]*datapb.FieldBinlog, 0)
)
-writeBuffer, err := storage.NewInsertData(meta.GetSchema())
-if err != nil {
-return nil, nil, -1, err
-}

isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
@@ -306,7 +272,7 @@
numRows = 0
numBinlogs = 0
currentTs := t.GetCurrentTime()
-currentRows := 0
+unflushedRows := 0
downloadTimeCost := time.Duration(0)
uploadInsertTimeCost := time.Duration(0)

@@ -325,6 +291,30 @@
timestampFrom int64 = -1
)

+flush := func() error {
+uploadInsertStart := time.Now()
+writer.Close()
+fieldData := make([]*Blob, len(finalizers))
+
+for i, f := range finalizers {
+blob, err := f()
+if err != nil {
+return err

+}
+fieldData[i] = blob
+}
+inPaths, err := uploadInsertLog(ctx, t.binlogIO, t.Allocator, meta.ID, partID, targetSegID, fieldData)
+if err != nil {
+log.Warn("failed to upload single insert log", zap.Error(err))
+return err

+}
+numBinlogs += len(inPaths)
+uploadInsertTimeCost += time.Since(uploadInsertStart)
+addInsertFieldPath(inPaths, timestampFrom, timestampTo)
+unflushedRows = 0
+return nil
+}

for _, path := range unMergedInsertlogs {
downloadStart := time.Now()
data, err := downloadBlobs(ctx, t.binlogIO, path)
@@ -370,55 +360,50 @@
timestampTo = v.Timestamp
}

-row, ok := v.Value.(map[UniqueID]interface{})
-if !ok {
-log.Warn("transfer interface to map wrong", zap.Strings("path", path))
-return nil, nil, 0, errors.New("unexpected error")
-}
-
-err = writeBuffer.Append(row)
+err = writer.Write(v)
if err != nil {
return nil, nil, 0, err
}
+numRows++
+unflushedRows++

-currentRows++
stats.Update(v.PK)

-// check size every 100 rows in case of too many `GetMemorySize` call
-if (currentRows+1)%100 == 0 && writeBuffer.GetMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsInt() {
-numRows += int64(writeBuffer.GetRowNum())
-uploadInsertStart := time.Now()
-inPaths, err := t.uploadSingleInsertLog(ctx, targetSegID, partID, meta, writeBuffer)
-if err != nil {
-log.Warn("failed to upload single insert log", zap.Error(err))
-return nil, nil, 0, err
+if (unflushedRows+1)%100 == 0 {
+writer.Flush() // Flush to update memory size
+
+if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
+if err := flush(); err != nil {
+return nil, nil, 0, err

+}
+timestampFrom = -1
+timestampTo = -1
+
+writer, finalizers, err = newBinlogWriter(meta.ID, targetSegID, partID, meta.Schema)
+if err != nil {
+return nil, nil, 0, err

}
-uploadInsertTimeCost += time.Since(uploadInsertStart)
-addInsertFieldPath(inPaths, timestampFrom, timestampTo)
-timestampFrom = -1
-timestampTo = -1
-
-writeBuffer, _ = storage.NewInsertData(meta.GetSchema())
-currentRows = 0
-numBinlogs++
}
}
}

-// upload stats log and remain insert rows
-if writeBuffer.GetRowNum() > 0 || numRows > 0 {
-numRows += int64(writeBuffer.GetRowNum())
-uploadStart := time.Now()
-inPaths, statsPaths, err := t.uploadRemainLog(ctx, targetSegID, partID, meta,
-stats, numRows+int64(currentRows), writeBuffer)
-if err != nil {
+// final flush if there is unflushed rows
+if unflushedRows > 0 {
+if err := flush(); err != nil {
return nil, nil, 0, err
}
}

-uploadInsertTimeCost += time.Since(uploadStart)
-addInsertFieldPath(inPaths, timestampFrom, timestampTo)
+// upload stats log
+if numRows > 0 {
+iCodec := storage.NewInsertCodecWithSchema(meta)
+statsPaths, err := uploadStatsLog(ctx, t.binlogIO, t.Allocator, meta.GetID(), partID, targetSegID, stats, int64(numRows), iCodec)
if err != nil {
return nil, nil, 0, err

}
addStatFieldPath(statsPaths)
-numBinlogs += len(inPaths)
}

for _, path := range insertField2Path {
@@ -430,14 +415,14 @@
}

log.Info("compact merge end",
-zap.Int64("remaining insert numRows", numRows),
+zap.Uint64("remaining insert numRows", numRows),
zap.Int64("expired entities", expired),
zap.Int("binlog file number", numBinlogs),
zap.Duration("download insert log elapse", downloadTimeCost),
zap.Duration("upload insert log elapse", uploadInsertTimeCost),
zap.Duration("merge elapse", time.Since(mergeStart)))

-return insertPaths, statPaths, numRows, nil
+return insertPaths, statPaths, int64(numRows), nil
}

func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
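A design note on the size check in merge() above: the stream writer only updates WrittenMemorySize() when buffered rows are flushed, so the loop flushes every 100 rows before comparing against DataNodeCfg.BinLogMaxSize, and on overflow it uploads the finalized blobs and rotates to a fresh writer. A condensed sketch of that loop body (names from the diff; surrounding state assumed in scope, argument order per the initial newBinlogWriter call in merge()):

    numRows++
    unflushedRows++
    if (unflushedRows+1)%100 == 0 {
        writer.Flush() // WrittenMemorySize() is only accurate after a flush
        if writer.WrittenMemorySize() > paramtable.Get().DataNodeCfg.BinLogMaxSize.GetAsUint64() {
            if err := flush(); err != nil { // upload this chunk's finalized blobs
                return nil, nil, 0, err
            }
            // rotate: start a fresh writer for the next binlog chunk
            writer, finalizers, err = newBinlogWriter(meta.GetID(), partID, targetSegID, meta.GetSchema())
            if err != nil {
                return nil, nil, 0, err
            }
        }
    }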