From 77eda3c9000f25428fd43d31e1b23f0343f0180b Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Thu, 11 Apr 2024 10:31:18 +0800
Subject: [PATCH] enhance: Add binlog import integration test (#32112)

issue: https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper
---
 .../util/importutilv2/parquet/reader_test.go | 512 +++++++++++-------
 tests/integration/import/binlog_test.go      | 255 +++++++++
 tests/integration/util_query.go              |  23 +
 3 files changed, 599 insertions(+), 191 deletions(-)
 create mode 100644 tests/integration/import/binlog_test.go

diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go
index 9240b6dccee90..3ec0541b85eea 100644
--- a/internal/util/importutilv2/parquet/reader_test.go
+++ b/internal/util/importutilv2/parquet/reader_test.go
@@ -30,8 +30,10 @@ import (
     "github.com/apache/arrow/go/v12/arrow/memory"
     "github.com/apache/arrow/go/v12/parquet"
     "github.com/apache/arrow/go/v12/parquet/pqarrow"
+    "github.com/samber/lo"
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/suite"
+    "golang.org/x/exp/slices"

     "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -147,218 +149,347 @@ func randomString(length int) string {
     return string(b)
 }

-func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBinary bool) arrow.Array {
+func buildArrayData(schema *schemapb.CollectionSchema, rows int) ([]arrow.Array, *storage.InsertData, error) {
     mem := memory.NewGoAllocator()
-    switch dataType {
-    case schemapb.DataType_Bool:
-        builder := array.NewBooleanBuilder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(i%2 == 0)
-        }
-        return builder.NewBooleanArray()
-    case schemapb.DataType_Int8:
-        builder := array.NewInt8Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(int8(i))
-        }
-        return builder.NewInt8Array()
-    case schemapb.DataType_Int16:
-        builder := array.NewInt16Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(int16(i))
-        }
-        return builder.NewInt16Array()
-    case schemapb.DataType_Int32:
-        builder := array.NewInt32Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(int32(i))
-        }
-        return builder.NewInt32Array()
-    case schemapb.DataType_Int64:
-        builder := array.NewInt64Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(int64(i))
-        }
-        return builder.NewInt64Array()
-    case schemapb.DataType_Float:
-        builder := array.NewFloat32Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(float32(i) * 0.1)
-        }
-        return builder.NewFloat32Array()
-    case schemapb.DataType_Double:
-        builder := array.NewFloat64Builder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(float64(i) * 0.02)
-        }
-        return builder.NewFloat64Array()
-    case schemapb.DataType_VarChar, schemapb.DataType_String:
-        builder := array.NewStringBuilder(mem)
-        for i := 0; i < rows; i++ {
-            builder.Append(randomString(10))
-        }
-        return builder.NewStringArray()
-    case schemapb.DataType_FloatVector:
-        builder := array.NewListBuilder(mem, &arrow.Float32Type{})
-        offsets := make([]int32, 0, rows)
-        valid := make([]bool, 0, rows)
-        for i := 0; i < dim*rows; i++ {
-            builder.ValueBuilder().(*array.Float32Builder).Append(float32(i))
-        }
-        for i := 0; i < rows; i++ {
-            offsets = append(offsets, int32(i*dim))
-            valid = append(valid, true)
-        }
-        builder.AppendValues(offsets, valid)
-        return builder.NewListArray()
-    case schemapb.DataType_BinaryVector:
-        if isBinary {
-            builder := array.NewBinaryBuilder(mem, &arrow.BinaryType{})
-            for i := 0; i < rows; i++ {
-                element := make([]byte, dim/8)
-                for j := 0; j < dim/8; j++ {
-                    element[j] = randomString(1)[0]
-                }
-                builder.Append(element)
-            }
-            return builder.NewBinaryArray()
-        }
-        builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
-        offsets := make([]int32, 0, rows)
-        valid := make([]bool, 0)
-        for i := 0; i < dim*rows/8; i++ {
-            builder.ValueBuilder().(*array.Uint8Builder).Append(uint8(i))
-        }
-        for i := 0; i < rows; i++ {
-            offsets = append(offsets, int32(dim*i/8))
-            valid = append(valid, true)
-        }
-        builder.AppendValues(offsets, valid)
-        return builder.NewListArray()
-    case schemapb.DataType_JSON:
-        builder := array.NewStringBuilder(mem)
-        for i := 0; i < rows; i++ {
-            if i%4 == 0 {
-                v, _ := json.Marshal(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i))
-                builder.Append(string(v))
-            } else if i%4 == 1 {
-                v, _ := json.Marshal(i)
-                builder.Append(string(v))
-            } else if i%4 == 2 {
-                v, _ := json.Marshal(float32(i) * 0.1)
-                builder.Append(string(v))
-            } else if i%4 == 3 {
-                v, _ := json.Marshal(randomString(10))
-                builder.Append(string(v))
+    columns := make([]arrow.Array, 0, len(schema.Fields))
+    insertData, err := storage.NewInsertData(schema)
+    if err != nil {
+        return nil, nil, err
+    }
+    for _, field := range schema.Fields {
+        dim := 1
+        if typeutil.IsVectorType(field.GetDataType()) {
+            dim2, err := typeutil.GetDim(field)
+            if err != nil {
+                return nil, nil, err
             }
+            dim = int(dim2)
         }
-        return builder.NewStringArray()
-    case schemapb.DataType_Array:
-        offsets := make([]int32, 0, rows)
-        valid := make([]bool, 0, rows)
-        index := 0
-        for i := 0; i < rows; i++ {
-            index += i % 10
-            offsets = append(offsets, int32(index))
-            valid = append(valid, true)
-        }
-        switch elementType {
+        dataType := field.GetDataType()
+        elementType := field.GetElementType()
+        isBinary := field.GetName() == "FieldBinaryVector2"
+        switch dataType {
         case schemapb.DataType_Bool:
-            builder := array.NewListBuilder(mem, &arrow.BooleanType{})
-            valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(i%2 == 0)
+            builder := array.NewBooleanBuilder(mem)
+            boolData := make([]bool, 0)
+            for i := 0; i < rows; i++ {
+                boolData = append(boolData, i%2 == 0)
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.BoolFieldData{Data: boolData}
+            builder.AppendValues(boolData, nil)
+            columns = append(columns, builder.NewBooleanArray())
         case schemapb.DataType_Int8:
-            builder := array.NewListBuilder(mem, &arrow.Int8Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Int8Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(int8(i))
+            builder := array.NewInt8Builder(mem)
+            int8Data := make([]int8, 0)
+            for i := 0; i < rows; i++ {
+                int8Data = append(int8Data, int8(i))
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.Int8FieldData{Data: int8Data}
+            builder.AppendValues(int8Data, nil)
+            columns = append(columns, builder.NewInt8Array())
         case schemapb.DataType_Int16:
-            builder := array.NewListBuilder(mem, &arrow.Int16Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Int16Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(int16(i))
+            int16Data := make([]int16, 0)
+            builder := array.NewInt16Builder(mem)
+            for i := 0; i < rows; i++ {
+                int16Data = append(int16Data, int16(i))
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.Int16FieldData{Data: int16Data}
+            builder.AppendValues(int16Data, nil)
+            columns = append(columns, builder.NewInt16Array())
         case schemapb.DataType_Int32:
-            builder := array.NewListBuilder(mem, &arrow.Int32Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Int32Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(int32(i))
+            int32Data := make([]int32, 0)
+            builder := array.NewInt32Builder(mem)
+            for i := 0; i < rows; i++ {
+                int32Data = append(int32Data, int32(i))
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.Int32FieldData{Data: int32Data}
+            builder.AppendValues(int32Data, nil)
+            columns = append(columns, builder.NewInt32Array())
         case schemapb.DataType_Int64:
-            builder := array.NewListBuilder(mem, &arrow.Int64Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Int64Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(int64(i))
+            int64Data := make([]int64, 0)
+            builder := array.NewInt64Builder(mem)
+            for i := 0; i < rows; i++ {
+                int64Data = append(int64Data, int64(i))
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.Int64FieldData{Data: int64Data}
+            builder.AppendValues(int64Data, nil)
+            columns = append(columns, builder.NewInt64Array())
         case schemapb.DataType_Float:
-            builder := array.NewListBuilder(mem, &arrow.Float32Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(float32(i) * 0.1)
+            floatData := make([]float32, 0)
+            builder := array.NewFloat32Builder(mem)
+            for i := 0; i < rows; i++ {
+                floatData = append(floatData, float32(i)*0.1)
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.FloatFieldData{Data: floatData}
+            builder.AppendValues(floatData, nil)
+            columns = append(columns, builder.NewFloat32Array())
         case schemapb.DataType_Double:
-            builder := array.NewListBuilder(mem, &arrow.Float64Type{})
-            valueBuilder := builder.ValueBuilder().(*array.Float64Builder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(float64(i) * 0.02)
+            doubleData := make([]float64, 0)
+            builder := array.NewFloat64Builder(mem)
+            for i := 0; i < rows; i++ {
+                doubleData = append(doubleData, float64(i)*0.02)
             }
-            builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            insertData.Data[field.GetFieldID()] = &storage.DoubleFieldData{Data: doubleData}
+            builder.AppendValues(doubleData, nil)
+            columns = append(columns, builder.NewFloat64Array())
         case schemapb.DataType_VarChar, schemapb.DataType_String:
-            builder := array.NewListBuilder(mem, &arrow.StringType{})
-            valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
-            for i := 0; i < index; i++ {
-                valueBuilder.Append(randomString(5) + "-" + fmt.Sprintf("%d", i))
+            varcharData := make([]string, 0)
+            builder := array.NewStringBuilder(mem)
+            for i := 0; i < rows; i++ {
+                varcharData = append(varcharData, randomString(10))
             }
+            insertData.Data[field.GetFieldID()] = &storage.StringFieldData{Data: varcharData}
+            builder.AppendValues(varcharData, nil)
+            columns = append(columns, builder.NewStringArray())
+        case schemapb.DataType_FloatVector:
+            floatVecData := make([]float32, 0)
+            builder := array.NewListBuilder(mem, &arrow.Float32Type{})
+            offsets := make([]int32, 0, rows)
+            valid := make([]bool, 0, rows)
+            for i := 0; i < dim*rows; i++ {
+                floatVecData = append(floatVecData, float32(i))
+            }
+            builder.ValueBuilder().(*array.Float32Builder).AppendValues(floatVecData, nil)
+            for i := 0; i < rows; i++ {
+                offsets = append(offsets, int32(i*dim))
+                valid = append(valid, true)
+            }
+            insertData.Data[field.GetFieldID()] = &storage.FloatVectorFieldData{Data: floatVecData, Dim: dim}
             builder.AppendValues(offsets, valid)
-            return builder.NewListArray()
+            columns = append(columns, builder.NewListArray())
+        case schemapb.DataType_BinaryVector:
+            if isBinary {
+                binVecData := make([][]byte, 0)
+                builder := array.NewBinaryBuilder(mem, &arrow.BinaryType{})
+                for i := 0; i < rows; i++ {
+                    element := make([]byte, dim/8)
+                    for j := 0; j < dim/8; j++ {
+                        element[j] = randomString(1)[0]
+                    }
+                    binVecData = append(binVecData, element)
+                }
+                builder.AppendValues(binVecData, nil)
+                columns = append(columns, builder.NewBinaryArray())
+                insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: lo.Flatten(binVecData), Dim: dim}
+            } else {
+                binVecData := make([]byte, 0)
+                builder := array.NewListBuilder(mem, &arrow.Uint8Type{})
+                offsets := make([]int32, 0, rows)
+                valid := make([]bool, 0)
+                for i := 0; i < dim*rows/8; i++ {
+                    binVecData = append(binVecData, uint8(i))
+                }
+                builder.ValueBuilder().(*array.Uint8Builder).AppendValues(binVecData, nil)
+                for i := 0; i < rows; i++ {
+                    offsets = append(offsets, int32(dim*i/8))
+                    valid = append(valid, true)
+                }
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+                insertData.Data[field.GetFieldID()] = &storage.BinaryVectorFieldData{Data: binVecData, Dim: dim}
+            }
+        case schemapb.DataType_JSON:
+            jsonData := make([][]byte, 0)
+            builder := array.NewStringBuilder(mem)
+            for i := 0; i < rows; i++ {
+                if i%4 == 0 {
+                    v, _ := json.Marshal(fmt.Sprintf("{\"a\": \"%s\", \"b\": %d}", randomString(3), i))
+                    jsonData = append(jsonData, v)
+                } else if i%4 == 1 {
+                    v, _ := json.Marshal(i)
+                    jsonData = append(jsonData, v)
+                } else if i%4 == 2 {
+                    v, _ := json.Marshal(float32(i) * 0.1)
+                    jsonData = append(jsonData, v)
+                } else if i%4 == 3 {
+                    v, _ := json.Marshal(randomString(10))
+                    jsonData = append(jsonData, v)
+                }
+            }
+            insertData.Data[field.GetFieldID()] = &storage.JSONFieldData{Data: jsonData}
+            builder.AppendValues(lo.Map(jsonData, func(bs []byte, _ int) string {
+                return string(bs)
+            }), nil)
+            columns = append(columns, builder.NewStringArray())
+        case schemapb.DataType_Array:
+            offsets := make([]int32, 0, rows)
+            valid := make([]bool, 0, rows)
+            index := 0
+            for i := 0; i < rows; i++ {
+                offsets = append(offsets, int32(index))
+                valid = append(valid, true)
+                index += 3
+            }
+            arrayData := make([]*schemapb.ScalarField, 0)
+            switch elementType {
+            case schemapb.DataType_Bool:
+                builder := array.NewListBuilder(mem, &arrow.BooleanType{})
+                valueBuilder := builder.ValueBuilder().(*array.BooleanBuilder)
+                for i := 0; i < rows; i++ {
+                    data := []bool{i%2 == 0, i%3 == 0, i%4 == 0}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_BoolData{
+                            BoolData: &schemapb.BoolArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Int8:
+                builder := array.NewListBuilder(mem, &arrow.Int8Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Int8Builder)
+                for i := 0; i < rows; i++ {
+                    data := []int32{int32(i), int32(i + 1), int32(i + 2)}
+                    data2 := []int8{int8(i), int8(i + 1), int8(i + 2)}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_IntData{
+                            IntData: &schemapb.IntArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data2, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Int16:
+                builder := array.NewListBuilder(mem, &arrow.Int16Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Int16Builder)
+                for i := 0; i < rows; i++ {
+                    data := []int32{int32(i), int32(i + 1), int32(i + 2)}
+                    data2 := []int16{int16(i), int16(i + 1), int16(i + 2)}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_IntData{
+                            IntData: &schemapb.IntArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data2, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Int32:
+                builder := array.NewListBuilder(mem, &arrow.Int32Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Int32Builder)
+                for i := 0; i < rows; i++ {
+                    data := []int32{int32(i), int32(i + 1), int32(i + 2)}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_IntData{
+                            IntData: &schemapb.IntArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Int64:
+                builder := array.NewListBuilder(mem, &arrow.Int64Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Int64Builder)
+                for i := 0; i < rows; i++ {
+                    data := []int32{int32(i), int32(i + 1), int32(i + 2)}
+                    data2 := []int64{int64(i), int64(i + 1), int64(i + 2)}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_IntData{
+                            IntData: &schemapb.IntArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data2, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Float:
+                builder := array.NewListBuilder(mem, &arrow.Float32Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Float32Builder)
+                for i := 0; i < rows; i++ {
+                    data := []float32{float32(i) * 0.1, float32(i+1) * 0.1, float32(i+2) * 0.1}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_FloatData{
+                            FloatData: &schemapb.FloatArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_Double:
+                builder := array.NewListBuilder(mem, &arrow.Float64Type{})
+                valueBuilder := builder.ValueBuilder().(*array.Float64Builder)
+                for i := 0; i < rows; i++ {
+                    data := []float64{float64(i) * 0.02, float64(i+1) * 0.02, float64(i+2) * 0.02}
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_DoubleData{
+                            DoubleData: &schemapb.DoubleArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            case schemapb.DataType_VarChar, schemapb.DataType_String:
+                builder := array.NewListBuilder(mem, &arrow.StringType{})
+                valueBuilder := builder.ValueBuilder().(*array.StringBuilder)
+                for i := 0; i < rows; i++ {
+                    data := []string{
+                        randomString(5) + "-" + fmt.Sprintf("%d", i),
+                        randomString(5) + "-" + fmt.Sprintf("%d", i),
+                        randomString(5) + "-" + fmt.Sprintf("%d", i),
+                    }
+                    arrayData = append(arrayData, &schemapb.ScalarField{
+                        Data: &schemapb.ScalarField_StringData{
+                            StringData: &schemapb.StringArray{
+                                Data: data,
+                            },
+                        },
+                    })
+                    valueBuilder.AppendValues(data, nil)
+                }
+                insertData.Data[field.GetFieldID()] = &storage.ArrayFieldData{Data: arrayData}
+                builder.AppendValues(offsets, valid)
+                columns = append(columns, builder.NewListArray())
+            }
         }
     }
-    return nil
+    return columns, insertData, nil
 }

-func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) error {
+func writeParquet(w io.Writer, schema *schemapb.CollectionSchema, numRows int) (*storage.InsertData, error) {
     pqSchema := convertMilvusSchemaToArrowSchema(schema)
     fw, err := pqarrow.NewFileWriter(pqSchema, w, parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(int64(numRows))), pqarrow.DefaultWriterProps())
     if err != nil {
-        return err
+        return nil, err
     }
     defer fw.Close()

-    columns := make([]arrow.Array, 0, len(schema.Fields))
-    for _, field := range schema.Fields {
-        var dim int64 = 1
-        if typeutil.IsVectorType(field.GetDataType()) {
-            dim, err = typeutil.GetDim(field)
-            if err != nil {
-                return err
-            }
-        }
-        columnData := buildArrayData(field.DataType, field.ElementType, int(dim), numRows, field.Name == "FieldBinaryVector2")
-        columns = append(columns, columnData)
+    columns, insertData, err := buildArrayData(schema, numRows)
+    if err != nil {
+        return nil, err
     }

     recordBatch := array.NewRecord(pqSchema, columns, int64(numRows))
     err = fw.Write(recordBatch)
     if err != nil {
-        return err
+        return nil, err
     }
-    return nil
+    return insertData, nil
 }
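The list-typed vector columns in buildArrayData above are assembled in two steps: every child value is appended flat through the list builder's ValueBuilder, and AppendValues(offsets, valid) then carves that flat child array into one fixed-width list per row, each offset marking the child index where a row starts. A minimal, self-contained sketch of the same pattern (arrow v12 as imported by this test; the dim and row count here are made up for illustration):

package main

import (
    "fmt"

    "github.com/apache/arrow/go/v12/arrow"
    "github.com/apache/arrow/go/v12/arrow/array"
    "github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
    const dim, rows = 2, 3
    builder := array.NewListBuilder(memory.NewGoAllocator(), &arrow.Float32Type{})
    // Step 1: append all child values flat: 0, 1, 2, 3, 4, 5.
    for i := 0; i < dim*rows; i++ {
        builder.ValueBuilder().(*array.Float32Builder).Append(float32(i))
    }
    // Step 2: one start offset per row; row i covers child range [i*dim, (i+1)*dim).
    offsets := make([]int32, 0, rows)
    valid := make([]bool, 0, rows)
    for i := 0; i < rows; i++ {
        offsets = append(offsets, int32(i*dim))
        valid = append(valid, true)
    }
    builder.AppendValues(offsets, valid)
    fmt.Println(builder.NewListArray()) // expected: [[0 1] [2 3] [4 5]]
}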
@@ -406,7 +537,7 @@ func (s *ReaderSuite) run(dt schemapb.DataType) {
     defer os.Remove(filePath)
     wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
     assert.NoError(s.T(), err)
-    err = writeParquet(wf, schema, s.numRows)
+    insertData, err := writeParquet(wf, schema, s.numRows)
     assert.NoError(s.T(), err)

     ctx := context.Background()
@@ -417,20 +548,19 @@ func (s *ReaderSuite) run(dt schemapb.DataType) {
     s.NoError(err)

     checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
-        // expectInsertData := insertData
-        for _, data := range actualInsertData.Data {
+        expectInsertData := insertData
+        for fieldID, data := range actualInsertData.Data {
             s.Equal(expectRows, data.RowNum())
-            // TODO: dyh, check rows
-            // fieldDataType := typeutil.GetField(schema, fieldID).GetDataType()
-            // for i := 0; i < expectRows; i++ {
-            //     expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin)
-            //     actual := data.GetRow(i)
-            //     if fieldDataType == schemapb.DataType_Array {
-            //         s.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
-            //     } else {
-            //         s.Equal(expect, actual)
-            //     }
-            // }
+            fieldDataType := typeutil.GetField(schema, fieldID).GetDataType()
+            for i := 0; i < expectRows; i++ {
+                expect := expectInsertData.Data[fieldID].GetRow(i + offsetBegin)
+                actual := data.GetRow(i)
+                if fieldDataType == schemapb.DataType_Array {
+                    s.True(slices.Equal(expect.(*schemapb.ScalarField).GetIntData().GetData(), actual.(*schemapb.ScalarField).GetIntData().GetData()))
+                } else {
+                    s.Equal(expect, actual)
+                }
+            }
         }
     }

@@ -485,7 +615,7 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) {
     defer os.Remove(filePath)
     wf, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE, 0o666)
     assert.NoError(s.T(), err)
-    err = writeParquet(wf, schema, s.numRows)
+    _, err = writeParquet(wf, schema, s.numRows)
     assert.NoError(s.T(), err)

     ctx := context.Background()
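The new checkFn walks the decoded InsertData row by row against the InsertData captured when the parquet file was written: plain values can be compared with Equal, while array fields come back as *schemapb.ScalarField and need an element-wise comparison, hence slices.Equal. A stripped-down sketch of that comparison rule (the []int32 stand-in for an array row is an assumption made purely for illustration):

package main

import (
    "fmt"

    "golang.org/x/exp/slices"
)

// rowsEqual mirrors the branch in checkFn: element-wise comparison for
// array-typed rows, plain equality for everything else.
func rowsEqual(expect, actual any) bool {
    if e, ok := expect.([]int32); ok {
        a, ok := actual.([]int32)
        return ok && slices.Equal(e, a)
    }
    return expect == actual
}

func main() {
    fmt.Println(rowsEqual([]int32{1, 2, 3}, []int32{1, 2, 3})) // true
    fmt.Println(rowsEqual(int64(7), int64(8)))                 // false
}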
diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go
new file mode 100644
index 0000000000000..ae8cfae10da39
--- /dev/null
+++ b/tests/integration/import/binlog_test.go
@@ -0,0 +1,255 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importv2
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+    "go.uber.org/zap"
+
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+    "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+    "github.com/milvus-io/milvus/internal/proto/internalpb"
+    "github.com/milvus-io/milvus/pkg/common"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/funcutil"
+    "github.com/milvus-io/milvus/pkg/util/merr"
+    "github.com/milvus-io/milvus/pkg/util/metric"
+    "github.com/milvus-io/milvus/pkg/util/paramtable"
+    "github.com/milvus-io/milvus/tests/integration"
+)
+
+func (s *BulkInsertSuite) PrepareCollectionA() (int64, int64) {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    c := s.Cluster
+
+    const (
+        dim    = 128
+        dbName = ""
+        rowNum = 50000
+    )
+
+    collectionName := "TestBinlogImport_A_" + funcutil.GenRandomStr()
+
+    schema := integration.ConstructSchema(collectionName, dim, true)
+    marshaledSchema, err := proto.Marshal(schema)
+    s.NoError(err)
+
+    createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        Schema:         marshaledSchema,
+        ShardsNum:      common.DefaultShardsNum,
+    })
+    s.NoError(err)
+    s.Equal(int32(0), createCollectionStatus.GetCode())
+
+    showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
+    s.NoError(err)
+    s.Equal(int32(0), showCollectionsResp.GetStatus().GetCode())
+    log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
+
+    showPartitionsResp, err := c.Proxy.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
+        CollectionName: collectionName,
+    })
+    s.NoError(err)
+    s.Equal(int32(0), showPartitionsResp.GetStatus().GetCode())
+    log.Info("ShowPartitions result", zap.Any("showPartitionsResp", showPartitionsResp))
+
+    fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
+    hashKeys := integration.GenerateHashKeys(rowNum)
+    insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+        FieldsData:     []*schemapb.FieldData{fVecColumn},
+        HashKeys:       hashKeys,
+        NumRows:        uint32(rowNum),
+    })
+    s.NoError(err)
+    s.Equal(int32(0), insertResult.GetStatus().GetCode())
+
+    // flush
+    flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
+        DbName:          dbName,
+        CollectionNames: []string{collectionName},
+    })
+    s.NoError(err)
+    segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+    ids := segmentIDs.GetData()
+    s.Require().NotEmpty(segmentIDs)
+    s.Require().True(has)
+    flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+    s.True(has)
+
+    segments, err := c.MetaWatcher.ShowSegments()
+    s.NoError(err)
+    s.NotEmpty(segments)
+    for _, segment := range segments {
+        log.Info("ShowSegments result", zap.String("segment", segment.String()))
+    }
+    s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
+
+    // create index
+    createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+        CollectionName: collectionName,
+        FieldName:      integration.FloatVecField,
+        IndexName:      "_default",
+        ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
+    })
+    s.NoError(err)
+    s.Equal(int32(0), createIndexStatus.GetCode())
+
+    s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
+
+    // load
+    loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+        DbName:         dbName,
+        CollectionName: collectionName,
+    })
+    s.NoError(err)
+    s.Equal(int32(0), loadStatus.GetCode())
+    s.WaitForLoad(ctx, collectionName)
+
+    // search
+    expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+    nq := 10
+    topk := 10
+    roundDecimal := -1
+
+    params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
+    searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+        integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
+
+    searchResult, err := c.Proxy.Search(ctx, searchReq)
+
+    err = merr.CheckRPCCall(searchResult, err)
+    s.NoError(err)
+    s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+
+    // get collectionID and partitionID
+    collectionID := showCollectionsResp.GetCollectionIds()[0]
+    partitionID := showPartitionsResp.GetPartitionIDs()[0]
+    return collectionID, partitionID
+}
+
+func (s *BulkInsertSuite) TestBinlogImport() {
+    const (
+        startTs = "0"
+        endTs   = "548373346338803234"
+    )
+
+    collectionID, partitionID := s.PrepareCollectionA()
+
+    c := s.Cluster
+    ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second)
+    defer cancel()
+
+    collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr()
+
+    schema := integration.ConstructSchema(collectionName, dim, true)
+    marshaledSchema, err := proto.Marshal(schema)
+    s.NoError(err)
+
+    createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
+        DbName:         "",
+        CollectionName: collectionName,
+        Schema:         marshaledSchema,
+        ShardsNum:      common.DefaultShardsNum,
+    })
+    s.NoError(err)
+    s.Equal(int32(0), createCollectionStatus.GetCode())
+
+    // create index
+    createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
+        CollectionName: collectionName,
+        FieldName:      integration.FloatVecField,
+        IndexName:      "_default",
+        ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
+    })
+    s.NoError(err)
+    s.Equal(int32(0), createIndexStatus.GetCode())
+
+    s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)
+
+    // load
+    loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+        CollectionName: collectionName,
+    })
+    s.NoError(err)
+    s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+    s.WaitForLoad(ctx, collectionName)
+
+    files := []*internalpb.ImportFile{
+        {
+            Paths: []string{
+                fmt.Sprintf("/tmp/%s/insert_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID),
+                fmt.Sprintf("/tmp/%s/delta_log/%d/%d/", paramtable.Get().EtcdCfg.RootPath.GetValue(), collectionID, partitionID),
+            },
+        },
+    }
+    importResp, err := c.Proxy.ImportV2(ctx, &internalpb.ImportRequest{
+        CollectionName: collectionName,
+        Files:          files,
+        Options: []*commonpb.KeyValuePair{
+            {Key: "startTs", Value: startTs},
+            {Key: "endTs", Value: endTs},
+            {Key: "backup", Value: "true"},
+        },
+    })
+    s.NoError(err)
+    s.Equal(int32(0), importResp.GetStatus().GetCode())
+    log.Info("Import result", zap.Any("importResp", importResp))
+
+    jobID := importResp.GetJobID()
+    err = WaitForImportDone(ctx, c, jobID)
+    s.NoError(err)
+
+    segments, err := c.MetaWatcher.ShowSegments()
+    s.NoError(err)
+    s.NotEmpty(segments)
+    log.Info("Show segments", zap.Any("segments", segments))
+
+    // load refresh
+    loadStatus, err = c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+        CollectionName: collectionName,
+        Refresh:        true,
+    })
+    s.NoError(err)
+    s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
+    s.WaitForLoadRefresh(ctx, "", collectionName)
+
+    // search
+    expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+    nq := 10
+    topk := 10
+    roundDecimal := -1
+
+    params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.L2)
+    searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+        integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.L2, params, nq, dim, topk, roundDecimal)
+
+    searchResult, err := c.Proxy.Search(ctx, searchReq)
+
+    err = merr.CheckRPCCall(searchResult, err)
+    s.NoError(err)
+    s.Equal(nq*topk, len(searchResult.GetResults().GetScores()))
+}
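WaitForImportDone is referenced by the test above but defined elsewhere in the importv2 package; like WaitForLoadRefresh below, it is a poll-until-done loop bounded by the caller's context. A generic sketch of that shape (the done probe is a hypothetical stand-in for an RPC such as GetImportProgress, not the package's actual helper):

package main

import (
    "context"
    "fmt"
    "time"
)

// waitFor re-runs done() at a fixed interval until it reports success,
// returns an error, or the context expires.
func waitFor(ctx context.Context, interval time.Duration, done func() (bool, error)) error {
    for {
        ok, err := done()
        if err != nil {
            return err
        }
        if ok {
            return nil
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(interval):
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    start := time.Now()
    // Hypothetical probe: the "import" finishes after 300ms.
    err := waitFor(ctx, 100*time.Millisecond, func() (bool, error) {
        return time.Since(start) > 300*time.Millisecond, nil
    })
    fmt.Println(err) // expected: <nil>
}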
diff --git a/tests/integration/util_query.go b/tests/integration/util_query.go
index d4ce05ffd83d0..ead1e5f173b97 100644
--- a/tests/integration/util_query.go
+++ b/tests/integration/util_query.go
@@ -75,6 +75,29 @@ func (s *MiniClusterSuite) waitForLoadInternal(ctx context.Context, dbName, coll
     }
 }

+func (s *MiniClusterSuite) WaitForLoadRefresh(ctx context.Context, dbName, collection string) {
+    cluster := s.Cluster
+    getLoadingProgress := func() *milvuspb.GetLoadingProgressResponse {
+        loadProgress, err := cluster.Proxy.GetLoadingProgress(ctx, &milvuspb.GetLoadingProgressRequest{
+            DbName:         dbName,
+            CollectionName: collection,
+        })
+        if err != nil {
+            panic("GetLoadingProgress fail")
+        }
+        return loadProgress
+    }
+    for getLoadingProgress().GetRefreshProgress() != 100 {
+        select {
+        case <-ctx.Done():
+            s.FailNow("failed to wait for load (refresh)")
+            return
+        default:
+            time.Sleep(500 * time.Millisecond)
+        }
+    }
+}
+
 func ConstructSearchRequest(
     dbName, collectionName string,
     expr string,