From dc89c6f8105f7fe120a0a3d2d03897984b1f8c7e Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Fri, 10 May 2024 15:27:31 +0800 Subject: [PATCH] enhance: remove duplicated data generation APIs for bulk insert test (#32889) Issue: #22837 including following changes: 1. Add API CreateInsertData() and BuildArrayData() in internal/util/testutil 2. Remove duplicated test APIs from importutilv2 unittest and bulk insert integration test Signed-off-by: Cai Yudong --- .../util/importutilv2/binlog/reader_test.go | 176 +---- .../util/importutilv2/json/reader_test.go | 207 ++---- .../util/importutilv2/numpy/reader_test.go | 141 +--- .../util/importutilv2/parquet/reader_test.go | 441 ++----------- internal/util/testutil/test_util.go | 495 +++++++++++++++ tests/integration/import/util_test.go | 600 ++++-------------- 6 files changed, 730 insertions(+), 1330 deletions(-) diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go index 63e8dd42d6e9c..73ab65dde208d 100644 --- a/internal/util/importutilv2/binlog/reader_test.go +++ b/internal/util/importutilv2/binlog/reader_test.go @@ -18,11 +18,8 @@ package binlog import ( "context" - rand2 "crypto/rand" "fmt" "math" - "math/rand" - "strconv" "testing" "time" @@ -36,9 +33,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -179,124 +176,7 @@ func createDeltaBuf(t *testing.T, deletePKs []storage.PrimaryKey, deleteTss []in return blob.Value } -func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { - insertData, err := storage.NewInsertData(schema) - assert.NoError(t, err) - for _, field := range schema.GetFields() { - switch field.GetDataType() { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - case schemapb.DataType_BinaryVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - binVecData := make([]byte, 0) - total := rowCount * int(dim) / 8 - for i := 0; i < total; i++ { - binVecData = append(binVecData, byte(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} - case schemapb.DataType_FloatVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - floatVecData := make([]float32, 0) - total := rowCount * int(dim) - for i := 0; i < total; i++ { - floatVecData = append(floatVecData, rand.Float32()) - } - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} - case schemapb.DataType_Float16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - float16VecData := make([]byte, total) - _, err = rand2.Read(float16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} - case schemapb.DataType_BFloat16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - bfloat16VecData := make([]byte, total) - _, err = rand2.Read(bfloat16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} - case schemapb.DataType_SparseFloatVector: - sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) - insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: *sparseFloatVecData, - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - default: - panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) - } - } - return insertData -} - -func (suite *ReaderSuite) run(dt schemapb.DataType) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { const ( insertPrefix = "mock-insert-binlog-prefix" deltaPrefix = "mock-delta-binlog-prefix" @@ -344,16 +224,18 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { }, }, { - FieldID: 102, - Name: dt.String(), - DataType: dt, + FieldID: 102, + Name: dataType.String(), + DataType: dataType, + ElementType: elemType, }, }, } cm := mocks.NewChunkManager(suite.T()) schema = typeutil.AppendSystemFields(schema) - originalInsertData := createInsertData(suite.T(), schema, suite.numRows) + originalInsertData, err := testutil.CreateInsertData(schema, suite.numRows) + suite.NoError(err) insertLogs := lo.Flatten(lo.Values(insertBinlogs)) cm.EXPECT().WalkWithPrefix(mock.Anything, insertPrefix, mock.Anything, mock.Anything).RunAndReturn( @@ -435,16 +317,24 @@ OUTER: } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool) - suite.run(schemapb.DataType_Int8) - suite.run(schemapb.DataType_Int16) - suite.run(schemapb.DataType_Int32) - suite.run(schemapb.DataType_Int64) - suite.run(schemapb.DataType_Float) - suite.run(schemapb.DataType_Double) - suite.run(schemapb.DataType_VarChar) - suite.run(schemapb.DataType_Array) - suite.run(schemapb.DataType_JSON) + suite.run(schemapb.DataType_Bool, schemapb.DataType_None) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None) + suite.run(schemapb.DataType_Float, schemapb.DataType_None) + suite.run(schemapb.DataType_Double, schemapb.DataType_None) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double) + suite.run(schemapb.DataType_Array, schemapb.DataType_String) } func (suite *ReaderSuite) TestWithTSRangeAndDelete() { @@ -460,7 +350,7 @@ func (suite *ReaderSuite) TestWithTSRangeAndDelete() { suite.deleteTss = []int64{ 8, 8, 1, 8, } - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) } func (suite *ReaderSuite) TestStringPK() { @@ -477,20 +367,20 @@ func (suite *ReaderSuite) TestStringPK() { suite.deleteTss = []int64{ 8, 8, 1, 8, } - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) } func (suite *ReaderSuite) TestVector() { suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index 494f6eb5aafff..b30de4e5b592a 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -18,18 +18,13 @@ package json import ( "context" - rand2 "crypto/rand" "encoding/json" - "fmt" "io" "math" - "math/rand" - "strconv" "strings" "testing" "github.com/samber/lo" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" @@ -38,9 +33,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -63,136 +58,7 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { - insertData, err := storage.NewInsertData(schema) - assert.NoError(t, err) - for _, field := range schema.GetFields() { - switch field.GetDataType() { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - case schemapb.DataType_BinaryVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - binVecData := make([]byte, 0) - total := rowCount * int(dim) / 8 - for i := 0; i < total; i++ { - binVecData = append(binVecData, byte(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} - case schemapb.DataType_FloatVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - floatVecData := make([]float32, 0) - total := rowCount * int(dim) - for i := 0; i < total; i++ { - floatVecData = append(floatVecData, rand.Float32()) - } - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} - case schemapb.DataType_Float16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - float16VecData := make([]byte, total) - _, err = rand2.Read(float16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} - case schemapb.DataType_BFloat16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - bfloat16VecData := make([]byte, total) - _, err = rand2.Read(bfloat16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} - case schemapb.DataType_SparseFloatVector: - sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) - insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: *sparseFloatVecData, - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - if i%4 == 0 { - v, _ := json.Marshal("{\"a\": \"%s\", \"b\": %d}") - jsonData = append(jsonData, v) - } else if i%4 == 1 { - v, _ := json.Marshal(i) - jsonData = append(jsonData, v) - } else if i%4 == 2 { - v, _ := json.Marshal(float32(i) * 0.1) - jsonData = append(jsonData, v) - } else if i%4 == 3 { - v, _ := json.Marshal(strconv.Itoa(i)) - jsonData = append(jsonData, v) - } - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - default: - panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) - } - } - return insertData -} - -func (suite *ReaderSuite) run(dt schemapb.DataType) { +func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -220,9 +86,9 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { }, { FieldID: 102, - Name: dt.String(), - DataType: dt, - ElementType: schemapb.DataType_Int32, + Name: dataType.String(), + DataType: dataType, + ElementType: elemType, TypeParams: []*commonpb.KeyValuePair{ { Key: common.MaxLengthKey, @@ -232,7 +98,8 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { }, }, } - insertData := createInsertData(suite.T(), schema, suite.numRows) + insertData, err := testutil.CreateInsertData(schema, suite.numRows) + suite.NoError(err) rows := make([]map[string]any, 0, suite.numRows) fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() @@ -240,10 +107,25 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { for i := 0; i < insertData.GetRowNum(); i++ { data := make(map[int64]interface{}) for fieldID, v := range insertData.Data { - dataType := fieldIDToField[fieldID].GetDataType() + field := fieldIDToField[fieldID] + dataType := field.GetDataType() + elemType := field.GetElementType() switch dataType { case schemapb.DataType_Array: - data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() + switch elemType { + case schemapb.DataType_Bool: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetBoolData().GetData() + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetIntData().GetData() + case schemapb.DataType_Int64: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetLongData().GetData() + case schemapb.DataType_Float: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetFloatData().GetData() + case schemapb.DataType_Double: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetDoubleData().GetData() + case schemapb.DataType_String: + data[fieldID] = v.GetRow(i).(*schemapb.ScalarField).GetStringData().GetData() + } case schemapb.DataType_JSON: data[fieldID] = string(v.GetRow(i).([]byte)) case schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: @@ -302,34 +184,43 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { } func (suite *ReaderSuite) TestReadScalarFields() { - suite.run(schemapb.DataType_Bool) - suite.run(schemapb.DataType_Int8) - suite.run(schemapb.DataType_Int16) - suite.run(schemapb.DataType_Int32) - suite.run(schemapb.DataType_Int64) - suite.run(schemapb.DataType_Float) - suite.run(schemapb.DataType_Double) - suite.run(schemapb.DataType_VarChar) - suite.run(schemapb.DataType_Array) - suite.run(schemapb.DataType_JSON) + suite.run(schemapb.DataType_Bool, schemapb.DataType_None) + suite.run(schemapb.DataType_Int8, schemapb.DataType_None) + suite.run(schemapb.DataType_Int16, schemapb.DataType_None) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) + suite.run(schemapb.DataType_Int64, schemapb.DataType_None) + suite.run(schemapb.DataType_Float, schemapb.DataType_None) + suite.run(schemapb.DataType_Double, schemapb.DataType_None) + suite.run(schemapb.DataType_String, schemapb.DataType_None) + suite.run(schemapb.DataType_VarChar, schemapb.DataType_None) + suite.run(schemapb.DataType_JSON, schemapb.DataType_None) + + suite.run(schemapb.DataType_Array, schemapb.DataType_Bool) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int8) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int16) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int32) + suite.run(schemapb.DataType_Array, schemapb.DataType_Int64) + suite.run(schemapb.DataType_Array, schemapb.DataType_Float) + suite.run(schemapb.DataType_Array, schemapb.DataType_Double) + suite.run(schemapb.DataType_Array, schemapb.DataType_String) } func (suite *ReaderSuite) TestStringPK() { suite.pkDataType = schemapb.DataType_VarChar - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) } func (suite *ReaderSuite) TestVector() { suite.vecDataType = schemapb.DataType_BinaryVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_FloatVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_Float16Vector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_BFloat16Vector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) suite.vecDataType = schemapb.DataType_SparseFloatVector - suite.run(schemapb.DataType_Int32) + suite.run(schemapb.DataType_Int32, schemapb.DataType_None) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index ad9b12d18b547..f94abb6b1a9ef 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -19,13 +19,9 @@ package numpy import ( "bytes" "context" - rand2 "crypto/rand" - "encoding/json" "fmt" "io" "math" - "math/rand" - "strconv" "strings" "testing" @@ -40,9 +36,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -65,135 +61,6 @@ func (suite *ReaderSuite) SetupTest() { suite.vecDataType = schemapb.DataType_FloatVector } -func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { - insertData, err := storage.NewInsertData(schema) - assert.NoError(t, err) - for _, field := range schema.GetFields() { - switch field.GetDataType() { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - case schemapb.DataType_BinaryVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - binVecData := make([]byte, 0) - total := rowCount * int(dim) / 8 - for i := 0; i < total; i++ { - binVecData = append(binVecData, byte(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} - case schemapb.DataType_FloatVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - floatVecData := make([]float32, 0) - total := rowCount * int(dim) - for i := 0; i < total; i++ { - floatVecData = append(floatVecData, rand.Float32()) - } - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} - case schemapb.DataType_Float16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - float16VecData := make([]byte, total) - _, err = rand2.Read(float16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} - case schemapb.DataType_BFloat16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - bfloat16VecData := make([]byte, total) - _, err = rand2.Read(bfloat16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} - case schemapb.DataType_SparseFloatVector: - sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) - insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: *sparseFloatVecData, - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - if i%4 == 0 { - v, _ := json.Marshal("{\"a\": \"%s\", \"b\": %d}") - jsonData = append(jsonData, v) - } else if i%4 == 1 { - v, _ := json.Marshal(i) - jsonData = append(jsonData, v) - } else if i%4 == 2 { - v, _ := json.Marshal(float32(i) * 0.1) - jsonData = append(jsonData, v) - } else if i%4 == 3 { - v, _ := json.Marshal(strconv.Itoa(i)) - jsonData = append(jsonData, v) - } - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - default: - panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) - } - } - return insertData -} - func CreateReader(data interface{}) (io.Reader, error) { buf := new(bytes.Buffer) err := npyio.Write(buf, data) @@ -244,7 +111,8 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { }, }, } - insertData := createInsertData(suite.T(), schema, suite.numRows) + insertData, err := testutil.CreateInsertData(schema, suite.numRows) + suite.NoError(err) fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) @@ -383,7 +251,8 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { }, }, } - insertData := createInsertData(suite.T(), schema, suite.numRows) + insertData, err := testutil.CreateInsertData(schema, suite.numRows) + suite.NoError(err) fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 5d21061463b1e..cfcfd8d0a7cf5 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -18,19 +18,15 @@ package parquet import ( "context" - "encoding/json" "fmt" "io" "math/rand" "os" "testing" - "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/pqarrow" - "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" @@ -38,9 +34,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -72,383 +68,6 @@ func randomString(length int) string { return string(b) } -func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array, *storage.InsertData, error) { - mem := memory.NewGoAllocator() - columns := make([]arrow.Array, 0, len(schema.Fields)) - insertData, err := storage.NewInsertData(schema) - if err != nil { - return nil, nil, err - } - for _, field := range schema.Fields { - dim := 1 - if typeutil.IsVectorType(field.GetDataType()) && !typeutil.IsSparseFloatVectorType(field.GetDataType()) { - dim2, err := typeutil.GetDim(field) - if err != nil { - return nil, nil, err - } - dim = int(dim2) - } - dataType := field.GetDataType() - elementType := field.GetElementType() - isBinary := field.GetName() == "FieldBinaryVector2" - switch dataType { - case schemapb.DataType_Bool: - builder := array.NewBooleanBuilder(mem) - boolData := make([]bool, 0) - for i := 0; i < rows; i++ { - boolData = append(boolData, i%2 == 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - builder.AppendValues(boolData, nil) - columns = append(columns, builder.NewBooleanArray()) - case schemapb.DataType_Int8: - builder := array.NewInt8Builder(mem) - int8Data := make([]int8, 0) - for i := 0; i < rows; i++ { - int8Data = append(int8Data, int8(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - builder.AppendValues(int8Data, nil) - columns = append(columns, builder.NewInt8Array()) - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - builder := array.NewInt16Builder(mem) - for i := 0; i < rows; i++ { - int16Data = append(int16Data, int16(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - builder.AppendValues(int16Data, nil) - columns = append(columns, builder.NewInt16Array()) - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - builder := array.NewInt32Builder(mem) - for i := 0; i < rows; i++ { - int32Data = append(int32Data, int32(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - builder.AppendValues(int32Data, nil) - columns = append(columns, builder.NewInt32Array()) - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - builder := array.NewInt64Builder(mem) - for i := 0; i < rows; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - builder.AppendValues(int64Data, nil) - columns = append(columns, builder.NewInt64Array()) - case schemapb.DataType_Float: - floatData := make([]float32, 0) - builder := array.NewFloat32Builder(mem) - for i := 0; i < rows; i++ { - floatData = append(floatData, float32(i)*0.1) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - builder.AppendValues(floatData, nil) - columns = append(columns, builder.NewFloat32Array()) - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - builder := array.NewFloat64Builder(mem) - for i := 0; i < rows; i++ { - doubleData = append(doubleData, float64(i)*0.02) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - builder.AppendValues(doubleData, nil) - columns = append(columns, builder.NewFloat64Array()) - case schemapb.DataType_VarChar, schemapb.DataType_String: - varcharData := make([]string, 0) - builder := array.NewStringBuilder(mem) - for i := 0; i < rows; i++ { - varcharData = append(varcharData, randomString(10)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - builder.AppendValues(varcharData, nil) - columns = append(columns, builder.NewStringArray()) - case schemapb.DataType_FloatVector: - floatVecData := make([]float32, 0) - builder := array.NewListBuilder(mem, &arrow.Float32Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < rows; i++ { - for j := 0; j < dim; j++ { - floatVecData = append(floatVecData, float32(i*dim+j)) - } - offsets = append(offsets, int32(i*dim)) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil) - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: dim} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Float16Vector: - float16VecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - rowBytes := dim * 2 - for i := 0; i < rows; i++ { - for j := 0; j < rowBytes; j++ { - float16VecData = append(float16VecData, uint8((i+j)%256)) - } - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(float16VecData, nil) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: dim} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_BFloat16Vector: - bfloat16VecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - rowBytes := dim * 2 - for i := 0; i < rows; i++ { - for j := 0; j < rowBytes; j++ { - bfloat16VecData = append(bfloat16VecData, uint8((i+j)%256)) - } - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(bfloat16VecData, nil) - insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: dim} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_SparseFloatVector: - sparsefloatVecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - vecData := testutils.GenerateSparseFloatVectors(rows) - currOffset := int32(0) - for i := 0; i < rows; i++ { - rowVecData := vecData.GetContents()[i] - sparsefloatVecData = append(sparsefloatVecData, rowVecData...) - offsets = append(offsets, currOffset) - currOffset = currOffset + int32(len(rowVecData)) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil) - insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: *vecData, - } - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_BinaryVector: - if isBinary { - binVecData := make([][]byte, 0) - builder := array.NewBinaryBuilder(mem, &arrow.BinaryType{}) - for i := 0; i < rows; i++ { - element := make([]byte, dim/8) - for j := 0; j < dim/8; j++ { - element[j] = randomString(1)[0] - } - binVecData = append(binVecData, element) - } - builder.AppendValues(binVecData, nil) - columns = append(columns, builder.NewBinaryArray()) - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: lo.Flatten(binVecData), Dim: dim} - } else { - binVecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0) - rowBytes := dim / 8 - for i := 0; i < rows; i++ { - for j := 0; j < rowBytes; j++ { - binVecData = append(binVecData, uint8((i+j)%256)) - } - offsets = append(offsets, int32(i*rowBytes)) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil) - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: dim} - } - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - builder := array.NewStringBuilder(mem) - for i := 0; i < rows; i++ { - if i%4 == 0 { - v, _ := json.Marshal(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i)) - jsonData = append(jsonData, v) - } else if i%4 == 1 { - v, _ := json.Marshal(i) - jsonData = append(jsonData, v) - } else if i%4 == 2 { - v, _ := json.Marshal(float32(i) * 0.1) - jsonData = append(jsonData, v) - } else if i%4 == 3 { - v, _ := json.Marshal(randomString(10)) - jsonData = append(jsonData, v) - } - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - builder.AppendValues(lo.Map(jsonData, func(bs []byte, _ int) string { - return string(bs) - }), nil) - columns = append(columns, builder.NewStringArray()) - case schemapb.DataType_Array: - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - index := 0 - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(index)) - valid = append(valid, true) - index += 3 - } - arrayData := make([]*schemapb.ScalarField, 0) - switch elementType { - case schemapb.DataType_Bool: - builder := array.NewListBuilder(mem, &arrow.BooleanType{}) - valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder) - for i := 0; i < rows; i++ { - data := []bool{i%2 == 0, i%3 == 0, i%4 == 0} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_BoolData{ - BoolData: &schemapb.BoolArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Int8: - builder := array.NewListBuilder(mem, &arrow.Int8Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int8Builder) - for i := 0; i < rows; i++ { - data := []int32{int32(i), int32(i + 1), int32(i + 2)} - data2 := []int8{int8(i), int8(i + 1), int8(i + 2)} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data2, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Int16: - builder := array.NewListBuilder(mem, &arrow.Int16Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int16Builder) - for i := 0; i < rows; i++ { - data := []int32{int32(i), int32(i + 1), int32(i + 2)} - data2 := []int16{int16(i), int16(i + 1), int16(i + 2)} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data2, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Int32: - builder := array.NewListBuilder(mem, &arrow.Int32Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int32Builder) - for i := 0; i < rows; i++ { - data := []int32{int32(i), int32(i + 1), int32(i + 2)} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Int64: - builder := array.NewListBuilder(mem, &arrow.Int64Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int64Builder) - for i := 0; i < rows; i++ { - data := []int32{int32(i), int32(i + 1), int32(i + 2)} - data2 := []int64{int64(i), int64(i + 1), int64(i + 2)} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data2, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Float: - builder := array.NewListBuilder(mem, &arrow.Float32Type{}) - valueBuilder := builder.ValueBuilder().(*array.Float32Builder) - for i := 0; i < rows; i++ { - data := []float32{float32(i) * 0.1, float32(i+1) * 0.1, float32(i+2) * 0.1} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_FloatData{ - FloatData: &schemapb.FloatArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_Double: - builder := array.NewListBuilder(mem, &arrow.Float64Type{}) - valueBuilder := builder.ValueBuilder().(*array.Float64Builder) - for i := 0; i < rows; i++ { - data := []float64{float64(i) * 0.02, float64(i+1) * 0.02, float64(i+2) * 0.02} - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_DoubleData{ - DoubleData: &schemapb.DoubleArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - case schemapb.DataType_VarChar, schemapb.DataType_String: - builder := array.NewListBuilder(mem, &arrow.StringType{}) - valueBuilder := builder.ValueBuilder().(*array.StringBuilder) - for i := 0; i < rows; i++ { - data := []string{ - randomString(5) + "-" + fmt.Sprintf("%d", i), - randomString(5) + "-" + fmt.Sprintf("%d", i), - randomString(5) + "-" + fmt.Sprintf("%d", i), - } - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_StringData{ - StringData: &schemapb.StringArray{ - Data: data, - }, - }, - }) - valueBuilder.AppendValues(data, nil) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - builder.AppendValues(offsets, valid) - columns = append(columns, builder.NewListArray()) - } - } - } - return columns, insertData, nil -} - func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) (*storage.InsertData, error) { pqSchema, err := ConvertToArrowSchema(schema) if err != nil { @@ -460,10 +79,16 @@ func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) ( } defer fw.Close() - columns, insertData, err := buildArrayData(schema, numRows) + insertData, err := testutil.CreateInsertData(schema, numRows) + if err != nil { + return nil, err + } + + columns, err := testutil.BuildArrayData(schema, insertData) if err != nil { return nil, err } + recordBatch := array.NewRecord(pqSchema, columns, int64(numRows)) err = fw.Write(recordBatch) if err != nil { @@ -473,7 +98,7 @@ func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) ( return insertData, nil } -func (s *ReaderSuite) run(dt schemapb.DataType) { +func (s *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.DataType) { schema := &schemapb.CollectionSchema{ Fields: []*schemapb.FieldSchema{ { @@ -501,9 +126,9 @@ func (s *ReaderSuite) run(dt schemapb.DataType) { }, { FieldID: 102, - Name: dt.String(), - DataType: dt, - ElementType: schemapb.DataType_Int32, + Name: dataType.String(), + DataType: dataType, + ElementType: elemType, TypeParams: []*commonpb.KeyValuePair{ { Key: "max_length", @@ -611,35 +236,45 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { } func (s *ReaderSuite) TestReadScalarFields() { - s.run(schemapb.DataType_Bool) - s.run(schemapb.DataType_Int8) - s.run(schemapb.DataType_Int16) - s.run(schemapb.DataType_Int32) - s.run(schemapb.DataType_Int64) - s.run(schemapb.DataType_Float) - s.run(schemapb.DataType_Double) - s.run(schemapb.DataType_VarChar) - s.run(schemapb.DataType_Array) - s.run(schemapb.DataType_JSON) + s.run(schemapb.DataType_Bool, schemapb.DataType_None) + s.run(schemapb.DataType_Int8, schemapb.DataType_None) + s.run(schemapb.DataType_Int16, schemapb.DataType_None) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) + s.run(schemapb.DataType_Int64, schemapb.DataType_None) + s.run(schemapb.DataType_Float, schemapb.DataType_None) + s.run(schemapb.DataType_Double, schemapb.DataType_None) + s.run(schemapb.DataType_String, schemapb.DataType_None) + s.run(schemapb.DataType_VarChar, schemapb.DataType_None) + s.run(schemapb.DataType_JSON, schemapb.DataType_None) + + s.run(schemapb.DataType_Array, schemapb.DataType_Bool) + s.run(schemapb.DataType_Array, schemapb.DataType_Int8) + s.run(schemapb.DataType_Array, schemapb.DataType_Int16) + s.run(schemapb.DataType_Array, schemapb.DataType_Int32) + s.run(schemapb.DataType_Array, schemapb.DataType_Int64) + s.run(schemapb.DataType_Array, schemapb.DataType_Float) + s.run(schemapb.DataType_Array, schemapb.DataType_Double) + s.run(schemapb.DataType_Array, schemapb.DataType_String) + s.failRun(schemapb.DataType_JSON, true) } func (s *ReaderSuite) TestStringPK() { s.pkDataType = schemapb.DataType_VarChar - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) } func (s *ReaderSuite) TestVector() { s.vecDataType = schemapb.DataType_BinaryVector - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) s.vecDataType = schemapb.DataType_FloatVector - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) s.vecDataType = schemapb.DataType_Float16Vector - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) s.vecDataType = schemapb.DataType_BFloat16Vector - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) s.vecDataType = schemapb.DataType_SparseFloatVector - s.run(schemapb.DataType_Int32) + s.run(schemapb.DataType_Int32, schemapb.DataType_None) } func TestUtil(t *testing.T) { diff --git a/internal/util/testutil/test_util.go b/internal/util/testutil/test_util.go index 4bad85fd2bf1e..1da27f9a64986 100644 --- a/internal/util/testutil/test_util.go +++ b/internal/util/testutil/test_util.go @@ -1,11 +1,22 @@ package testutil import ( + rand2 "crypto/rand" + "encoding/json" + "fmt" + "math/rand" "strconv" + "github.com/apache/arrow/go/v12/arrow" + "github.com/apache/arrow/go/v12/arrow/array" + "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/samber/lo" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -82,3 +93,487 @@ func ConstructCollectionSchemaByDataType(collectionName string, Fields: fieldsSchema, } } + +func randomString(length int) string { + letterRunes := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + b := make([]rune, length) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +} + +func CreateInsertData(schema *schemapb.CollectionSchema, rows int) (*storage.InsertData, error) { + insertData, err := storage.NewInsertData(schema) + if err != nil { + return nil, err + } + for _, field := range schema.GetFields() { + if field.GetAutoID() { + continue + } + switch field.GetDataType() { + case schemapb.DataType_Bool: + boolData := make([]bool, 0) + for i := 0; i < rows; i++ { + boolData = append(boolData, i%3 != 0) + } + insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} + case schemapb.DataType_Float: + floatData := make([]float32, 0) + for i := 0; i < rows; i++ { + floatData = append(floatData, float32(i/2)) + } + insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} + case schemapb.DataType_Double: + doubleData := make([]float64, 0) + for i := 0; i < rows; i++ { + doubleData = append(doubleData, float64(i/5)) + } + insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} + case schemapb.DataType_Int8: + int8Data := make([]int8, 0) + for i := 0; i < rows; i++ { + int8Data = append(int8Data, int8(i%256)) + } + insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} + case schemapb.DataType_Int16: + int16Data := make([]int16, 0) + for i := 0; i < rows; i++ { + int16Data = append(int16Data, int16(i%65536)) + } + insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} + case schemapb.DataType_Int32: + int32Data := make([]int32, 0) + for i := 0; i < rows; i++ { + int32Data = append(int32Data, int32(i%1000)) + } + insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} + case schemapb.DataType_Int64: + int64Data := make([]int64, 0) + for i := 0; i < rows; i++ { + int64Data = append(int64Data, int64(i)) + } + insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} + case schemapb.DataType_BinaryVector: + dim, err := typeutil.GetDim(field) + if err != nil { + return nil, err + } + binVecData := make([]byte, 0) + total := rows * int(dim) / 8 + for i := 0; i < total; i++ { + binVecData = append(binVecData, byte(i%256)) + } + insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} + case schemapb.DataType_FloatVector: + dim, err := typeutil.GetDim(field) + if err != nil { + return nil, err + } + floatVecData := make([]float32, 0) + total := rows * int(dim) + for i := 0; i < total; i++ { + floatVecData = append(floatVecData, rand.Float32()) + } + insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} + case schemapb.DataType_Float16Vector: + dim, err := typeutil.GetDim(field) + if err != nil { + return nil, err + } + total := int64(rows) * dim * 2 + float16VecData := make([]byte, total) + _, err = rand2.Read(float16VecData) + if err != nil { + return nil, err + } + insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} + case schemapb.DataType_BFloat16Vector: + dim, err := typeutil.GetDim(field) + if err != nil { + return nil, err + } + total := int64(rows) * dim * 2 + bfloat16VecData := make([]byte, total) + _, err = rand2.Read(bfloat16VecData) + if err != nil { + return nil, err + } + insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := testutils.GenerateSparseFloatVectors(rows) + insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ + SparseFloatArray: *sparseFloatVecData, + } + case schemapb.DataType_String, schemapb.DataType_VarChar: + varcharData := make([]string, 0) + for i := 0; i < rows; i++ { + varcharData = append(varcharData, strconv.Itoa(i)) + } + insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} + case schemapb.DataType_JSON: + jsonData := make([][]byte, 0) + for i := 0; i < rows; i++ { + if i%4 == 0 { + v, _ := json.Marshal("{\"a\": \"%s\", \"b\": %d}") + jsonData = append(jsonData, v) + } else if i%4 == 1 { + v, _ := json.Marshal(i) + jsonData = append(jsonData, v) + } else if i%4 == 2 { + v, _ := json.Marshal(float32(i) * 0.1) + jsonData = append(jsonData, v) + } else if i%4 == 3 { + v, _ := json.Marshal(strconv.Itoa(i)) + jsonData = append(jsonData, v) + } + } + insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} + case schemapb.DataType_Array: + arrayData := make([]*schemapb.ScalarField, 0) + switch field.GetElementType() { + case schemapb.DataType_Bool: + for i := 0; i < rows; i++ { + data := []bool{i%2 == 0, i%3 == 0, i%4 == 0} + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + case schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32: + for i := 0; i < rows; i++ { + data := []int32{int32(i), int32(i + 1), int32(i + 2)} + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + case schemapb.DataType_Int64: + for i := 0; i < rows; i++ { + data := []int64{int64(i), int64(i + 1), int64(i + 2)} + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + case schemapb.DataType_Float: + for i := 0; i < rows; i++ { + data := []float32{float32(i) * 0.1, float32(i+1) * 0.1, float32(i+2) * 0.1} + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + case schemapb.DataType_Double: + for i := 0; i < rows; i++ { + data := []float64{float64(i) * 0.02, float64(i+1) * 0.02, float64(i+2) * 0.02} + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + case schemapb.DataType_String, schemapb.DataType_VarChar: + for i := 0; i < rows; i++ { + data := []string{ + randomString(5) + "-" + fmt.Sprintf("%d", i), + randomString(5) + "-" + fmt.Sprintf("%d", i), + randomString(5) + "-" + fmt.Sprintf("%d", i), + } + arrayData = append(arrayData, &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{ + Data: data, + }, + }, + }) + } + insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} + } + default: + panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) + } + } + return insertData, nil +} + +func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.InsertData) ([]arrow.Array, error) { + mem := memory.NewGoAllocator() + columns := make([]arrow.Array, 0, len(schema.Fields)) + for _, field := range schema.Fields { + if field.GetIsPrimaryKey() && field.GetAutoID() { + continue + } + fieldID := field.GetFieldID() + dataType := field.GetDataType() + elementType := field.GetElementType() + switch dataType { + case schemapb.DataType_Bool: + builder := array.NewBooleanBuilder(mem) + boolData := insertData.Data[fieldID].(*storage.BoolFieldData).Data + builder.AppendValues(boolData, nil) + columns = append(columns, builder.NewBooleanArray()) + case schemapb.DataType_Int8: + builder := array.NewInt8Builder(mem) + int8Data := insertData.Data[fieldID].(*storage.Int8FieldData).Data + builder.AppendValues(int8Data, nil) + columns = append(columns, builder.NewInt8Array()) + case schemapb.DataType_Int16: + builder := array.NewInt16Builder(mem) + int16Data := insertData.Data[fieldID].(*storage.Int16FieldData).Data + builder.AppendValues(int16Data, nil) + columns = append(columns, builder.NewInt16Array()) + case schemapb.DataType_Int32: + builder := array.NewInt32Builder(mem) + int32Data := insertData.Data[fieldID].(*storage.Int32FieldData).Data + builder.AppendValues(int32Data, nil) + columns = append(columns, builder.NewInt32Array()) + case schemapb.DataType_Int64: + builder := array.NewInt64Builder(mem) + int64Data := insertData.Data[fieldID].(*storage.Int64FieldData).Data + builder.AppendValues(int64Data, nil) + columns = append(columns, builder.NewInt64Array()) + case schemapb.DataType_Float: + builder := array.NewFloat32Builder(mem) + floatData := insertData.Data[fieldID].(*storage.FloatFieldData).Data + builder.AppendValues(floatData, nil) + columns = append(columns, builder.NewFloat32Array()) + case schemapb.DataType_Double: + builder := array.NewFloat64Builder(mem) + doubleData := insertData.Data[fieldID].(*storage.DoubleFieldData).Data + builder.AppendValues(doubleData, nil) + columns = append(columns, builder.NewFloat64Array()) + case schemapb.DataType_String, schemapb.DataType_VarChar: + builder := array.NewStringBuilder(mem) + stringData := insertData.Data[fieldID].(*storage.StringFieldData).Data + builder.AppendValues(stringData, nil) + columns = append(columns, builder.NewStringArray()) + case schemapb.DataType_BinaryVector: + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + dim := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim + binVecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data + rowBytes := dim / 8 + rows := len(binVecData) / rowBytes + offsets := make([]int32, 0, rows) + valid := make([]bool, 0) + for i := 0; i < rows; i++ { + offsets = append(offsets, int32(i*rowBytes)) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil) + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_FloatVector: + builder := array.NewListBuilder(mem, &arrow.Float32Type{}) + dim := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim + floatVecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data + rows := len(floatVecData) / dim + offsets := make([]int32, 0, rows) + valid := make([]bool, 0, rows) + for i := 0; i < rows; i++ { + offsets = append(offsets, int32(i*dim)) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil) + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Float16Vector: + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + dim := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim + float16VecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data + rowBytes := dim * 2 + rows := len(float16VecData) / rowBytes + offsets := make([]int32, 0, rows) + valid := make([]bool, 0, rows) + for i := 0; i < rows; i++ { + offsets = append(offsets, int32(i*rowBytes)) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(float16VecData, nil) + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_BFloat16Vector: + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + dim := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim + bfloat16VecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data + rowBytes := dim * 2 + rows := len(bfloat16VecData) / rowBytes + offsets := make([]int32, 0, rows) + valid := make([]bool, 0, rows) + for i := 0; i < rows; i++ { + offsets = append(offsets, int32(i*rowBytes)) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(bfloat16VecData, nil) + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_SparseFloatVector: + sparseFloatVecData := make([]byte, 0) + builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) + contents := insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() + rows := len(contents) + offsets := make([]int32, 0, rows) + valid := make([]bool, 0, rows) + currOffset := int32(0) + for i := 0; i < rows; i++ { + rowVecData := contents[i] + sparseFloatVecData = append(sparseFloatVecData, rowVecData...) + offsets = append(offsets, currOffset) + currOffset = currOffset + int32(len(rowVecData)) + valid = append(valid, true) + } + builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparseFloatVecData, nil) + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_JSON: + builder := array.NewStringBuilder(mem) + jsonData := insertData.Data[fieldID].(*storage.JSONFieldData).Data + builder.AppendValues(lo.Map(jsonData, func(bs []byte, _ int) string { + return string(bs) + }), nil) + columns = append(columns, builder.NewStringArray()) + case schemapb.DataType_Array: + data := insertData.Data[fieldID].(*storage.ArrayFieldData).Data + rows := len(data) + offsets := make([]int32, 0, rows) + valid := make([]bool, 0, rows) + currOffset := int32(0) + + switch elementType { + case schemapb.DataType_Bool: + builder := array.NewListBuilder(mem, &arrow.BooleanType{}) + valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder) + for i := 0; i < rows; i++ { + boolData := data[i].Data.(*schemapb.ScalarField_BoolData).BoolData.GetData() + valueBuilder.AppendValues(boolData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(boolData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Int8: + builder := array.NewListBuilder(mem, &arrow.Int8Type{}) + valueBuilder := builder.ValueBuilder().(*array.Int8Builder) + for i := 0; i < rows; i++ { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + int8Data := make([]int8, 0) + for j := 0; j < len(intData); j++ { + int8Data = append(int8Data, int8(intData[j])) + } + valueBuilder.AppendValues(int8Data, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(int8Data)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Int16: + builder := array.NewListBuilder(mem, &arrow.Int16Type{}) + valueBuilder := builder.ValueBuilder().(*array.Int16Builder) + for i := 0; i < rows; i++ { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + int16Data := make([]int16, 0) + for j := 0; j < len(intData); j++ { + int16Data = append(int16Data, int16(intData[j])) + } + valueBuilder.AppendValues(int16Data, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(int16Data)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Int32: + builder := array.NewListBuilder(mem, &arrow.Int32Type{}) + valueBuilder := builder.ValueBuilder().(*array.Int32Builder) + for i := 0; i < rows; i++ { + intData := data[i].Data.(*schemapb.ScalarField_IntData).IntData.GetData() + valueBuilder.AppendValues(intData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(intData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Int64: + builder := array.NewListBuilder(mem, &arrow.Int64Type{}) + valueBuilder := builder.ValueBuilder().(*array.Int64Builder) + for i := 0; i < rows; i++ { + longData := data[i].Data.(*schemapb.ScalarField_LongData).LongData.GetData() + valueBuilder.AppendValues(longData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(longData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Float: + builder := array.NewListBuilder(mem, &arrow.Float32Type{}) + valueBuilder := builder.ValueBuilder().(*array.Float32Builder) + for i := 0; i < rows; i++ { + floatData := data[i].Data.(*schemapb.ScalarField_FloatData).FloatData.GetData() + valueBuilder.AppendValues(floatData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(floatData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_Double: + builder := array.NewListBuilder(mem, &arrow.Float64Type{}) + valueBuilder := builder.ValueBuilder().(*array.Float64Builder) + for i := 0; i < rows; i++ { + doubleData := data[i].Data.(*schemapb.ScalarField_DoubleData).DoubleData.GetData() + valueBuilder.AppendValues(doubleData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(doubleData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + case schemapb.DataType_VarChar, schemapb.DataType_String: + builder := array.NewListBuilder(mem, &arrow.StringType{}) + valueBuilder := builder.ValueBuilder().(*array.StringBuilder) + for i := 0; i < rows; i++ { + stringData := data[i].Data.(*schemapb.ScalarField_StringData).StringData.GetData() + valueBuilder.AppendValues(stringData, nil) + + offsets = append(offsets, currOffset) + valid = append(valid, true) + currOffset = currOffset + int32(len(stringData)) + } + builder.AppendValues(offsets, valid) + columns = append(columns, builder.NewListArray()) + } + } + } + return columns, nil +} diff --git a/tests/integration/import/util_test.go b/tests/integration/import/util_test.go index 61e7ba5ada17a..70fdfa3e6e592 100644 --- a/tests/integration/import/util_test.go +++ b/tests/integration/import/util_test.go @@ -18,18 +18,13 @@ package importv2 import ( "context" - rand2 "crypto/rand" "encoding/json" "fmt" - "math/rand" "os" - "strconv" "testing" "time" - "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/pqarrow" "github.com/samber/lo" @@ -41,337 +36,14 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" pq "github.com/milvus-io/milvus/internal/util/importutilv2/parquet" + "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/testutils" - "github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/tests/integration" ) const dim = 128 -func createInsertData(t *testing.T, schema *schemapb.CollectionSchema, rowCount int) *storage.InsertData { - insertData, err := storage.NewInsertData(schema) - assert.NoError(t, err) - for _, field := range schema.GetFields() { - if field.GetAutoID() { - continue - } - switch field.GetDataType() { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData} - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData} - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData} - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data} - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data} - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data} - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data} - case schemapb.DataType_BinaryVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - binVecData := make([]byte, 0) - total := rowCount * int(dim) / 8 - for i := 0; i < total; i++ { - binVecData = append(binVecData, byte(i%256)) - } - insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: int(dim)} - case schemapb.DataType_FloatVector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - floatVecData := make([]float32, 0) - total := rowCount * int(dim) - for i := 0; i < total; i++ { - floatVecData = append(floatVecData, rand.Float32()) - } - insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: int(dim)} - case schemapb.DataType_Float16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - float16VecData := make([]byte, total) - _, err = rand2.Read(float16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.Float16VectorFieldData{Data: float16VecData, Dim: int(dim)} - case schemapb.DataType_BFloat16Vector: - dim, err := typeutil.GetDim(field) - assert.NoError(t, err) - total := int64(rowCount) * dim * 2 - bfloat16VecData := make([]byte, total) - _, err = rand2.Read(bfloat16VecData) - assert.NoError(t, err) - insertData.Data[field.GetFieldID()] = &storage.BFloat16VectorFieldData{Data: bfloat16VecData, Dim: int(dim)} - case schemapb.DataType_SparseFloatVector: - sparseFloatVecData := testutils.GenerateSparseFloatVectors(rowCount) - insertData.Data[field.GetFieldID()] = &storage.SparseFloatVectorFieldData{ - SparseFloatArray: *sparseFloatVecData, - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData} - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) - } - insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData} - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) - } - insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData} - default: - panic(fmt.Sprintf("unexpected data type: %s", field.GetDataType().String())) - } - } - return insertData -} - -func randomString(length int) string { - letterRunes := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - b := make([]rune, length) - for i := range b { - b[i] = letterRunes[rand.Intn(len(letterRunes))] - } - return string(b) -} - -func buildArrayData(dataType, elemType schemapb.DataType, dim, rows int) arrow.Array { - mem := memory.NewGoAllocator() - switch dataType { - case schemapb.DataType_Bool: - builder := array.NewBooleanBuilder(mem) - for i := 0; i < rows; i++ { - builder.Append(i%2 == 0) - } - return builder.NewBooleanArray() - case schemapb.DataType_Int8: - builder := array.NewInt8Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(int8(i)) - } - return builder.NewInt8Array() - case schemapb.DataType_Int16: - builder := array.NewInt16Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(int16(i)) - } - return builder.NewInt16Array() - case schemapb.DataType_Int32: - builder := array.NewInt32Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(int32(i)) - } - return builder.NewInt32Array() - case schemapb.DataType_Int64: - builder := array.NewInt64Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(int64(i)) - } - return builder.NewInt64Array() - case schemapb.DataType_Float: - builder := array.NewFloat32Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(float32(i) * 0.1) - } - return builder.NewFloat32Array() - case schemapb.DataType_Double: - builder := array.NewFloat64Builder(mem) - for i := 0; i < rows; i++ { - builder.Append(float64(i) * 0.02) - } - return builder.NewFloat64Array() - case schemapb.DataType_VarChar, schemapb.DataType_String: - builder := array.NewStringBuilder(mem) - for i := 0; i < rows; i++ { - builder.Append(randomString(10)) - } - return builder.NewStringArray() - case schemapb.DataType_BinaryVector: - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0) - rowBytes := dim / 8 - for i := 0; i < rowBytes*rows; i++ { - builder.ValueBuilder().(*array.Uint8Builder).Append(uint8(i % 256)) - } - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(rowBytes*i)) - valid = append(valid, true) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_FloatVector: - builder := array.NewListBuilder(mem, &arrow.Float32Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - for i := 0; i < dim*rows; i++ { - builder.ValueBuilder().(*array.Float32Builder).Append(float32(i)) - } - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(dim*i)) - valid = append(valid, true) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows) - valid := make([]bool, 0) - rowBytes := dim * 2 - for i := 0; i < rowBytes*rows; i++ { - builder.ValueBuilder().(*array.Uint8Builder).Append(uint8(i % 256)) - } - for i := 0; i < rows; i++ { - offsets = append(offsets, int32(rowBytes*i)) - valid = append(valid, true) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_SparseFloatVector: - sparsefloatVecData := make([]byte, 0) - builder := array.NewListBuilder(mem, &arrow.Uint8Type{}) - offsets := make([]int32, 0, rows+1) - valid := make([]bool, 0, rows) - vecData := testutils.GenerateSparseFloatVectors(rows) - offsets = append(offsets, 0) - for i := 0; i < rows; i++ { - rowVecData := vecData.GetContents()[i] - sparsefloatVecData = append(sparsefloatVecData, rowVecData...) - offsets = append(offsets, offsets[i]+int32(len(rowVecData))) - valid = append(valid, true) - } - builder.ValueBuilder().(*array.Uint8Builder).AppendValues(sparsefloatVecData, nil) - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_JSON: - builder := array.NewStringBuilder(mem) - for i := 0; i < rows; i++ { - builder.Append(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i)) - } - return builder.NewStringArray() - case schemapb.DataType_Array: - offsets := make([]int32, 0, rows) - valid := make([]bool, 0, rows) - index := 0 - for i := 0; i < rows; i++ { - index += i % 10 - offsets = append(offsets, int32(index)) - valid = append(valid, true) - } - switch elemType { - case schemapb.DataType_Bool: - builder := array.NewListBuilder(mem, &arrow.BooleanType{}) - valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder) - for i := 0; i < index; i++ { - valueBuilder.Append(i%2 == 0) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Int8: - builder := array.NewListBuilder(mem, &arrow.Int8Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int8Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(int8(i)) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Int16: - builder := array.NewListBuilder(mem, &arrow.Int16Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int16Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(int16(i)) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Int32: - builder := array.NewListBuilder(mem, &arrow.Int32Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int32Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(int32(i)) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Int64: - builder := array.NewListBuilder(mem, &arrow.Int64Type{}) - valueBuilder := builder.ValueBuilder().(*array.Int64Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(int64(i)) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Float: - builder := array.NewListBuilder(mem, &arrow.Float32Type{}) - valueBuilder := builder.ValueBuilder().(*array.Float32Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(float32(i) * 0.1) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_Double: - builder := array.NewListBuilder(mem, &arrow.Float64Type{}) - valueBuilder := builder.ValueBuilder().(*array.Float64Builder) - for i := 0; i < index; i++ { - valueBuilder.Append(float64(i) * 0.02) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - case schemapb.DataType_VarChar, schemapb.DataType_String: - builder := array.NewListBuilder(mem, &arrow.StringType{}) - valueBuilder := builder.ValueBuilder().(*array.StringBuilder) - for i := 0; i < index; i++ { - valueBuilder.Append(randomString(5) + "-" + fmt.Sprintf("%d", i)) - } - builder.AppendValues(offsets, valid) - return builder.NewListArray() - } - } - return nil -} - func GenerateParquetFile(filePath string, schema *schemapb.CollectionSchema, numRows int) error { w, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666) if err != nil { @@ -388,37 +60,21 @@ func GenerateParquetFile(filePath string, schema *schemapb.CollectionSchema, num } defer fw.Close() - columns := make([]arrow.Array, 0, len(schema.Fields)) - for _, field := range schema.Fields { - if field.GetIsPrimaryKey() && field.GetAutoID() { - continue - } - columnData := buildArrayData(field.DataType, field.ElementType, dim, numRows) - columns = append(columns, columnData) + insertData, err := testutil.CreateInsertData(schema, numRows) + if err != nil { + return err } + + columns, err := testutil.BuildArrayData(schema, insertData) + if err != nil { + return err + } + recordBatch := array.NewRecord(pqSchema, columns, int64(numRows)) return fw.Write(recordBatch) } func GenerateNumpyFiles(cm storage.ChunkManager, schema *schemapb.CollectionSchema, rowCount int) (*internalpb.ImportFile, error) { - paths := make([]string, 0) - for _, field := range schema.GetFields() { - if field.GetAutoID() && field.GetIsPrimaryKey() { - continue - } - path := fmt.Sprintf("%s/%s.npy", cm.RootPath(), field.GetName()) - err := GenerateNumpyFile(path, rowCount, field.GetDataType()) - if err != nil { - return nil, err - } - paths = append(paths, path) - } - return &internalpb.ImportFile{ - Paths: paths, - }, nil -} - -func GenerateNumpyFile(filePath string, rowCount int, dType schemapb.DataType) error { writeFn := func(path string, data interface{}) error { f, err := os.Create(path) if err != nil { @@ -434,153 +90,117 @@ func GenerateNumpyFile(filePath string, rowCount int, dType schemapb.DataType) e return nil } - switch dType { - case schemapb.DataType_Bool: - boolData := make([]bool, 0) - for i := 0; i < rowCount; i++ { - boolData = append(boolData, i%3 != 0) - } - err := writeFn(filePath, boolData) - if err != nil { - return err - } - case schemapb.DataType_Float: - floatData := make([]float32, 0) - for i := 0; i < rowCount; i++ { - floatData = append(floatData, float32(i/2)) - } - err := writeFn(filePath, floatData) - if err != nil { - return err - } - case schemapb.DataType_Double: - doubleData := make([]float64, 0) - for i := 0; i < rowCount; i++ { - doubleData = append(doubleData, float64(i/5)) - } - err := writeFn(filePath, doubleData) - if err != nil { - return err - } - case schemapb.DataType_Int8: - int8Data := make([]int8, 0) - for i := 0; i < rowCount; i++ { - int8Data = append(int8Data, int8(i%256)) - } - err := writeFn(filePath, int8Data) - if err != nil { - return err - } - case schemapb.DataType_Int16: - int16Data := make([]int16, 0) - for i := 0; i < rowCount; i++ { - int16Data = append(int16Data, int16(i%65536)) - } - err := writeFn(filePath, int16Data) - if err != nil { - return err - } - case schemapb.DataType_Int32: - int32Data := make([]int32, 0) - for i := 0; i < rowCount; i++ { - int32Data = append(int32Data, int32(i%1000)) - } - err := writeFn(filePath, int32Data) - if err != nil { - return err - } - case schemapb.DataType_Int64: - int64Data := make([]int64, 0) - for i := 0; i < rowCount; i++ { - int64Data = append(int64Data, int64(i)) - } - err := writeFn(filePath, int64Data) - if err != nil { - return err + insertData, err := testutil.CreateInsertData(schema, rowCount) + if err != nil { + return nil, err + } + + var data interface{} + paths := make([]string, 0) + for _, field := range schema.GetFields() { + if field.GetAutoID() && field.GetIsPrimaryKey() { + continue } - case schemapb.DataType_BinaryVector: - const rowBytes = dim / 8 - binVecData := make([][rowBytes]byte, 0, rowCount) - for i := 0; i < rowCount; i++ { - vec := [rowBytes]byte{} - for j := 0; j < rowBytes; j++ { - vec[j] = byte((i + j) % 256) + path := fmt.Sprintf("%s/%s.npy", cm.RootPath(), field.GetName()) + + fieldID := field.GetFieldID() + dType := field.GetDataType() + switch dType { + case schemapb.DataType_Bool: + data = insertData.Data[fieldID].(*storage.BoolFieldData).Data + case schemapb.DataType_Int8: + data = insertData.Data[fieldID].(*storage.Int8FieldData).Data + case schemapb.DataType_Int16: + data = insertData.Data[fieldID].(*storage.Int16FieldData).Data + case schemapb.DataType_Int32: + data = insertData.Data[fieldID].(*storage.Int32FieldData).Data + case schemapb.DataType_Int64: + data = insertData.Data[fieldID].(*storage.Int64FieldData).Data + case schemapb.DataType_Float: + data = insertData.Data[fieldID].(*storage.FloatFieldData).Data + case schemapb.DataType_Double: + data = insertData.Data[fieldID].(*storage.DoubleFieldData).Data + case schemapb.DataType_String, schemapb.DataType_VarChar: + data = insertData.Data[fieldID].(*storage.StringFieldData).Data + case schemapb.DataType_BinaryVector: + vecData := insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Data + if dim != insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BinaryVectorFieldData).Dim)) } - binVecData = append(binVecData, vec) - } - err := writeFn(filePath, binVecData) - if err != nil { - return err - } - case schemapb.DataType_FloatVector: - data := make([][dim]float32, 0, rowCount) - for i := 0; i < rowCount; i++ { - vec := [dim]float32{} - for j := 0; j < dim; j++ { - vec[j] = rand.Float32() + const rowBytes = dim / 8 + rows := len(vecData) / rowBytes + binVecData := make([][rowBytes]byte, 0, rows) + for i := 0; i < rows; i++ { + rowVec := [rowBytes]byte{} + copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes]) + binVecData = append(binVecData, rowVec) } - data = append(data, vec) - } - err := writeFn(filePath, data) - if err != nil { - return err - } - case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: - const rowBytes = dim * 2 - data := make([][rowBytes]byte, 0, rowCount) - for i := 0; i < rowCount; i++ { - vec := [rowBytes]byte{} - for j := 0; j < rowBytes; j++ { - vec[j] = byte(rand.Uint32() % 256) + data = binVecData + case schemapb.DataType_FloatVector: + vecData := insertData.Data[fieldID].(*storage.FloatVectorFieldData).Data + if dim != insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.FloatVectorFieldData).Dim)) } - data = append(data, vec) - } - err := writeFn(filePath, data) - if err != nil { - return err - } - case schemapb.DataType_String, schemapb.DataType_VarChar: - varcharData := make([]string, 0) - for i := 0; i < rowCount; i++ { - varcharData = append(varcharData, strconv.Itoa(i)) - } - err := writeFn(filePath, varcharData) - if err != nil { - return err - } - case schemapb.DataType_JSON: - jsonData := make([][]byte, 0) - for i := 0; i < rowCount; i++ { - jsonData = append(jsonData, []byte(fmt.Sprintf("{\"y\": %d}", i))) - } - err := writeFn(filePath, jsonData) - if err != nil { - return err - } - case schemapb.DataType_Array: - arrayData := make([]*schemapb.ScalarField, 0) - for i := 0; i < rowCount; i++ { - arrayData = append(arrayData, &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: []int32{int32(i), int32(i + 1), int32(i + 2)}, - }, - }, - }) + rows := len(vecData) / dim + floatVecData := make([][dim]float32, 0, rows) + for i := 0; i < rows; i++ { + rowVec := [dim]float32{} + copy(rowVec[:], vecData[i*dim:(i+1)*dim]) + floatVecData = append(floatVecData, rowVec) + } + data = floatVecData + case schemapb.DataType_Float16Vector: + vecData := insertData.Data[fieldID].(*storage.Float16VectorFieldData).Data + if dim != insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.Float16VectorFieldData).Dim)) + } + const rowBytes = dim * 2 + rows := len(vecData) / rowBytes + float16VecData := make([][rowBytes]byte, 0, rows) + for i := 0; i < rows; i++ { + rowVec := [rowBytes]byte{} + copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes]) + float16VecData = append(float16VecData, rowVec) + } + data = float16VecData + case schemapb.DataType_BFloat16Vector: + vecData := insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Data + if dim != insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim { + panic(fmt.Sprintf("dim mis-match: %d, %d", dim, insertData.Data[fieldID].(*storage.BFloat16VectorFieldData).Dim)) + } + const rowBytes = dim * 2 + rows := len(vecData) / rowBytes + bfloat16VecData := make([][rowBytes]byte, 0, rows) + for i := 0; i < rows; i++ { + rowVec := [rowBytes]byte{} + copy(rowVec[:], vecData[i*rowBytes:(i+1)*rowBytes]) + bfloat16VecData = append(bfloat16VecData, rowVec) + } + data = bfloat16VecData + case schemapb.DataType_SparseFloatVector: + data = insertData.Data[fieldID].(*storage.SparseFloatVectorFieldData).GetContents() + case schemapb.DataType_JSON: + data = insertData.Data[fieldID].(*storage.JSONFieldData).Data + case schemapb.DataType_Array: + data = insertData.Data[fieldID].(*storage.ArrayFieldData).Data + default: + panic(fmt.Sprintf("unsupported data type: %s", dType.String())) } - err := writeFn(filePath, arrayData) + + err := writeFn(path, data) if err != nil { - return err + return nil, err } - default: - panic(fmt.Sprintf("unimplemented data type: %s", dType.String())) + paths = append(paths, path) } - - return nil + return &internalpb.ImportFile{ + Paths: paths, + }, nil } func GenerateJSONFile(t *testing.T, filePath string, schema *schemapb.CollectionSchema, count int) { - insertData := createInsertData(t, schema, count) + insertData, err := testutil.CreateInsertData(schema, count) + assert.NoError(t, err) rows := make([]map[string]any, 0, count) fieldIDToField := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID()