Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add import reader for numpy #29253

Merged
merged 20 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
539c837
feat: Add import reader for numpy
bigsheeper Dec 16, 2023
6d7faf6
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
c97c26c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
925c38c
update
bigsheeper Dec 18, 2023
47e160c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
84eec47
code format
bigsheeper Dec 19, 2023
de37615
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 19, 2023
9f0d59c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 26, 2023
f9c9614
fix conflicts
bigsheeper Jan 2, 2024
5ed38e2
fix
bigsheeper Jan 2, 2024
1504dca
code format
bigsheeper Jan 2, 2024
67dde84
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
3207356
update after code review
bigsheeper Jan 4, 2024
9cb596b
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
27e57a8
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
2aedd74
rename columnReader to fieldReader
bigsheeper Jan 4, 2024
a432f22
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
a21d704
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 5, 2024
33e3691
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 5, 2024
bd4d242
read by count
bigsheeper Jan 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
read by count
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jan 5, 2024
commit bd4d2420b4c07b3e4ff1aa729f814c094fbe3dad
46 changes: 19 additions & 27 deletions internal/util/importutilv2/numpy/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,65 +26,57 @@
)

type Reader struct {
schema *schemapb.CollectionSchema
bufferSize int
frs map[int64]*FieldReader // fieldID -> FieldReader
schema *schemapb.CollectionSchema
count int64
frs map[int64]*FieldReader // fieldID -> FieldReader
}

func NewReader(schema *schemapb.CollectionSchema, readers map[int64]io.Reader, bufferSize int) (*Reader, error) {
fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
count, err := calcRowCount(bufferSize, schema)
if err != nil {
return nil, err
}

Check warning on line 41 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L40-L41

Added lines #L40 - L41 were not covered by tests
crs := make(map[int64]*FieldReader)
for fieldID, r := range readers {
cr, err := NewFieldReader(r, fields[fieldID])
if err != nil {
return nil, err
}

Check warning on line 47 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L46-L47

Added lines #L46 - L47 were not covered by tests
crs[fieldID] = cr
}
return &Reader{
schema: schema,
bufferSize: bufferSize,
frs: crs,
schema: schema,
count: count,
frs: crs,
}, nil
}

func (r *Reader) Read() (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
}

Check warning on line 61 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L60-L61

