Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add import reader for numpy #29253

Merged
merged 20 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
539c837
feat: Add import reader for numpy
bigsheeper Dec 16, 2023
6d7faf6
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
c97c26c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
925c38c
update
bigsheeper Dec 18, 2023
47e160c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 18, 2023
84eec47
code format
bigsheeper Dec 19, 2023
de37615
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 19, 2023
9f0d59c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 26, 2023
f9c9614
fix conflicts
bigsheeper Jan 2, 2024
5ed38e2
fix
bigsheeper Jan 2, 2024
1504dca
code format
bigsheeper Jan 2, 2024
67dde84
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
3207356
update after code review
bigsheeper Jan 4, 2024
9cb596b
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
27e57a8
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
2aedd74
rename columnReader to fieldReader
bigsheeper Jan 4, 2024
a432f22
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
a21d704
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 5, 2024
33e3691
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 5, 2024
bd4d242
read by count
bigsheeper Jan 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename columnReader to fieldReader
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jan 4, 2024
commit 2aedd741eb0f9f898350073f1368d321d0dbb662
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type ColumnReader struct {
type FieldReader struct {
reader io.Reader
npyReader *npy.Reader
order binary.ByteOrder
Expand All @@ -44,7 +44,7 @@ type ColumnReader struct {
readPosition int
}

func NewColumnReader(reader io.Reader, field *schemapb.FieldSchema) (*ColumnReader, error) {
func NewFieldReader(reader io.Reader, field *schemapb.FieldSchema) (*FieldReader, error) {
r, err := npyio.NewReader(reader)
if err != nil {
return nil, err
Expand All @@ -63,7 +63,7 @@ func NewColumnReader(reader io.Reader, field *schemapb.FieldSchema) (*ColumnRead
return nil, err
}

cr := &ColumnReader{
cr := &FieldReader{
reader: reader,
npyReader: r,
dim: dim,
Expand All @@ -82,7 +82,7 @@ func ReadN[T any](reader io.Reader, order binary.ByteOrder, n int64) ([]T, error
return data, nil
}

func (c *ColumnReader) getCount(count int64) int64 {
func (c *FieldReader) getCount(count int64) int64 {
shape := c.npyReader.Header.Descr.Shape
if len(shape) == 0 {
return 0
Expand All @@ -105,7 +105,7 @@ func (c *ColumnReader) getCount(count int64) int64 {
return count
}

func (c *ColumnReader) Next(count int64) (any, error) {
func (c *FieldReader) Next(count int64) (any, error) {
readCount := c.getCount(count)
if readCount == 0 {
return nil, nil
Expand Down Expand Up @@ -225,10 +225,10 @@ func (c *ColumnReader) Next(count int64) (any, error) {
return data, nil
}

func (c *ColumnReader) Close() {}
func (c *FieldReader) Close() {}

// setByteOrder sets the byte order (BigEndian or LittleEndian); the logic of this method is copied from the npyio lib.
func (c *ColumnReader) setByteOrder() {
func (c *FieldReader) setByteOrder() {
var nativeEndian binary.ByteOrder
v := uint16(1)
switch byte(v >> 8) {
Expand All @@ -248,7 +248,7 @@ func (c *ColumnReader) setByteOrder() {
}
}

func (c *ColumnReader) ReadString(count int64) ([]string, error) {
func (c *FieldReader) ReadString(count int64) ([]string, error) {
// varchar length: this is the max length; some items are shorter than this length, but they still occupy max-length bytes
maxLen, utf, err := stringLen(c.npyReader.Header.Descr.Type)
if err != nil || maxLen <= 0 {
Expand Down
14 changes: 7 additions & 7 deletions internal/util/importutilv2/numpy/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ import (
type Reader struct {
schema *schemapb.CollectionSchema
bufferSize int
crs map[int64]*ColumnReader // fieldID -> ColumnReader
frs map[int64]*FieldReader // fieldID -> FieldReader
}

func NewReader(schema *schemapb.CollectionSchema, readers map[int64]io.Reader, bufferSize int) (*Reader, error) {
fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 {
return field.GetFieldID()
})
crs := make(map[int64]*ColumnReader)
crs := make(map[int64]*FieldReader)
for fieldID, r := range readers {
cr, err := NewColumnReader(r, fields[fieldID])
cr, err := NewFieldReader(r, fields[fieldID])
if err != nil {
return nil, err
}
Expand All @@ -46,7 +46,7 @@ func NewReader(schema *schemapb.CollectionSchema, readers map[int64]io.Reader, b
return &Reader{
schema: schema,
bufferSize: bufferSize,
crs: crs,
frs: crs,
}, nil
}

Expand All @@ -57,7 +57,7 @@ func (r *Reader) Read() (*storage.InsertData, error) {
}
OUTER:
for {
for fieldID, cr := range r.crs {
for fieldID, cr := range r.frs {
data, err := cr.Next(1)
if err != nil {
return nil, err
Expand All @@ -75,7 +75,7 @@ OUTER:
}
}

for fieldID := range r.crs {
for fieldID := range r.frs {
if insertData.Data[fieldID].RowNum() == 0 {
return nil, nil
}
Expand All @@ -84,7 +84,7 @@ OUTER:
}

func (r *Reader) Close() {
for _, cr := range r.crs {
for _, cr := range r.frs {
cr.Close()
}
}
Loading