Skip to content

Commit

Permalink
enhance: Support Array DataType for bulk_insert (#28341)
Browse files Browse the repository at this point in the history
issue: #28272 
Support the Array DataType for bulk_insert with JSON and binlog files.

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored Nov 27, 2023
1 parent 8fe2fb3 commit c29b60e
Show file tree
Hide file tree
Showing 25 changed files with 1,399 additions and 482 deletions.
231 changes: 139 additions & 92 deletions internal/util/importutil/binlog_adapter.go

Large diffs are not rendered by default.

83 changes: 81 additions & 2 deletions internal/util/importutil/binlog_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Test_BinlogAdapterVerify(t *testing.T) {

// row id field missed
holder.fieldFiles = make(map[int64][]string)
for i := int64(102); i <= 112; i++ {
for i := int64(102); i <= 113; i++ {
holder.fieldFiles[i] = make([]string, 0)
}
err = adapter.verify(holder)
Expand All @@ -156,7 +156,7 @@ func Test_BinlogAdapterVerify(t *testing.T) {
assert.Error(t, err)

// succeed
for i := int64(102); i <= 112; i++ {
for i := int64(102); i <= 113; i++ {
holder.fieldFiles[i] = []string{
"a",
}
Expand Down Expand Up @@ -667,6 +667,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
int64(110): {"110_insertlog"},
int64(111): {"111_insertlog"},
int64(112): {"112_insertlog"},
int64(113): {"113_insertlog"},
}
holder.deltaFiles = []string{"deltalog"}
err = adapter.Read(holder)
Expand All @@ -689,6 +690,7 @@ func Test_BinlogAdapterReadInt64PK(t *testing.T) {
"110_insertlog": createBinlogBuf(t, schemapb.DataType_BinaryVector, fieldsData[110].([][]byte)),
"111_insertlog": createBinlogBuf(t, schemapb.DataType_FloatVector, fieldsData[111].([][]float32)),
"112_insertlog": createBinlogBuf(t, schemapb.DataType_JSON, fieldsData[112].([][]byte)),
"113_insertlog": createBinlogBuf(t, schemapb.DataType_Array, fieldsData[113].([]*schemapb.ScalarField)),
"deltalog": createDeltalogBuf(t, deletedItems, false),
}

Expand Down Expand Up @@ -1013,6 +1015,79 @@ func Test_BinlogAdapterDispatch(t *testing.T) {
assert.Equal(t, 0, shardsData[2][partitionID][fieldID].RowNum())
})

t.Run("dispatch Array data", func(t *testing.T) {
// Field 113 carries Array-typed data in this test schema.
fieldID := int64(113)
// row count mismatch
// Four array rows are provided while the shard list covers a different
// number of rows, so dispatchArrayToShards must reject the batch and
// leave every shard's row count untouched.
data := []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{1, 2, 3, 4, 5},
},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{7, 8, 9},
},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{10, 11},
},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{},
},
},
},
}
err = adapter.dispatchArrayToShards(data, shardsData, shardList, fieldID)
assert.Error(t, err)
// No partial dispatch: a failed batch must not add rows to any shard.
for _, shardData := range shardsData {
assert.Equal(t, 0, shardData[partitionID][fieldID].RowNum())
}

// illegal shard ID
// Shard 9 is out of range for the configured shard count, so the call
// must fail even though the row count now matches the shard list.
err = adapter.dispatchArrayToShards(data, shardsData, []int32{9, 1, 0, 2}, fieldID)
assert.Error(t, err)

// succeed
// Three rows (empty arrays are valid values) matching the shard list;
// afterwards the rows should be distributed across shards 0 and 1.
err = adapter.dispatchArrayToShards([]*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{},
},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{},
},
},
},
{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: []int32{},
},
},
},
}, shardsData, shardList, fieldID)
assert.NoError(t, err)
// NOTE(review): expected per-shard counts (1, 1, 0) presumably follow
// from the shardList defined earlier in this test — confirm against the
// full test file, which is not fully visible in this diff.
assert.Equal(t, 1, shardsData[0][partitionID][fieldID].RowNum())
assert.Equal(t, 1, shardsData[1][partitionID][fieldID].RowNum())
assert.Equal(t, 0, shardsData[2][partitionID][fieldID].RowNum())
})

t.Run("dispatch binary vector data", func(t *testing.T) {
fieldID := int64(110)
// row count mismatch
Expand Down Expand Up @@ -1186,6 +1261,10 @@ func Test_BinlogAdapterReadInsertlog(t *testing.T) {
failedFunc(111, "floatvector", schemapb.DataType_FloatVector, 110, schemapb.DataType_BinaryVector)
})

t.Run("failed to dispatch Array data", func(t *testing.T) {
// Mirrors the preceding failure cases: field 113 is declared as Array,
// but is paired with field 111's FloatVector-typed buffer — presumably
// failedFunc asserts readInsertlog errors on the type mismatch (its
// definition is outside this diff view; verify against the full file).
failedFunc(113, "array", schemapb.DataType_Array, 111, schemapb.DataType_FloatVector)
})

// succeed
chunkManager.readBuf["int32"] = createBinlogBuf(t, schemapb.DataType_Int32, fieldsData[105].([]int32))
err = adapter.readInsertlog(105, "int32", shardsData, []int32{0, 1, 1})
Expand Down
Loading

0 comments on commit c29b60e

Please sign in to comment.