From 31cf849f68711cf6cbdf4c9cd7237c5bf3e09cf6 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Mon, 25 Mar 2024 20:29:07 +0800
Subject: [PATCH] enhance: Support retrieving file size from
 importutilv2.Reader (#31533)

To reduce the overhead caused by listing the S3 objects, add a Size()
interface to importutilv2.Reader for retrieving file sizes.

issue: https://github.com/milvus-io/milvus/issues/31532,
https://github.com/milvus-io/milvus/issues/28521

---------

Signed-off-by: bigsheeper
---
Usage sketches for the new Size() contract follow the diff.

 internal/datanode/importv2/executor.go        | 12 +++-
 internal/datanode/importv2/executor_test.go   |  7 +--
 internal/datanode/importv2/util.go            | 40 -------------
 internal/storage/utils.go                     | 13 +++++
 internal/util/importutilv2/binlog/reader.go   | 23 +++++++-
 internal/util/importutilv2/json/reader.go     | 35 ++++++++++--
 .../util/importutilv2/json/reader_test.go     | 18 +++++-
 internal/util/importutilv2/mock_reader.go     | 51 +++++++++++++++++
 internal/util/importutilv2/numpy/reader.go    | 43 ++++++++++----
 internal/util/importutilv2/numpy/util.go      |  4 +-
 internal/util/importutilv2/parquet/reader.go  | 56 ++++++++++++++-----
 .../util/importutilv2/parquet/reader_test.go  |  8 +--
 internal/util/importutilv2/reader.go          | 20 +++----
 internal/util/importutilv2/util.go            |  4 --
 14 files changed, 230 insertions(+), 104 deletions(-)

diff --git a/internal/datanode/importv2/executor.go b/internal/datanode/importv2/executor.go
index 24cbf1a7fdd10..999ffbdfb447f 100644
--- a/internal/datanode/importv2/executor.go
+++ b/internal/datanode/importv2/executor.go
@@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) {
 		}
 		defer reader.Close()
 		start := time.Now()
-		err = e.readFileStat(reader, task, i, file)
+		err = e.readFileStat(reader, task, i)
 		if err != nil {
 			e.handleErr(task, err, "preimport failed")
 			return err
@@ -180,11 +180,17 @@ func (e *executor) PreImport(task Task) {
 		WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...)
 }
 
-func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error {
-	fileSize, err := GetFileSize(file, e.cm, task)
+func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
+	fileSize, err := reader.Size()
 	if err != nil {
 		return err
 	}
+	maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
+	if fileSize > int64(maxSize) {
+		return errors.New(fmt.Sprintf(
+			"The import file size has reached the maximum limit allowed for importing, "+
+				"fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
+	}
 
 	totalRows := 0
 	totalSize := 0
diff --git a/internal/datanode/importv2/executor_test.go b/internal/datanode/importv2/executor_test.go
index c342706cae5d1..d4739d28a295a 100644
--- a/internal/datanode/importv2/executor_test.go
+++ b/internal/datanode/importv2/executor_test.go
@@ -464,13 +464,10 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 		Paths: []string{"dummy.json"},
 	}
 
-	cm := mocks.NewChunkManager(s.T())
-	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
-	s.executor.cm = cm
-
 	var once sync.Once
 	data := createInsertData(s.T(), s.schema, s.numRows)
 	s.reader = importutilv2.NewMockReader(s.T())
+	s.reader.EXPECT().Size().Return(1024, nil)
 	s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) {
 		var res *storage.InsertData
 		once.Do(func() {
@@ -492,7 +489,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 	}
 	preimportTask := NewPreImportTask(preimportReq)
 	s.manager.Add(preimportTask)
-	err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile)
+	err := s.executor.readFileStat(s.reader, preimportTask, 0)
 	s.NoError(err)
 }
 
diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go
index 1cc3ab0aa9508..b9b99b7777377 100644
--- a/internal/datanode/importv2/util.go
+++ b/internal/datanode/importv2/util.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"strconv"
-	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -30,10 +29,8 @@ import (
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -204,43 +201,6 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection
 	return 0
 }
 
-func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task) (int64, error) {
-	paths := file.GetPaths()
-	if importutilv2.IsBackup(task.GetOptions()) {
-		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-		defer cancel()
-		paths = make([]string, 0)
-		for _, prefix := range file.GetPaths() {
-			binlogs, _, err := cm.ListWithPrefix(ctx, prefix, true)
-			if err != nil {
-				return 0, err
-			}
-			paths = append(paths, binlogs...)
- } - } - - fn := func(path string) (int64, error) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - return cm.Size(ctx, path) - } - var totalSize int64 = 0 - for _, path := range paths { - size, err := fn(path) - if err != nil { - return 0, err - } - totalSize += size - } - maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024 - if totalSize > int64(maxSize) { - return 0, merr.WrapErrImportFailed(fmt.Sprintf( - "The import file size has reached the maximum limit allowed for importing, "+ - "fileSize=%d, maxSize=%d", totalSize, int64(maxSize))) - } - return totalSize, nil -} - func LogStats(manager TaskManager) { logFunc := func(tasks []Task, taskType TaskType) { byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 { diff --git a/internal/storage/utils.go b/internal/storage/utils.go index a5cde25050339..631da3bfdd477 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "context" "encoding/binary" "fmt" "io" @@ -1262,3 +1263,15 @@ func NewTestChunkManagerFactory(params *paramtable.ComponentParam, rootPath stri IAMEndpoint(params.MinioCfg.IAMEndpoint.GetValue()), CreateBucket(true)) } + +func GetFilesSize(ctx context.Context, paths []string, cm ChunkManager) (int64, error) { + totalSize := int64(0) + for _, filePath := range paths { + size, err := cm.Size(ctx, filePath) + if err != nil { + return 0, err + } + totalSize += size + } + return totalSize, nil +} diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 2d1763702aea0..69cc7636018a2 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -23,6 +23,9 @@ import ( "io" "math" + "github.com/samber/lo" + "go.uber.org/atomic" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/merr" @@ -34,6 +37,7 @@ type reader struct { cm storage.ChunkManager schema *schemapb.CollectionSchema + fileSize *atomic.Int64 deleteData *storage.DeleteData insertLogs map[int64][]string // fieldID -> binlogs @@ -50,9 +54,10 @@ func NewReader(ctx context.Context, ) (*reader, error) { schema = typeutil.AppendSystemFields(schema) r := &reader{ - ctx: ctx, - cm: cm, - schema: schema, + ctx: ctx, + cm: cm, + schema: schema, + fileSize: atomic.NewInt64(0), } err := r.init(paths, tsStart, tsEnd) if err != nil { @@ -200,4 +205,16 @@ OUTER: return result, nil } +func (r *reader) Size() (int64, error) { + if size := r.fileSize.Load(); size != 0 { + return size, nil + } + size, err := storage.GetFilesSize(r.ctx, lo.Flatten(lo.Values(r.insertLogs)), r.cm) + if err != nil { + return 0, err + } + r.fileSize.Store(size) + return size, nil +} + func (r *reader) Close() {} diff --git a/internal/util/importutilv2/json/reader.go b/internal/util/importutilv2/json/reader.go index cdf6086321c2a..49c84ee8b8560 100644 --- a/internal/util/importutilv2/json/reader.go +++ b/internal/util/importutilv2/json/reader.go @@ -17,11 +17,14 @@ package json import ( + "context" "encoding/json" "fmt" "io" "strings" + "go.uber.org/atomic" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/merr" @@ -35,9 +38,14 @@ const ( type Row = map[storage.FieldID]any type reader struct { - dec *json.Decoder + ctx context.Context + cm storage.ChunkManager schema 
*schemapb.CollectionSchema + fileSize *atomic.Int64 + filePath string + dec *json.Decoder + bufferSize int count int64 isOldFormat bool @@ -45,15 +53,22 @@ type reader struct { parser RowParser } -func NewReader(r io.Reader, schema *schemapb.CollectionSchema, bufferSize int) (*reader, error) { - var err error +func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int) (*reader, error) { + r, err := cm.Reader(ctx, path) + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error())) + } count, err := estimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } reader := &reader{ - dec: json.NewDecoder(r), + ctx: ctx, + cm: cm, schema: schema, + fileSize: atomic.NewInt64(0), + filePath: path, + dec: json.NewDecoder(r), bufferSize: bufferSize, count: count, } @@ -153,6 +168,18 @@ func (j *reader) Read() (*storage.InsertData, error) { return insertData, nil } +func (j *reader) Size() (int64, error) { + if size := j.fileSize.Load(); size != 0 { + return size, nil + } + size, err := j.cm.Size(j.ctx, j.filePath) + if err != nil { + return 0, err + } + j.fileSize.Store(size) + return size, nil +} + func (j *reader) Close() {} func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index c4476753abe6a..46272a36d71e6 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -17,9 +17,11 @@ package json import ( + "context" rand2 "crypto/rand" "encoding/json" "fmt" + "io" "math" "math/rand" "strconv" @@ -28,11 +30,13 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "golang.org/x/exp/slices" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -246,8 +250,18 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { jsonBytes, err := json.Marshal(rows) suite.NoError(err) - r := strings.NewReader(string(jsonBytes)) - reader, err := NewReader(r, schema, math.MaxInt) + type mockReader struct { + io.Reader + io.Closer + io.ReaderAt + io.Seeker + } + cm := mocks.NewChunkManager(suite.T()) + cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) { + r := &mockReader{Reader: strings.NewReader(string(jsonBytes))} + return r, nil + }) + reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt) suite.NoError(err) checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) { diff --git a/internal/util/importutilv2/mock_reader.go b/internal/util/importutilv2/mock_reader.go index 04fa270eb8f80..2a8c13ac74e25 100644 --- a/internal/util/importutilv2/mock_reader.go +++ b/internal/util/importutilv2/mock_reader.go @@ -105,6 +105,57 @@ func (_c *MockReader_Read_Call) RunAndReturn(run func() (*storage.InsertData, er return _c } +// Size provides a mock function with given fields: +func (_m *MockReader) Size() (int64, error) { + ret := _m.Called() + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func() 
(int64, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockReader_Size_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Size' +type MockReader_Size_Call struct { + *mock.Call +} + +// Size is a helper method to define mock.On call +func (_e *MockReader_Expecter) Size() *MockReader_Size_Call { + return &MockReader_Size_Call{Call: _e.mock.On("Size")} +} + +func (_c *MockReader_Size_Call) Run(run func()) *MockReader_Size_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockReader_Size_Call) Return(_a0 int64, _a1 error) *MockReader_Size_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockReader_Size_Call) RunAndReturn(run func() (int64, error)) *MockReader_Size_Call { + _c.Call.Return(run) + return _c +} + // NewMockReader creates a new instance of MockReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockReader(t interface { diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index 2913bda3f0f6c..6acabe1f37b8b 100644 --- a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -24,19 +24,26 @@ import ( "strings" "github.com/samber/lo" + "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/util/merr" ) -type Reader struct { +type reader struct { + ctx context.Context + cm storage.ChunkManager schema *schemapb.CollectionSchema - count int64 - frs map[int64]*FieldReader // fieldID -> FieldReader + + fileSize *atomic.Int64 + paths []string + + count int64 + frs map[int64]*FieldReader // fieldID -> FieldReader } -func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []string, cm storage.ChunkManager, bufferSize int) (*Reader, error) { +func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []string, cm storage.ChunkManager, bufferSize int) (*reader, error) { fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) @@ -56,14 +63,18 @@ func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, paths []s } crs[fieldID] = cr } - return &Reader{ - schema: schema, - count: count, - frs: crs, + return &reader{ + ctx: ctx, + cm: cm, + schema: schema, + fileSize: atomic.NewInt64(0), + paths: paths, + count: count, + frs: crs, }, nil } -func (r *Reader) Read() (*storage.InsertData, error) { +func (r *reader) Read() (*storage.InsertData, error) { insertData, err := storage.NewInsertData(r.schema) if err != nil { return nil, err @@ -89,7 +100,19 @@ func (r *Reader) Read() (*storage.InsertData, error) { return insertData, nil } -func (r *Reader) Close() { +func (r *reader) Size() (int64, error) { + if size := r.fileSize.Load(); size != 0 { + return size, nil + } + size, err := storage.GetFilesSize(r.ctx, r.paths, r.cm) + if err != nil { + return 0, err + } + r.fileSize.Store(size) + return size, nil +} + +func (r *reader) Close() { for _, cr := range r.frs { cr.Close() } diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index 
552985348ffcb..9ff554f298b01 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -251,7 +251,7 @@ func fillDynamicData(data *storage.InsertData, schema *schemapb.CollectionSchema if dynamicField == nil { return nil } - rowNum := GetInsertDataRowNum(data, schema) + rowNum := getInsertDataRowNum(data, schema) dynamicData := data.Data[dynamicField.GetFieldID()] jsonFD := dynamicData.(*storage.JSONFieldData) bs := []byte("{}") @@ -262,7 +262,7 @@ func fillDynamicData(data *storage.InsertData, schema *schemapb.CollectionSchema return nil } -func GetInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSchema) int { +func getInsertDataRowNum(data *storage.InsertData, schema *schemapb.CollectionSchema) int { fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index d39dbb236894c..16bc006673d0b 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -25,6 +25,7 @@ import ( "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/file" "github.com/apache/arrow/go/v12/parquet/pqarrow" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -33,28 +34,37 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" ) -type Reader struct { - reader *file.Reader +type reader struct { + ctx context.Context + cm storage.ChunkManager + schema *schemapb.CollectionSchema + + path string + r *file.Reader + fileSize *atomic.Int64 bufferSize int count int64 - schema *schemapb.CollectionSchema - frs map[int64]*FieldReader // fieldID -> FieldReader + frs map[int64]*FieldReader // fieldID -> FieldReader } -func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, cmReader storage.FileReader, bufferSize int) (*Reader, error) { - reader, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{ +func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int) (*reader, error) { + cmReader, err := cm.Reader(ctx, path) + if err != nil { + return nil, err + } + r, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{ BufferSize: int64(bufferSize), BufferedStreamEnabled: true, })) if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet reader failed, err=%v", err)) } - log.Info("create parquet reader done", zap.Int("row group num", reader.NumRowGroups()), - zap.Int64("num rows", reader.NumRows())) + log.Info("create parquet reader done", zap.Int("row group num", r.NumRowGroups()), + zap.Int64("num rows", r.NumRows())) - fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("new parquet file reader failed, err=%v", err)) } @@ -67,16 +77,20 @@ func NewReader(ctx context.Context, schema *schemapb.CollectionSchema, cmReader if err != nil { return nil, err } - return &Reader{ - reader: reader, + return &reader{ + ctx: ctx, + cm: cm, + schema: schema, + fileSize: atomic.NewInt64(0), + path: path, + r: r, bufferSize: bufferSize, count: count, - schema: schema, frs: crs, }, nil } -func (r *Reader) Read() 
(*storage.InsertData, error) { +func (r *reader) Read() (*storage.InsertData, error) { insertData, err := storage.NewInsertData(r.schema) if err != nil { return nil, err @@ -108,11 +122,23 @@ OUTER: return insertData, nil } -func (r *Reader) Close() { +func (r *reader) Size() (int64, error) { + if size := r.fileSize.Load(); size != 0 { + return size, nil + } + size, err := r.cm.Size(r.ctx, r.path) + if err != nil { + return 0, err + } + r.fileSize.Store(size) + return size, nil +} + +func (r *reader) Close() { for _, cr := range r.frs { cr.Close() } - err := r.reader.Close() + err := r.r.Close() if err != nil { log.Warn("close parquet reader failed", zap.Error(err)) } diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 796cf60e295a2..9240b6dccee90 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -413,9 +413,7 @@ func (s *ReaderSuite) run(dt schemapb.DataType) { f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/")) cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(s.T(), err) - cmReader, err := cm.Reader(ctx, filePath) - assert.NoError(s.T(), err) - reader, err := NewReader(ctx, schema, cmReader, 64*1024*1024) + reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024) s.NoError(err) checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) { @@ -494,9 +492,7 @@ func (s *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { f := storage.NewChunkManagerFactory("local", storage.RootPath("/tmp/milvus_test/test_parquet_reader/")) cm, err := f.NewPersistentStorageChunkManager(ctx) assert.NoError(s.T(), err) - cmReader, err := cm.Reader(ctx, filePath) - assert.NoError(s.T(), err) - reader, err := NewReader(ctx, schema, cmReader, 64*1024*1024) + reader, err := NewReader(ctx, cm, schema, filePath, 64*1024*1024) s.NoError(err) _, err = reader.Read() diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go index 6934baa50546f..d3c0d532fb8a1 100644 --- a/internal/util/importutilv2/reader.go +++ b/internal/util/importutilv2/reader.go @@ -31,7 +31,15 @@ import ( //go:generate mockery --name=Reader --structname=MockReader --output=./ --filename=mock_reader.go --with-expecter --inpackage type Reader interface { + // Size returns the size of the underlying file/files in bytes. + // It returns an error if the size cannot be determined. + Size() (int64, error) + + // Read reads data from the underlying file/files. + // It returns the storage.InsertData and an error, if any. Read() (*storage.InsertData, error) + + // Close closes the underlying file reader. 
 	Close()
 }
 
@@ -57,19 +65,11 @@ func NewReader(ctx context.Context,
 	}
 	switch fileType {
 	case JSON:
-		reader, err := cm.Reader(ctx, importFile.GetPaths()[0])
-		if err != nil {
-			return nil, WrapReadFileError(importFile.GetPaths()[0], err)
-		}
-		return json.NewReader(reader, schema, bufferSize)
+		return json.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
 	case Numpy:
 		return numpy.NewReader(ctx, schema, importFile.GetPaths(), cm, bufferSize)
 	case Parquet:
-		cmReader, err := cm.Reader(ctx, importFile.GetPaths()[0])
-		if err != nil {
-			return nil, err
-		}
-		return parquet.NewReader(ctx, schema, cmReader, bufferSize)
+		return parquet.NewReader(ctx, cm, schema, importFile.GetPaths()[0], bufferSize)
 	}
 	return nil, merr.WrapErrImportFailed("unexpected import file")
 }
diff --git a/internal/util/importutilv2/util.go b/internal/util/importutilv2/util.go
index 7a55e8dfe70ee..0f4f7e2a2fed5 100644
--- a/internal/util/importutilv2/util.go
+++ b/internal/util/importutilv2/util.go
@@ -50,10 +50,6 @@ func (f FileType) String() string {
 	return FileTypeName[int(f)]
 }
 
-func WrapReadFileError(file string, err error) error {
-	return merr.WrapErrImportFailed(fmt.Sprintf("failed to read the file '%s', error: %s", file, err.Error()))
-}
-
 func GetFileType(file *internalpb.ImportFile) (FileType, error) {
 	if len(file.GetPaths()) == 0 {
 		return Invalid, merr.WrapErrImportFailed("no file to import")
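
Usage sketches for the new Size() contract:

Taken together, the patch replaces the old GetFileSize helper (which listed
and stat'ed objects through the ChunkManager) with a Size() method on the
reader itself. The sketch below shows the consuming side of that contract. It
mirrors the limit check in readFileStat above, but the checkImportSize helper,
the maxBytes parameter, and the fmt.Errorf wording are illustrative, not part
of the patch.

package importsketch

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/importutilv2"
)

// checkImportSize is a hypothetical helper mirroring readFileStat: one
// Size() call replaces the former per-file ListWithPrefix and Size round
// trips against object storage.
func checkImportSize(r importutilv2.Reader, maxBytes int64) error {
	size, err := r.Size() // first call hits storage; the readers cache the result
	if err != nil {
		return err
	}
	if size > maxBytes {
		return fmt.Errorf("import file size %d exceeds limit %d", size, maxBytes)
	}
	return nil
}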
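
All four readers (binlog, JSON, NumPy, Parquet) implement Size() with the same
compute-once, cache-in-an-atomic pattern built on go.uber.org/atomic. A
distilled, stand-alone sketch of that pattern follows; sizeCache and the fetch
callback are illustrative names, with fetch standing in for cm.Size or
storage.GetFilesSize in the real readers.

package importsketch

import (
	"go.uber.org/atomic"
)

// sizeCache distills the fileSize *atomic.Int64 field each reader gains in
// this patch: compute the size once, publish it with an atomic store, and
// serve later calls from the cached value.
type sizeCache struct {
	fileSize *atomic.Int64
	fetch    func() (int64, error)
}

func newSizeCache(fetch func() (int64, error)) *sizeCache {
	// Mirrors the readers' atomic.NewInt64(0) initialization.
	return &sizeCache{fileSize: atomic.NewInt64(0), fetch: fetch}
}

func (c *sizeCache) Size() (int64, error) {
	if size := c.fileSize.Load(); size != 0 {
		return size, nil // cached by a previous call
	}
	size, err := c.fetch()
	if err != nil {
		return 0, err
	}
	c.fileSize.Store(size)
	return size, nil
}

One consequence of using zero as the "unset" sentinel is that a genuinely
empty file is re-fetched on every Size() call; that matches the patch as
written and is harmless, since the fetch is a single object stat.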
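
The new storage.GetFilesSize helper is a plain sum of ChunkManager.Size over a
path list; the binlog reader feeds it every insert binlog via
lo.Flatten(lo.Values(r.insertLogs)), and the NumPy reader passes its file
paths directly. A minimal call site, assuming an already-constructed
ChunkManager; totalBinlogSize, the 20-second timeout, and the paths are
illustrative (the timeout echoes the one used by the removed GetFileSize):

package importsketch

import (
	"context"
	"time"

	"github.com/milvus-io/milvus/internal/storage"
)

// totalBinlogSize sketches how a multi-file reader can derive its Size():
// sum the object sizes of every backing path in one helper call.
func totalBinlogSize(cm storage.ChunkManager, paths []string) (int64, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	return storage.GetFilesSize(ctx, paths, cm)
}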
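
On the test side, the regenerated MockReader means size expectations move off
the ChunkManager and onto the reader itself, exactly as executor_test.go now
does with s.reader.EXPECT().Size().Return(1024, nil). A minimal sketch using
the generated mock; the test name and assertions are illustrative:

package importsketch

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/internal/util/importutilv2"
)

func TestReaderSizeStub(t *testing.T) {
	reader := importutilv2.NewMockReader(t)
	// One expectation on the reader replaces the old ChunkManager.Size stub.
	reader.EXPECT().Size().Return(1024, nil)

	size, err := reader.Size()
	assert.NoError(t, err)
	assert.EqualValues(t, 1024, size)
}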
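
Finally, the format readers now take a (ChunkManager, path) pair instead of a
pre-opened storage.FileReader, which is what lets each reader stat its own
backing file. A sketch of constructing the JSON reader directly under the new
signature; openJSON, the "rows.json" path, and the 64 MiB buffer are
placeholders (the buffer size matches the value used in the parquet tests):

package importsketch

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/importutilv2/json"
)

// openJSON sketches the new constructor shape: the reader opens the file
// itself and can later answer Size() without a separate listing step.
func openJSON(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema) (int64, error) {
	r, err := json.NewReader(ctx, cm, schema, "rows.json", 64*1024*1024)
	if err != nil {
		return 0, err
	}
	defer r.Close()
	return r.Size()
}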