Added lines #L60 - L61 were not covered by tests
OUTER:
for {
for fieldID, cr := range r.frs {
data, err := cr.Next(1)
if err != nil {
return nil, err
}
if data == nil {
break OUTER
}
err = insertData.Data[fieldID].AppendRows(data)
if err != nil {
return nil, err
}
}
if insertData.GetMemorySize() >= r.bufferSize {
break
for fieldID, cr := range r.frs {
data, err := cr.Next(r.count)
if err != nil {
return nil, err
}

Check warning on line 66 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

for fieldID := range r.frs {
if insertData.Data[fieldID].RowNum() == 0 {
if data == nil {
return nil, nil
}

Check warning on line 69 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L68-L69

Added lines #L68 - L69 were not covered by tests
err = insertData.Data[fieldID].AppendRows(data)
if err != nil {
return nil, err
}

Check warning on line 73 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L72-L73

Added lines #L72 - L73 were not covered by tests
}
return insertData, nil
}

func (r *Reader) Close() {
for _, cr := range r.frs {
cr.Close()
}

Check warning on line 81 in internal/util/importutilv2/numpy/reader.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/reader.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}
12 changes: 12 additions & 0 deletions internal/util/importutilv2/numpy/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
Name: "pk",
IsPrimaryKey: true,
DataType: suite.pkDataType,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
},
{
FieldID: 101,
Expand All @@ -199,6 +205,12 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
Name: dt.String(),
DataType: dt,
ElementType: schemapb.DataType_Int32,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "max_length",
Value: "256",
},
},
},
},
}
Expand Down
10 changes: 10 additions & 0 deletions internal/util/importutilv2/numpy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var (
Expand All @@ -42,48 +43,48 @@
func stringLen(dtype string) (int, bool, error) {
var utf bool
switch {
case reStrPre.MatchString(dtype), reStrPost.MatchString(dtype):
utf = false

Check warning on line 47 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L46-L47

Added lines #L46 - L47 were not covered by tests
case reUniPre.MatchString(dtype), reUniPost.MatchString(dtype):
utf = true
}

if m := reStrPre.FindStringSubmatch(dtype); m != nil {
v, err := strconv.Atoi(m[1])
if err != nil {
return 0, false, err
}
return v, utf, nil

Check warning on line 57 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L53-L57

Added lines #L53 - L57 were not covered by tests
}
if m := reStrPost.FindStringSubmatch(dtype); m != nil {
v, err := strconv.Atoi(m[1])
if err != nil {
return 0, false, err
}
return v, utf, nil

Check warning on line 64 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L60-L64

Added lines #L60 - L64 were not covered by tests
}
if m := reUniPre.FindStringSubmatch(dtype); m != nil {
v, err := strconv.Atoi(m[1])
if err != nil {
return 0, false, err
}
return v, utf, nil

Check warning on line 71 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L67-L71

Added lines #L67 - L71 were not covered by tests
}
if m := reUniPost.FindStringSubmatch(dtype); m != nil {
v, err := strconv.Atoi(m[1])
if err != nil {
return 0, false, err
}

Check warning on line 77 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L76-L77

Added lines #L76 - L77 were not covered by tests
return v, utf, nil
}

return 0, false, merr.WrapErrImportFailed(fmt.Sprintf("dtype '%s' of numpy file is not varchar data type", dtype))

Check warning on line 81 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L81

Added line #L81 was not covered by tests
}

func decodeUtf32(src []byte, order binary.ByteOrder) (string, error) {
if len(src)%4 != 0 {
return "", merr.WrapErrImportFailed(fmt.Sprintf("invalid utf32 bytes length %d, the byte array length should be multiple of 4", len(src)))
}

Check warning on line 87 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L86-L87

Added lines #L86 - L87 were not covered by tests

var str string
for len(src) > 0 {
Expand All @@ -96,12 +97,12 @@
isUtf16 = true
}
lowbytesPosition = 0
} else {
if src[0] == 0 && src[1] == 0 {
isUtf16 = true
}
lowbytesPosition = 2
uOrder = unicode.BigEndian

Check warning on line 105 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L100-L105

Added lines #L100 - L105 were not covered by tests
}

if isUtf16 {
Expand All @@ -111,26 +112,26 @@
decoder := unicode.UTF16(uOrder, unicode.IgnoreBOM).NewDecoder()
res, err := decoder.Bytes(src[lowbytesPosition : lowbytesPosition+2])
if err != nil {
return "", merr.WrapErrImportFailed(fmt.Sprintf("failed to decode utf32 binary bytes, error: %v", err))
}

Check warning on line 116 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L115-L116

Added lines #L115 - L116 were not covered by tests
str += string(res)
}
} else {
// convert the 4 bytes to a unicode and encode to utf8
// Golang strongly opposes utf32 coding, this kind of encoding has been excluded from standard lib
var x uint32
if order == binary.LittleEndian {
x = uint32(src[3])<<24 | uint32(src[2])<<16 | uint32(src[1])<<8 | uint32(src[0])
} else {
x = uint32(src[0])<<24 | uint32(src[1])<<16 | uint32(src[2])<<8 | uint32(src[3])
}
r := rune(x)
utf8Code := make([]byte, 4)
utf8.EncodeRune(utf8Code, r)
if r == utf8.RuneError {
return "", merr.WrapErrImportFailed(fmt.Sprintf("failed to convert 4 bytes unicode %d to utf8 rune", x))
}
str += string(utf8Code)

Check warning on line 134 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L119-L134

Added lines #L119 - L134 were not covered by tests
}

src = src[4:]
Expand Down Expand Up @@ -164,69 +165,78 @@
// Note: JSON field and VARCHAR field are using string type numpy
return schemapb.DataType_VarChar, nil
}
return schemapb.DataType_None, merr.WrapErrImportFailed(
fmt.Sprintf("the numpy file dtype '%s' is not supported", typeStr))

