Skip to content

Commit

Permalink
feat: adding binlog streaming writer
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <ted.xu@zilliz.com>
  • Loading branch information
tedxu committed Mar 22, 2024
1 parent 5220005 commit cc76640
Show file tree
Hide file tree
Showing 3 changed files with 880 additions and 200 deletions.
34 changes: 24 additions & 10 deletions internal/storage/binlog_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (

"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/pkg/common"
)

func generateTestData(num int) ([]*Blob, error) {
func generateTestSchema() *schemapb.CollectionSchema {
schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
Expand All @@ -42,12 +43,25 @@ func generateTestData(num int) ([]*Blob, error) {
{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
{FieldID: 19, Name: "string", DataType: schemapb.DataType_JSON},
{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector},
{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector},
{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector},
{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector},
{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
}},
}}
insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: schema})

return schema
}

func generateTestData(num int) ([]*Blob, error) {
insertCodec := NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: 1, Schema: generateTestSchema()})

var (
field0 []int64
Expand Down Expand Up @@ -102,7 +116,7 @@ func generateTestData(num int) ([]*Blob, error) {
field102 = append(field102, f102...)
field103 = append(field103, 0xff)

f104 := make([]byte, 8)
f104 := make([]byte, 16)
for j := range f104 {
f104[j] = byte(i)
}
Expand Down Expand Up @@ -135,11 +149,11 @@ func generateTestData(num int) ([]*Blob, error) {
},
104: &Float16VectorFieldData{
Data: field104,
Dim: 4,
Dim: 8,
},
105: &BFloat16VectorFieldData{
Data: field105,
Dim: 4,
Dim: 8,
},
}}

Expand All @@ -154,7 +168,7 @@ func assertTestData(t *testing.T, i int, value *Value) {
f102[j] = float32(i)
}

f104 := make([]byte, 8)
f104 := make([]byte, 16)
for j := range f104 {
f104[j] = byte(i)
}
Expand Down
Loading

0 comments on commit cc76640

Please sign in to comment.