
Commit a053715

enhance: delete codec deserialize data by stream batch (#30407)
relate: #30404

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd authored Feb 6, 2024
1 parent d4100d5 commit a053715
Showing 6 changed files with 286 additions and 61 deletions.
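Before this change, DeleteCodec.Deserialize pulled the entire delete payload into memory at once via GetStringFromPayload; afterwards it iterates the payload as a stream of parquet byte-array batches. A condensed sketch of the new read loop, taken from the data_codec.go hunk below (surrounding reader setup and cleanup elided):

    dataset, err := eventReader.GetByteArrayDataSet()
    if err != nil {
        return InvalidUniqueID, InvalidUniqueID, nil, err
    }

    batchSize := int64(1024) // batch size chosen by the patch
    for dataset.HasNext() {
        // Each NextBatch call decodes at most batchSize values, pulling
        // parquet row groups lazily instead of loading the whole column.
        stringArray, err := dataset.NextBatch(batchSize)
        if err != nil {
            return InvalidUniqueID, InvalidUniqueID, nil, err
        }
        for i := 0; i < len(stringArray); i++ {
            // unmarshal stringArray[i] into a DeleteLog (see data_codec.go below)
        }
    }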
68 changes: 38 additions & 30 deletions internal/storage/data_codec.go
@@ -1015,43 +1015,51 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
 		return InvalidUniqueID, InvalidUniqueID, nil, err
 	}
 
-	stringArray, err := eventReader.GetStringFromPayload()
+	dataset, err := eventReader.GetByteArrayDataSet()
 	if err != nil {
 		eventReader.Close()
 		binlogReader.Close()
 		return InvalidUniqueID, InvalidUniqueID, nil, err
 	}
-	for i := 0; i < len(stringArray); i++ {
-		deleteLog := &DeleteLog{}
-		if err = json.Unmarshal([]byte(stringArray[i]), deleteLog); err != nil {
-			// compatible with versions that only support int64 type primary keys
-			// compatible with fmt.Sprintf("%d,%d", pk, ts)
-			// compatible error info (unmarshal err invalid character ',' after top-level value)
-			splits := strings.Split(stringArray[i], ",")
-			if len(splits) != 2 {
-				eventReader.Close()
-				binlogReader.Close()
-				return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
-			}
-			pk, err := strconv.ParseInt(splits[0], 10, 64)
-			if err != nil {
-				eventReader.Close()
-				binlogReader.Close()
-				return InvalidUniqueID, InvalidUniqueID, nil, err
-			}
-			deleteLog.Pk = &Int64PrimaryKey{
-				Value: pk,
-			}
-			deleteLog.PkType = int64(schemapb.DataType_Int64)
-			deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
-			if err != nil {
-				eventReader.Close()
-				binlogReader.Close()
-				return InvalidUniqueID, InvalidUniqueID, nil, err
-			}
-		}
-
-		result.Append(deleteLog.Pk, deleteLog.Ts)
-	}
+
+	batchSize := int64(1024)
+	for dataset.HasNext() {
+		stringArray, err := dataset.NextBatch(batchSize)
+		if err != nil {
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		for i := 0; i < len(stringArray); i++ {
+			deleteLog := &DeleteLog{}
+			if err = json.Unmarshal(stringArray[i], deleteLog); err != nil {
+				// compatible with versions that only support int64 type primary keys
+				// compatible with fmt.Sprintf("%d,%d", pk, ts)
+				// compatible error info (unmarshal err invalid character ',' after top-level value)
+				splits := strings.Split(stringArray[i].String(), ",")
+				if len(splits) != 2 {
+					eventReader.Close()
+					binlogReader.Close()
+					return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
+				}
+				pk, err := strconv.ParseInt(splits[0], 10, 64)
+				if err != nil {
+					eventReader.Close()
+					binlogReader.Close()
+					return InvalidUniqueID, InvalidUniqueID, nil, err
+				}
+				deleteLog.Pk = &Int64PrimaryKey{
+					Value: pk,
+				}
+				deleteLog.PkType = int64(schemapb.DataType_Int64)
+				deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
+				if err != nil {
+					eventReader.Close()
+					binlogReader.Close()
+					return InvalidUniqueID, InvalidUniqueID, nil, err
+				}
+			}
+
+			result.Append(deleteLog.Pk, deleteLog.Ts)
+		}
+	}
 	eventReader.Close()
 	binlogReader.Close()
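Note that the fallback branch retained inside the new loop still accepts the legacy delta-log format, where an entry is just fmt.Sprintf("%d,%d", pk, ts) rather than a JSON-encoded DeleteLog. A standalone illustration of that compatibility path (a sketch built from the parsing code above, not code from the diff):

    // A legacy entry carries an int64 primary key and a timestamp: "1,100".
    legacy := "1,100"
    splits := strings.Split(legacy, ",")          // must yield exactly 2 parts
    pk, _ := strconv.ParseInt(splits[0], 10, 64)  // 1
    ts, _ := strconv.ParseUint(splits[1], 10, 64) // 100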
158 changes: 127 additions & 31 deletions internal/storage/data_codec_test.go
@@ -494,42 +494,138 @@ func TestDeleteCodec(t *testing.T) {
 }
 
 func TestUpgradeDeleteLog(t *testing.T) {
-	binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
-	eventWriter, err := binlogWriter.NextDeleteEventWriter()
-	assert.NoError(t, err)
+	t.Run("normal", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
 
-	dData := &DeleteData{
-		Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
-		Tss:      []Timestamp{100, 200},
-		RowCount: 2,
-	}
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
 
-	sizeTotal := 0
-	for i := int64(0); i < dData.RowCount; i++ {
-		int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
-		ts := dData.Tss[i]
-		err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
-		assert.NoError(t, err)
-		sizeTotal += binary.Size(int64PkValue)
-		sizeTotal += binary.Size(ts)
-	}
-	eventWriter.SetEventTimestamp(100, 200)
-	binlogWriter.SetEventTimeStamp(100, 200)
-	binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
+		sizeTotal := 0
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
+			assert.NoError(t, err)
+			sizeTotal += binary.Size(int64PkValue)
+			sizeTotal += binary.Size(ts)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
 
-	err = binlogWriter.Finish()
-	assert.NoError(t, err)
-	buffer, err := binlogWriter.GetBuffer()
-	assert.NoError(t, err)
-	blob := &Blob{Value: buffer}
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
 
-	dCodec := NewDeleteCodec()
-	parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
-	assert.NoError(t, err)
-	assert.Equal(t, int64(1), parID)
-	assert.Equal(t, int64(1), segID)
-	assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
-	assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
+		dCodec := NewDeleteCodec()
+		parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
+		assert.NoError(t, err)
+		assert.Equal(t, int64(1), parID)
+		assert.Equal(t, int64(1), segID)
+		assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
+		assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
+	})
+
+	t.Run("with split lenth error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d,?", int64PkValue, ts))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
+
+	t.Run("with parse int error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("abc,%d", ts))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
+
+	t.Run("with parse ts uint error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,abc", int64PkValue))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
 }
 
 func TestDDCodec(t *testing.T) {
6 changes: 6 additions & 0 deletions internal/storage/payload.go
@@ -17,6 +17,9 @@
 package storage
 
 import (
+	"github.com/apache/arrow/go/v12/parquet"
+	"github.com/apache/arrow/go/v12/parquet/file"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 )

@@ -64,6 +67,9 @@ type PayloadReaderInterface interface {
 	GetBFloat16VectorFromPayload() ([]byte, int, error)
 	GetFloatVectorFromPayload() ([]float32, int, error)
 	GetPayloadLengthFromReader() (int, error)
+
+	GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)
+
 	ReleasePayloadReader() error
 	Close() error
 }
74 changes: 74 additions & 0 deletions internal/storage/payload_reader.go
@@ -255,6 +255,14 @@ func (r *PayloadReader) GetStringFromPayload() ([]string, error) {
 	})
 }
 
+func (r *PayloadReader) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error) {
+	if r.colType != schemapb.DataType_String && r.colType != schemapb.DataType_VarChar {
+		return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
+	}
+
+	return NewDataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, 0, r.numRows), nil
+}
+
 func (r *PayloadReader) GetArrayFromPayload() ([]*schemapb.ScalarField, error) {
 	if r.colType != schemapb.DataType_Array {
 		return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
@@ -445,3 +453,69 @@ func ReadDataFromAllRowGroups[T any, E interface {
 
 	return offset, nil
 }
+
+type DataSet[T any, E interface {
+	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
+}] struct {
+	reader  *file.Reader
+	cReader E
+
+	cnt, numRows       int64
+	groupID, columnIdx int
+}
+
+func NewDataSet[T any, E interface {
+	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
+}](reader *file.Reader, columnIdx int, numRows int64) *DataSet[T, E] {
+	return &DataSet[T, E]{
+		reader:    reader,
+		columnIdx: columnIdx,
+		numRows:   numRows,
+	}
+}
+
+func (s *DataSet[T, E]) nextGroup() error {
+	s.cnt = 0
+	column, err := s.reader.RowGroup(s.groupID).Column(s.columnIdx)
+	if err != nil {
+		return err
+	}
+
+	cReader, ok := column.(E)
+	if !ok {
+		return fmt.Errorf("expect type %T, but got %T", *new(E), column)
+	}
+	s.groupID++
+	s.cReader = cReader
+	return nil
+}
+
+func (s *DataSet[T, E]) HasNext() bool {
+	if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
+		return false
+	}
+	return true
+}
+
+func (s *DataSet[T, E]) NextBatch(batch int64) ([]T, error) {
+	if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
+		return nil, fmt.Errorf("has no more data")
+	}
+
+	if s.groupID == 0 || s.cnt >= s.numRows {
+		err := s.nextGroup()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	batch = Min(batch, s.numRows-s.cnt)
+	result := make([]T, batch)
+	_, _, err := s.cReader.ReadBatch(batch, result, nil, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	s.cnt += batch
+	return result, nil
+}