Check warning on line 169 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}
}

func wrapElementTypeError(eleType schemapb.DataType, field *schemapb.FieldSchema) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected element type '%s' for field '%s', got type '%T'",
field.GetDataType().String(), field.GetName(), eleType))

Check warning on line 175 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L173-L175

Added lines #L173 - L175 were not covered by tests
}

func wrapDimError(actualDim int, expectDim int, field *schemapb.FieldSchema) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected dim '%d' for %s field '%s', got dim '%d'",
expectDim, field.GetDataType().String(), field.GetName(), actualDim))

Check warning on line 180 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L178-L180

Added lines #L178 - L180 were not covered by tests
}

func wrapShapeError(actualShape int, expectShape int, field *schemapb.FieldSchema) error {
return merr.WrapErrImportFailed(fmt.Sprintf("expected shape '%d' for %s field '%s', got shape '%d'",
expectShape, field.GetDataType().String(), field.GetName(), actualShape))

Check warning on line 185 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L183-L185

Added lines #L183 - L185 were not covered by tests
}

func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int) error {
elementType, err := convertNumpyType(npyReader.Header.Descr.Type)
if err != nil {
return err
}

Check warning on line 192 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L191-L192

Added lines #L191 - L192 were not covered by tests
shape := npyReader.Header.Descr.Shape

switch field.GetDataType() {
case schemapb.DataType_FloatVector:
if elementType != schemapb.DataType_Float && elementType != schemapb.DataType_Double {
return wrapElementTypeError(elementType, field)
}

Check warning on line 199 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L198-L199

Added lines #L198 - L199 were not covered by tests
if len(shape) != 2 {
return wrapShapeError(len(shape), 2, field)
}

Check warning on line 202 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L201-L202

Added lines #L201 - L202 were not covered by tests
if shape[1] != dim {
return wrapDimError(shape[1], dim, field)
}

Check warning on line 205 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L204-L205

Added lines #L204 - L205 were not covered by tests
case schemapb.DataType_BinaryVector:
if elementType != schemapb.DataType_BinaryVector {
return wrapElementTypeError(elementType, field)
}

Check warning on line 209 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L208-L209

Added lines #L208 - L209 were not covered by tests
if len(shape) != 2 {
return wrapShapeError(len(shape), 2, field)
}

Check warning on line 212 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L211-L212

Added lines #L211 - L212 were not covered by tests
if shape[1] != dim/8 {
return wrapDimError(shape[1]*8, dim, field)
}

Check warning on line 215 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L214-L215

Added lines #L214 - L215 were not covered by tests
case schemapb.DataType_VarChar, schemapb.DataType_JSON:
if len(shape) != 1 {
return wrapShapeError(len(shape), 1, field)
}

Check warning on line 219 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L218-L219

Added lines #L218 - L219 were not covered by tests
case schemapb.DataType_None, schemapb.DataType_Array,
schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return merr.WrapErrImportFailed(fmt.Sprintf("unsupported data type: %s", field.GetDataType().String()))

Check warning on line 222 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L221-L222

Added lines #L221 - L222 were not covered by tests

default:
if elementType != field.GetDataType() {
return wrapElementTypeError(elementType, field)
}

Check warning on line 227 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L226-L227

Added lines #L226 - L227 were not covered by tests
if len(shape) != 1 {
return wrapShapeError(len(shape), 1, field)
}

Check warning on line 230 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L229-L230

Added lines #L229 - L230 were not covered by tests
}
return nil
}

func calcRowCount(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) {
sizePerRecord, err := typeutil.EstimateSizePerRecord(schema)
if err != nil {
return 0, err
}

Check warning on line 239 in internal/util/importutilv2/numpy/util.go

View check run for this annotation

Codecov / codecov/patch

internal/util/importutilv2/numpy/util.go#L238-L239

Added lines #L238 - L239 were not covered by tests
rowCount := int64(bufferSize) / int64(sizePerRecord)
return rowCount, nil
}
Loading