diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 1544a37efa49d..f4918d072d5be 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -1015,43 +1015,51 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
 			return InvalidUniqueID, InvalidUniqueID, nil, err
 		}

-		stringArray, err := eventReader.GetStringFromPayload()
+		dataset, err := eventReader.GetByteArrayDataSet()
 		if err != nil {
 			eventReader.Close()
 			binlogReader.Close()
 			return InvalidUniqueID, InvalidUniqueID, nil, err
 		}
-		for i := 0; i < len(stringArray); i++ {
-			deleteLog := &DeleteLog{}
-			if err = json.Unmarshal([]byte(stringArray[i]), deleteLog); err != nil {
-				// compatible with versions that only support int64 type primary keys
-				// compatible with fmt.Sprintf("%d,%d", pk, ts)
-				// compatible error info (unmarshal err invalid character ',' after top-level value)
-				splits := strings.Split(stringArray[i], ",")
-				if len(splits) != 2 {
-					eventReader.Close()
-					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
-				}
-				pk, err := strconv.ParseInt(splits[0], 10, 64)
-				if err != nil {
-					eventReader.Close()
-					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, nil, err
-				}
-				deleteLog.Pk = &Int64PrimaryKey{
-					Value: pk,
-				}
-				deleteLog.PkType = int64(schemapb.DataType_Int64)
-				deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
-				if err != nil {
-					eventReader.Close()
-					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, nil, err
-				}
-			}
+
+		batchSize := int64(1024)
+		for dataset.HasNext() {
+			stringArray, err := dataset.NextBatch(batchSize)
+			if err != nil {
+				return InvalidUniqueID, InvalidUniqueID, nil, err
+			}
+			for i := 0; i < len(stringArray); i++ {
+				deleteLog := &DeleteLog{}
+				if err = json.Unmarshal(stringArray[i], deleteLog); err != nil {
+					// compatible with versions that only support int64 type primary keys
+					// compatible with fmt.Sprintf("%d,%d", pk, ts)
+					// compatible error info (unmarshal err invalid character ',' after top-level value)
+					splits := strings.Split(stringArray[i].String(), ",")
+					if len(splits) != 2 {
+						eventReader.Close()
+						binlogReader.Close()
+						return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the format of delta log is incorrect, %v can not be split", stringArray[i])
+					}
+					pk, err := strconv.ParseInt(splits[0], 10, 64)
+					if err != nil {
+						eventReader.Close()
+						binlogReader.Close()
+						return InvalidUniqueID, InvalidUniqueID, nil, err
+					}
+					deleteLog.Pk = &Int64PrimaryKey{
+						Value: pk,
+					}
+					deleteLog.PkType = int64(schemapb.DataType_Int64)
+					deleteLog.Ts, err = strconv.ParseUint(splits[1], 10, 64)
+					if err != nil {
+						eventReader.Close()
+						binlogReader.Close()
+						return InvalidUniqueID, InvalidUniqueID, nil, err
+					}
+				}

-			result.Append(deleteLog.Pk, deleteLog.Ts)
+				result.Append(deleteLog.Pk, deleteLog.Ts)
+			}
 		}
 		eventReader.Close()
 		binlogReader.Close()
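Reviewer note: the hunk above replaces the one-shot GetStringFromPayload call with batched reads over the new dataset API. A minimal consumption sketch (in package storage) is shown below; readDeltaRows and its [][]byte result are illustrative only, while GetByteArrayDataSet, HasNext, NextBatch and the 1024 batch size come from the change itself.

func readDeltaRows(r PayloadReaderInterface) ([][]byte, error) {
	// GetByteArrayDataSet exposes the string column as a lazily read DataSet
	// instead of materializing every row up front.
	dataset, err := r.GetByteArrayDataSet()
	if err != nil {
		return nil, err
	}

	var rows [][]byte
	batchSize := int64(1024)
	for dataset.HasNext() {
		// NextBatch returns at most batchSize values; the final batch is capped
		// at the number of rows still unread.
		batch, err := dataset.NextBatch(batchSize)
		if err != nil {
			return nil, err
		}
		for _, row := range batch {
			rows = append(rows, row) // parquet.ByteArray has underlying type []byte
		}
	}
	return rows, nil
}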
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index d9e7114ba3e09..8c6468f994627 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -494,42 +494,138 @@ func TestDeleteCodec(t *testing.T) {
 }

 func TestUpgradeDeleteLog(t *testing.T) {
-	binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
-	eventWriter, err := binlogWriter.NextDeleteEventWriter()
-	assert.NoError(t, err)
+	t.Run("normal", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)

-	dData := &DeleteData{
-		Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
-		Tss:      []Timestamp{100, 200},
-		RowCount: 2,
-	}
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		sizeTotal := 0
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
+			assert.NoError(t, err)
+			sizeTotal += binary.Size(int64PkValue)
+			sizeTotal += binary.Size(ts)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))

-	sizeTotal := 0
-	for i := int64(0); i < dData.RowCount; i++ {
-		int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
-		ts := dData.Tss[i]
-		err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d", int64PkValue, ts))
+		err = binlogWriter.Finish()
 		assert.NoError(t, err)
-		sizeTotal += binary.Size(int64PkValue)
-		sizeTotal += binary.Size(ts)
-	}
-	eventWriter.SetEventTimestamp(100, 200)
-	binlogWriter.SetEventTimeStamp(100, 200)
-	binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}

-	err = binlogWriter.Finish()
-	assert.NoError(t, err)
-	buffer, err := binlogWriter.GetBuffer()
-	assert.NoError(t, err)
-	blob := &Blob{Value: buffer}
+		dCodec := NewDeleteCodec()
+		parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
+		assert.NoError(t, err)
+		assert.Equal(t, int64(1), parID)
+		assert.Equal(t, int64(1), segID)
+		assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
+		assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
+	})

-	dCodec := NewDeleteCodec()
-	parID, segID, deleteData, err := dCodec.Deserialize([]*Blob{blob})
-	assert.NoError(t, err)
-	assert.Equal(t, int64(1), parID)
-	assert.Equal(t, int64(1), segID)
-	assert.ElementsMatch(t, dData.Pks, deleteData.Pks)
-	assert.ElementsMatch(t, dData.Tss, deleteData.Tss)
+	t.Run("with split length error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,%d,?", int64PkValue, ts))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
+
+	t.Run("with parse int error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			ts := dData.Tss[i]
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("abc,%d", ts))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
+
+	t.Run("with parse ts uint error", func(t *testing.T) {
+		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
+		eventWriter, err := binlogWriter.NextDeleteEventWriter()
+		assert.NoError(t, err)
+
+		dData := &DeleteData{
+			Pks:      []PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
+			Tss:      []Timestamp{100, 200},
+			RowCount: 2,
+		}
+
+		for i := int64(0); i < dData.RowCount; i++ {
+			int64PkValue := dData.Pks[i].(*Int64PrimaryKey).Value
+			err = eventWriter.AddOneStringToPayload(fmt.Sprintf("%d,abc", int64PkValue))
+			assert.NoError(t, err)
+		}
+		eventWriter.SetEventTimestamp(100, 200)
+		binlogWriter.SetEventTimeStamp(100, 200)
+		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
+
+		err = binlogWriter.Finish()
+		assert.NoError(t, err)
+		buffer, err := binlogWriter.GetBuffer()
+		assert.NoError(t, err)
+		blob := &Blob{Value: buffer}
+
+		dCodec := NewDeleteCodec()
+		_, _, _, err = dCodec.Deserialize([]*Blob{blob})
+		assert.Error(t, err)
+	})
 }

 func TestDDCodec(t *testing.T) {
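Reviewer note: the three error subtests added above repeat the same writer boilerplate. A possible table-driven consolidation is sketched below; it is not part of the patch, and the inline writeLegacyDeltaLog closure is a hypothetical helper assembled only from calls that already appear in the tests above.

func TestUpgradeDeleteLog_BadLegacyRows(t *testing.T) {
	// Writes a single legacy-format ("pk,ts") delta row and returns its blob.
	writeLegacyDeltaLog := func(t *testing.T, row string) *Blob {
		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
		eventWriter, err := binlogWriter.NextDeleteEventWriter()
		assert.NoError(t, err)
		assert.NoError(t, eventWriter.AddOneStringToPayload(row))
		eventWriter.SetEventTimestamp(100, 200)
		binlogWriter.SetEventTimeStamp(100, 200)
		binlogWriter.AddExtra(originalSizeKey, fmt.Sprintf("%v", 0))
		assert.NoError(t, binlogWriter.Finish())
		buffer, err := binlogWriter.GetBuffer()
		assert.NoError(t, err)
		return &Blob{Value: buffer}
	}

	cases := []struct {
		name string
		row  string
	}{
		{"split length", "1,100,?"},
		{"parse pk int", "abc,100"},
		{"parse ts uint", "1,abc"},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			blob := writeLegacyDeltaLog(t, c.row)
			_, _, _, err := NewDeleteCodec().Deserialize([]*Blob{blob})
			assert.Error(t, err)
		})
	}
}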
diff --git a/internal/storage/payload.go b/internal/storage/payload.go
index 07fbc952a8892..d4fc137cbd4c4 100644
--- a/internal/storage/payload.go
+++ b/internal/storage/payload.go
@@ -17,6 +17,9 @@ package storage

 import (
+	"github.com/apache/arrow/go/v12/parquet"
+	"github.com/apache/arrow/go/v12/parquet/file"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 )

@@ -64,6 +67,9 @@ type PayloadReaderInterface interface {
 	GetBFloat16VectorFromPayload() ([]byte, int, error)
 	GetFloatVectorFromPayload() ([]float32, int, error)
 	GetPayloadLengthFromReader() (int, error)
+
+	GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)
+
 	ReleasePayloadReader() error
 	Close() error
 }
diff --git a/internal/storage/payload_reader.go b/internal/storage/payload_reader.go
index 2b935cbe32636..6f6f185d40ba1 100644
--- a/internal/storage/payload_reader.go
+++ b/internal/storage/payload_reader.go
@@ -255,6 +255,14 @@ func (r *PayloadReader) GetStringFromPayload() ([]string, error) {
 	})
 }

+func (r *PayloadReader) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error) {
+	if r.colType != schemapb.DataType_String && r.colType != schemapb.DataType_VarChar {
+		return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
+	}
+
+	return NewDataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, 0, r.numRows), nil
+}
+
 func (r *PayloadReader) GetArrayFromPayload() ([]*schemapb.ScalarField, error) {
 	if r.colType != schemapb.DataType_Array {
 		return nil, fmt.Errorf("failed to get string from datatype %v", r.colType.String())
@@ -445,3 +453,69 @@ func ReadDataFromAllRowGroups[T any, E interface {

 	return offset, nil
 }
+
+type DataSet[T any, E interface {
+	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
+}] struct {
+	reader  *file.Reader
+	cReader E
+
+	cnt, numRows       int64
+	groupID, columnIdx int
+}
+
+func NewDataSet[T any, E interface {
+	ReadBatch(int64, []T, []int16, []int16) (int64, int, error)
+}](reader *file.Reader, columnIdx int, numRows int64) *DataSet[T, E] {
+	return &DataSet[T, E]{
+		reader:    reader,
+		columnIdx: columnIdx,
+		numRows:   numRows,
+	}
+}
+
+func (s *DataSet[T, E]) nextGroup() error {
+	s.cnt = 0
+	column, err := s.reader.RowGroup(s.groupID).Column(s.columnIdx)
+	if err != nil {
+		return err
+	}
+
+	cReader, ok := column.(E)
+	if !ok {
+		return fmt.Errorf("expect type %T, but got %T", *new(E), column)
+	}
+	s.groupID++
+	s.cReader = cReader
+	return nil
+}
+
+func (s *DataSet[T, E]) HasNext() bool {
+	if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
+		return false
+	}
+	return true
+}
+
+func (s *DataSet[T, E]) NextBatch(batch int64) ([]T, error) {
+	if s.groupID > s.reader.NumRowGroups() || (s.groupID == s.reader.NumRowGroups() && s.cnt >= s.numRows) || s.numRows == 0 {
+		return nil, fmt.Errorf("has no more data")
+	}
+
+	if s.groupID == 0 || s.cnt >= s.numRows {
+		err := s.nextGroup()
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	batch = Min(batch, s.numRows-s.cnt)
+	result := make([]T, batch)
+	_, _, err := s.cReader.ReadBatch(batch, result, nil, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	s.cnt += batch
+	return result, nil
+}
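Reviewer note: because DataSet is generic over the parquet column-chunk reader, the same type could in principle back other column types as well. A hedged sketch for an int64 column follows; readInt64Column is illustrative and not part of this change, and file.Int64ColumnChunkReader is assumed to be the arrow-go reader whose ReadBatch signature satisfies the constraint.

func readInt64Column(reader *file.Reader, numRows int64) ([]int64, error) {
	// Column index 0 is assumed here; NewDataSet takes the index explicitly.
	ds := NewDataSet[int64, *file.Int64ColumnChunkReader](reader, 0, numRows)

	out := make([]int64, 0, numRows)
	for ds.HasNext() {
		batch, err := ds.NextBatch(1024) // same batch size the delete-log path uses
		if err != nil {
			return nil, err
		}
		out = append(out, batch...)
	}
	return out, nil
}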
diff --git a/internal/storage/payload_test.go b/internal/storage/payload_test.go
index bbf370dfefb7f..b49058545f4a5 100644
--- a/internal/storage/payload_test.go
+++ b/internal/storage/payload_test.go
@@ -17,6 +17,7 @@ package storage

 import (
+	"math"
 	"testing"

 	"github.com/stretchr/testify/assert"
@@ -1328,6 +1329,40 @@ func TestPayload_ReaderAndWriter(t *testing.T) {
 		assert.Error(t, err)
 	})

+	t.Run("TestByteArrayDatasetError", func(t *testing.T) {
+		w, err := NewPayloadWriter(schemapb.DataType_String)
+		require.Nil(t, err)
+		require.NotNil(t, w)
+
+		err = w.AddOneStringToPayload("hello0")
+		assert.NoError(t, err)
+
+		err = w.FinishPayloadWriter()
+		assert.NoError(t, err)
+
+		buffer, err := w.GetPayloadBufferFromWriter()
+		assert.NoError(t, err)
+
+		r, err := NewPayloadReader(schemapb.DataType_FloatVector, buffer)
+		assert.NoError(t, err)
+
+		r.colType = 99
+		_, err = r.GetByteArrayDataSet()
+		assert.Error(t, err)
+
+		r.colType = schemapb.DataType_String
+		dataset, err := r.GetByteArrayDataSet()
+		assert.NoError(t, err)
+
+		dataset.columnIdx = math.MaxInt
+		_, err = dataset.NextBatch(100)
+		assert.Error(t, err)
+
+		dataset.groupID = math.MaxInt
+		_, err = dataset.NextBatch(100)
+		assert.Error(t, err)
+	})
+
 	t.Run("TestWriteLargeSizeData", func(t *testing.T) {
 		t.Skip("Large data skip for online ut")
 		size := 1 << 29 // 512M
diff --git a/internal/storage/utils.go b/internal/storage/utils.go
index 7c9171e3aa99f..0000642ff5d46 100644
--- a/internal/storage/utils.go
+++ b/internal/storage/utils.go
@@ -1211,3 +1211,10 @@ func TransferInsertMsgToInsertRecord(schema *schemapb.CollectionSchema, msg *msg

 	return insertRecord, nil
 }
+
+func Min(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
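Reviewer note: end to end, the pieces above compose as in the sketch below. It assumes package storage and reuses only APIs shown in the diffs (exampleByteArrayDataSet itself is illustrative); the Min helper from utils.go is what caps the final NextBatch call at the rows that remain.

func exampleByteArrayDataSet() ([]string, error) {
	// Write three rows with the existing string payload writer.
	w, err := NewPayloadWriter(schemapb.DataType_String)
	if err != nil {
		return nil, err
	}
	for _, s := range []string{"a", "b", "c"} {
		if err := w.AddOneStringToPayload(s); err != nil {
			return nil, err
		}
	}
	if err := w.FinishPayloadWriter(); err != nil {
		return nil, err
	}
	buffer, err := w.GetPayloadBufferFromWriter()
	if err != nil {
		return nil, err
	}

	// Read the rows back in batches of two: the first NextBatch returns two
	// values, the second returns the single remaining one (Min(2, 3-2)).
	r, err := NewPayloadReader(schemapb.DataType_String, buffer)
	if err != nil {
		return nil, err
	}
	defer r.Close()

	dataset, err := r.GetByteArrayDataSet()
	if err != nil {
		return nil, err
	}
	got := make([]string, 0, 3)
	for dataset.HasNext() {
		batch, err := dataset.NextBatch(2)
		if err != nil {
			return nil, err
		}
		for _, v := range batch {
			got = append(got, v.String()) // parquet.ByteArray -> Go string
		}
	}
	return got, nil
}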