Adding a generic stream payload reader.
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
tedxu committed Feb 20, 2024
1 parent 43e8cd5 commit 342e0ab
Showing 4 changed files with 139 additions and 22 deletions.
internal/storage/data_codec.go (35 changes: 13 additions, 22 deletions)
@@ -1007,43 +1007,38 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
     if err != nil {
         return InvalidUniqueID, InvalidUniqueID, nil, err
     }
+    defer binlogReader.Close()
 
     pid, sid = binlogReader.PartitionID, binlogReader.SegmentID
     eventReader, err := binlogReader.NextEventReader()
     if err != nil {
-        binlogReader.Close()
         return InvalidUniqueID, InvalidUniqueID, nil, err
     }
+    defer eventReader.Close()
 
-    dataset, err := eventReader.GetByteArrayDataSet()
+    rr, err := eventReader.GetArrowRecordReader()
     if err != nil {
-        eventReader.Close()
-        binlogReader.Close()
         return InvalidUniqueID, InvalidUniqueID, nil, err
     }
+    defer rr.Release()
 
-    batchSize := int64(1024)
-    for dataset.HasNext() {
-        stringArray, err := dataset.NextBatch(batchSize)
-        if err != nil {
-            return InvalidUniqueID, InvalidUniqueID, nil, err
-        }
-        for i := 0; i < len(stringArray); i++ {
+    for rr.Next() {
+        rec := rr.Record()
+        defer rec.Release()
+        column := rec.Column(0)
+        for i := 0; i < column.Len(); i++ {
             deleteLog := &DeleteLog{}
-            if err = json.Unmarshal(stringArray[i], deleteLog); err != nil {
+            strVal := column.ValueStr(i)
+            if err = json.Unmarshal([]byte(strVal), deleteLog); err != nil {
                 // compatible with versions that only support int64 type primary keys
                 // compatible with fmt.Sprintf("%d,%d", pk, ts)
                 // compatible error info (unmarshal err invalid character ',' after top-level value)
-                splits := strings.Split(stringArray[i].String(), ",")
+                splits := strings.Split(strVal, ",")
                 if len(splits) != 2 {
-                    eventReader.Close()
-                    binlogReader.Close()
-                    return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
+                    return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", strVal)
                 }
                 pk, err := strconv.ParseInt(splits[0], 10, 64)
                 if err != nil {
-                    eventReader.Close()
-                    binlogReader.Close()
                     return InvalidUniqueID, InvalidUniqueID, nil, err
                 }
                 deleteLog.Pk = &Int64PrimaryKey{
@@ -1052,17 +1047,13 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
                 deleteLog.PkType = int64(schemapb.DataType_Int64)
                 deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
                 if err != nil {
-                    eventReader.Close()
-                    binlogReader.Close()
                     return InvalidUniqueID, InvalidUniqueID, nil, err
                 }
             }
 
             result.Append(deleteLog.Pk, deleteLog.Ts)
         }
     }
-    eventReader.Close()
-    binlogReader.Close()
 }
 
 return pid, sid, result, nil
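For context, here is a minimal, self-contained sketch of the streaming pattern the rewritten Deserialize loop follows. The package and helper names are illustrative only (not part of the commit), and it assumes column 0 is an Arrow string column, as the delta-log payload provides:

package example

import (
    "fmt"

    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/apache/arrow/go/v12/parquet/pqarrow"
)

// readAllStrings drains a pqarrow.RecordReader batch by batch, so at most one
// record batch is resident at a time. A record is only valid until the next
// call to Next, hence values are copied out as they are read.
func readAllStrings(rr pqarrow.RecordReader) ([]string, error) {
    defer rr.Release()

    var out []string
    for rr.Next() {
        rec := rr.Record()
        col, ok := rec.Column(0).(*array.String)
        if !ok {
            return nil, fmt.Errorf("expected a string column, got %s", rec.Column(0).DataType().Name())
        }
        for i := 0; i < col.Len(); i++ {
            out = append(out, col.Value(i))
        }
    }
    return out, rr.Err()
}

Because records arrive batch by batch (1024 rows with the read properties used in this commit), peak memory is bounded by the batch size rather than the payload size.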
internal/storage/payload.go (2 changes: 2 additions, 0 deletions)
@@ -19,6 +19,7 @@ package storage
 import (
     "github.com/apache/arrow/go/v12/parquet"
     "github.com/apache/arrow/go/v12/parquet/file"
+    "github.com/apache/arrow/go/v12/parquet/pqarrow"
 
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 )
@@ -69,6 +70,7 @@ type PayloadReaderInterface interface {
     GetPayloadLengthFromReader() (int, error)
 
     GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)
+    GetArrowRecordReader() (pqarrow.RecordReader, error)
 
     ReleasePayloadReader() error
     Close() error
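A hedged sketch of how a caller in the storage package might use the widened interface (countRows is a hypothetical helper, not part of this commit):

// countRows streams record batches through the new method and tallies rows
// without materializing the payload. Hypothetical example code.
func countRows(r PayloadReaderInterface) (int64, error) {
    rr, err := r.GetArrowRecordReader()
    if err != nil {
        return 0, err
    }
    defer rr.Release()

    var total int64
    for rr.Next() {
        total += rr.Record().NumRows()
    }
    return total, rr.Err()
}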
internal/storage/payload_reader.go (16 changes: 16 additions, 0 deletions)
@@ -2,11 +2,14 @@ package storage
 
 import (
     "bytes"
+    "context"
     "fmt"
 
     "github.com/apache/arrow/go/v12/arrow"
+    "github.com/apache/arrow/go/v12/arrow/memory"
     "github.com/apache/arrow/go/v12/parquet"
     "github.com/apache/arrow/go/v12/parquet/file"
+    "github.com/apache/arrow/go/v12/parquet/pqarrow"
     "github.com/cockroachdb/errors"
     "github.com/golang/protobuf/proto"
 
@@ -263,6 +266,19 @@ func (r *PayloadReader) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file
     return NewDataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, 0, r.numRows), nil
 }
 
+func (r *PayloadReader) GetArrowRecordReader() (pqarrow.RecordReader, error) {
+    arrowReader, err := pqarrow.NewFileReader(r.reader, pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
+    if err != nil {
+        return nil, err
+    }
+
+    rr, err := arrowReader.GetRecordReader(context.Background(), nil, nil)
+    if err != nil {
+        return nil, err
+    }
+    return rr, nil
+}
+
 func (r *PayloadReader) GetArrayFromPayload() ([]*schemapb.ScalarField, error) {
     if r.colType != schemapb.DataType_Array {
         return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
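For orientation, the sketch below traces the full path from an in-memory parquet buffer to the streaming reader, roughly what NewPayloadReader plus the new method do together. The package name, helper name, and fixed batch size are assumptions for illustration:

package example

import (
    "bytes"
    "context"

    "github.com/apache/arrow/go/v12/arrow/memory"
    "github.com/apache/arrow/go/v12/parquet/file"
    "github.com/apache/arrow/go/v12/parquet/pqarrow"
)

// newRecordReader opens a parquet payload held in memory and returns a
// streaming Arrow record reader over all of its columns and row groups.
func newRecordReader(buf []byte) (pqarrow.RecordReader, error) {
    // bytes.NewReader satisfies parquet's ReaderAtSeeker requirement.
    pf, err := file.NewParquetReader(bytes.NewReader(buf))
    if err != nil {
        return nil, err
    }

    // BatchSize bounds how many rows each streamed record carries; the
    // commit uses 1024.
    fr, err := pqarrow.NewFileReader(pf,
        pqarrow.ArrowReadProperties{BatchSize: 1024}, memory.DefaultAllocator)
    if err != nil {
        return nil, err
    }

    // nil column and row-group selections mean "read everything".
    return fr.GetRecordReader(context.Background(), nil, nil)
}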
internal/storage/payload_test.go (108 changes: 108 additions, 0 deletions)
@@ -18,8 +18,10 @@ package storage
 
 import (
     "math"
+    "math/rand"
     "testing"
 
+    "github.com/apache/arrow/go/v12/arrow/array"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
 
@@ -1385,3 +1387,109 @@ func TestPayload_ReaderAndWriter(t *testing.T) {
         w.ReleasePayloadWriter()
     })
 }
+
+func dataGen(size int) ([]byte, error) {
+    w, err := NewPayloadWriter(schemapb.DataType_String)
+    if err != nil {
+        return nil, err
+    }
+    defer w.Close()
+
+    letterRunes := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+
+    for i := 0; i < size; i++ {
+        b := make([]rune, 20)
+        for i := range b {
+            b[i] = letterRunes[rand.Intn(len(letterRunes))]
+        }
+        w.AddOneStringToPayload(string(b))
+    }
+    err = w.FinishPayloadWriter()
+    if err != nil {
+        return nil, err
+    }
+    buffer, err := w.GetPayloadBufferFromWriter()
+    if err != nil {
+        return nil, err
+    }
+    return buffer, err
+}
+
+func BenchmarkDefaultReader(b *testing.B) {
+    size := 1000000
+    buffer, err := dataGen(size)
+    assert.NoError(b, err)
+
+    b.ResetTimer()
+    r, err := NewPayloadReader(schemapb.DataType_String, buffer)
+    require.Nil(b, err)
+    defer r.ReleasePayloadReader()
+
+    length, err := r.GetPayloadLengthFromReader()
+    assert.NoError(b, err)
+    assert.Equal(b, length, size)
+
+    d, err := r.GetStringFromPayload()
+    assert.NoError(b, err)
+    for i := 0; i < 100; i++ {
+        for _, de := range d {
+            assert.Equal(b, 20, len(de))
+        }
+    }
+}
+func BenchmarkDataSetReader(b *testing.B) {
+    size := 1000000
+    buffer, err := dataGen(size)
+    assert.NoError(b, err)
+
+    b.ResetTimer()
+    r, err := NewPayloadReader(schemapb.DataType_String, buffer)
+    require.Nil(b, err)
+    defer r.ReleasePayloadReader()
+
+    length, err := r.GetPayloadLengthFromReader()
+    assert.NoError(b, err)
+    assert.Equal(b, length, size)
+
+    ds, err := r.GetByteArrayDataSet()
+    assert.NoError(b, err)
+
+    for i := 0; i < 100; i++ {
+        for ds.HasNext() {
+            stringArray, err := ds.NextBatch(1024)
+            assert.NoError(b, err)
+            for _, de := range stringArray {
+                assert.Equal(b, 20, len(string(de)))
+            }
+        }
+    }
+}
+func BenchmarkArrowRecordReader(b *testing.B) {
+    size := 1000000
+    buffer, err := dataGen(size)
+    assert.NoError(b, err)
+
+    b.ResetTimer()
+    r, err := NewPayloadReader(schemapb.DataType_String, buffer)
+    require.Nil(b, err)
+    defer r.ReleasePayloadReader()
+
+    length, err := r.GetPayloadLengthFromReader()
+    assert.NoError(b, err)
+    assert.Equal(b, length, size)
+
+    rr, err := r.GetArrowRecordReader()
+    assert.NoError(b, err)
+    defer rr.Release()
+
+    for i := 0; i < 100; i++ {
+        for rr.Next() {
+            rec := rr.Record()
+            arr := rec.Column(0).(*array.String)
+            defer rec.Release()
+            for i := 0; i < arr.Len(); i++ {
+                assert.Equal(b, 20, len(arr.Value(i)))
+            }
+        }
+    }
+}
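Taken together, the three benchmarks read back the same generated payload of 1,000,000 random 20-character strings a hundred times each, comparing full materialization (GetStringFromPayload), the existing chunked byte-array DataSet, and the new streaming Arrow record reader. Assuming the standard repository layout, they can be run with the usual Go tooling, e.g. go test -run=NONE -bench=Reader ./internal/storage/ from the repository root.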
