enhance: Support retrieving file size from importutilv2.Reader (#31533)
To reduce the overhead caused by listing S3 objects, add an interface to
importutilv2.Reader to retrieve file sizes.

issue: #31532, #28521

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
bigsheeper authored Mar 25, 2024
1 parent 6bb654f commit 31cf849
Showing 14 changed files with 230 additions and 104 deletions.
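
The change in a nutshell: importutilv2.Reader gains a Size() method, so the executor can ask each reader for its file size instead of re-listing S3 objects through the ChunkManager on every stat. A minimal sketch of the extended interface, inferred from the diffs below rather than copied from the source tree:

	// Sketch only; the real interface in internal/util/importutilv2 may carry
	// more methods and documentation than shown here.
	type Reader interface {
		// Size returns the total byte size of the underlying import file(s).
		Size() (int64, error)
		// Read returns the next batch of rows as columnar insert data.
		Read() (*storage.InsertData, error)
		Close()
	}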
12 changes: 9 additions & 3 deletions internal/datanode/importv2/executor.go
@@ -150,7 +150,7 @@ func (e *executor) PreImport(task Task) {
 	}
 	defer reader.Close()
 	start := time.Now()
-	err = e.readFileStat(reader, task, i, file)
+	err = e.readFileStat(reader, task, i)
 	if err != nil {
 		e.handleErr(task, err, "preimport failed")
 		return err
@@ -180,11 +180,17 @@ func (e *executor) PreImport(task Task) {
 		WrapLogFields(task, zap.Any("fileStats", task.(*PreImportTask).GetFileStats()))...)
 }
 
-func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int, file *internalpb.ImportFile) error {
-	fileSize, err := GetFileSize(file, e.cm, task)
+func (e *executor) readFileStat(reader importutilv2.Reader, task Task, fileIdx int) error {
+	fileSize, err := reader.Size()
 	if err != nil {
 		return err
 	}
+	maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
+	if fileSize > int64(maxSize) {
+		return errors.New(fmt.Sprintf(
+			"The import file size has reached the maximum limit allowed for importing, "+
+				"fileSize=%d, maxSize=%d", fileSize, int64(maxSize)))
+	}
 
 	totalRows := 0
 	totalSize := 0
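
The new check converts the MaxImportFileSizeInGB setting to bytes before comparing. A worked example of the arithmetic, assuming a 16 GB cap (values illustrative, not from this commit):

	maxGB := 16.0
	maxSize := maxGB * 1024 * 1024 * 1024 // 17179869184 bytes
	fileSize := int64(20 << 30)           // a 20 GiB import file
	if fileSize > int64(maxSize) {
		// rejected: fileSize=21474836480 exceeds maxSize=17179869184
	}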
7 changes: 2 additions & 5 deletions internal/datanode/importv2/executor_test.go
@@ -464,13 +464,10 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 		Paths: []string{"dummy.json"},
 	}
 
-	cm := mocks.NewChunkManager(s.T())
-	cm.EXPECT().Size(mock.Anything, mock.Anything).Return(1024, nil)
-	s.executor.cm = cm
-
 	var once sync.Once
 	data := createInsertData(s.T(), s.schema, s.numRows)
 	s.reader = importutilv2.NewMockReader(s.T())
+	s.reader.EXPECT().Size().Return(1024, nil)
 	s.reader.EXPECT().Read().RunAndReturn(func() (*storage.InsertData, error) {
 		var res *storage.InsertData
 		once.Do(func() {
@@ -492,7 +489,7 @@ func (s *ExecutorSuite) TestExecutor_ReadFileStat() {
 	}
 	preimportTask := NewPreImportTask(preimportReq)
 	s.manager.Add(preimportTask)
-	err := s.executor.readFileStat(s.reader, preimportTask, 0, importFile)
+	err := s.executor.readFileStat(s.reader, preimportTask, 0)
 	s.NoError(err)
 }
40 changes: 0 additions & 40 deletions internal/datanode/importv2/util.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"strconv"
-	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -30,10 +29,8 @@
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/datanode/syncmgr"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/querycoordv2/params"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -204,43 +201,6 @@ func GetInsertDataRowCount(data *storage.InsertData, schema *schemapb.Collection
 	return 0
 }
 
-func GetFileSize(file *internalpb.ImportFile, cm storage.ChunkManager, task Task) (int64, error) {
-	paths := file.GetPaths()
-	if importutilv2.IsBackup(task.GetOptions()) {
-		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-		defer cancel()
-		paths = make([]string, 0)
-		for _, prefix := range file.GetPaths() {
-			binlogs, _, err := cm.ListWithPrefix(ctx, prefix, true)
-			if err != nil {
-				return 0, err
-			}
-			paths = append(paths, binlogs...)
-		}
-	}
-
-	fn := func(path string) (int64, error) {
-		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
-		defer cancel()
-		return cm.Size(ctx, path)
-	}
-	var totalSize int64 = 0
-	for _, path := range paths {
-		size, err := fn(path)
-		if err != nil {
-			return 0, err
-		}
-		totalSize += size
-	}
-	maxSize := paramtable.Get().DataNodeCfg.MaxImportFileSizeInGB.GetAsFloat() * 1024 * 1024 * 1024
-	if totalSize > int64(maxSize) {
-		return 0, merr.WrapErrImportFailed(fmt.Sprintf(
-			"The import file size has reached the maximum limit allowed for importing, "+
-				"fileSize=%d, maxSize=%d", totalSize, int64(maxSize)))
-	}
-	return totalSize, nil
-}
-
 func LogStats(manager TaskManager) {
 	logFunc := func(tasks []Task, taskType TaskType) {
 		byState := lo.GroupBy(tasks, func(t Task) datapb.ImportTaskStateV2 {
13 changes: 13 additions & 0 deletions internal/storage/utils.go
@@ -18,6 +18,7 @@ package storage
 
 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"io"
@@ -1262,3 +1263,15 @@ func NewTestChunkManagerFactory(params *paramtable.ComponentParam, rootPath stri
 		IAMEndpoint(params.MinioCfg.IAMEndpoint.GetValue()),
 		CreateBucket(true))
 }
+
+func GetFilesSize(ctx context.Context, paths []string, cm ChunkManager) (int64, error) {
+	totalSize := int64(0)
+	for _, filePath := range paths {
+		size, err := cm.Size(ctx, filePath)
+		if err != nil {
+			return 0, err
+		}
+		totalSize += size
+	}
+	return totalSize, nil
+}
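
GetFilesSize is the shared helper the binlog reader below uses to sum object sizes through the ChunkManager. A hedged usage sketch; the function name, paths, and timeout are placeholders, not values from this commit:

	// Hypothetical call site; cm is any storage.ChunkManager implementation.
	func importSize(cm storage.ChunkManager, paths []string) (int64, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
		defer cancel()
		// The helper returns on the first cm.Size failure.
		return storage.GetFilesSize(ctx, paths, cm)
	}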
23 changes: 20 additions & 3 deletions internal/util/importutilv2/binlog/reader.go
@@ -23,6 +23,9 @@
 	"io"
 	"math"
 
+	"github.com/samber/lo"
+	"go.uber.org/atomic"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -34,6 +37,7 @@ type reader struct {
 	cm     storage.ChunkManager
 	schema *schemapb.CollectionSchema
 
+	fileSize   *atomic.Int64
 	deleteData *storage.DeleteData
 	insertLogs map[int64][]string // fieldID -> binlogs
 
@@ -50,9 +54,10 @@ func NewReader(ctx context.Context,
 ) (*reader, error) {
 	schema = typeutil.AppendSystemFields(schema)
 	r := &reader{
-		ctx:    ctx,
-		cm:     cm,
-		schema: schema,
+		ctx:      ctx,
+		cm:       cm,
+		schema:   schema,
+		fileSize: atomic.NewInt64(0),
 	}
 	err := r.init(paths, tsStart, tsEnd)
 	if err != nil {
@@ -200,4 +205,16 @@ OUTER:
 	return result, nil
 }
 
+func (r *reader) Size() (int64, error) {
+	if size := r.fileSize.Load(); size != 0 {
+		return size, nil
+	}
+	size, err := storage.GetFilesSize(r.ctx, lo.Flatten(lo.Values(r.insertLogs)), r.cm)
+	if err != nil {
+		return 0, err
+	}
+	r.fileSize.Store(size)
+	return size, nil
+}
+
 func (r *reader) Close() {}
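
Size() flattens the per-field binlog path map into one slice with lo.Flatten(lo.Values(...)), sums the sizes once via storage.GetFilesSize, and caches the result in an atomic.Int64. Note that a cached value of 0 doubles as "not computed yet", so an empty log set would recompute on every call. The same cache-on-first-call pattern in isolation, with illustrative names:

	// Standalone sketch of the caching pattern; not code from this commit.
	type cachedSize struct {
		size    *atomic.Int64 // 0 means "not computed yet"
		compute func() (int64, error)
	}

	func (c *cachedSize) Get() (int64, error) {
		if v := c.size.Load(); v != 0 {
			return v, nil // served from cache on later calls
		}
		v, err := c.compute()
		if err != nil {
			return 0, err
		}
		c.size.Store(v)
		return v, nil
	}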
35 changes: 31 additions & 4 deletions internal/util/importutilv2/json/reader.go
@@ -17,11 +17,14 @@
 package json
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"io"
 	"strings"
 
+	"go.uber.org/atomic"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -35,25 +38,37 @@ const (
 type Row = map[storage.FieldID]any
 
 type reader struct {
-	dec    *json.Decoder
+	ctx    context.Context
+	cm     storage.ChunkManager
 	schema *schemapb.CollectionSchema
 
+	fileSize *atomic.Int64
+	filePath string
+	dec      *json.Decoder
+
 	bufferSize  int
 	count       int64
 	isOldFormat bool
 
 	parser RowParser
 }
 
-func NewReader(r io.Reader, schema *schemapb.CollectionSchema, bufferSize int) (*reader, error) {
-	var err error
+func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.CollectionSchema, path string, bufferSize int) (*reader, error) {
+	r, err := cm.Reader(ctx, path)
+	if err != nil {
+		return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error()))
+	}
 	count, err := estimateReadCountPerBatch(bufferSize, schema)
 	if err != nil {
 		return nil, err
 	}
 	reader := &reader{
-		dec:        json.NewDecoder(r),
+		ctx:        ctx,
+		cm:         cm,
+		schema:     schema,
+		fileSize:   atomic.NewInt64(0),
+		filePath:   path,
+		dec:        json.NewDecoder(r),
 		bufferSize: bufferSize,
 		count:      count,
 	}
@@ -153,6 +168,18 @@ func (j *reader) Read() (*storage.InsertData, error) {
 	return insertData, nil
 }
 
+func (j *reader) Size() (int64, error) {
+	if size := j.fileSize.Load(); size != 0 {
+		return size, nil
+	}
+	size, err := j.cm.Size(j.ctx, j.filePath)
+	if err != nil {
+		return 0, err
+	}
+	j.fileSize.Store(size)
+	return size, nil
+}
+
 func (j *reader) Close() {}
 
 func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) {
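
The constructor now takes a path plus ChunkManager instead of a pre-opened io.Reader; that is what lets Size() ask the ChunkManager for the file's byte size later. A hedged construction sketch; ctx, cm, schema, the path, and the buffer size are placeholders:

	reader, err := NewReader(ctx, cm, schema, "imports/data.json", 16*1024*1024)
	if err != nil {
		return err // cm.Reader failures come back wrapped via merr.WrapErrImportFailed
	}
	defer reader.Close()
	size, err := reader.Size() // first call hits cm.Size; later calls reuse the cached value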
18 changes: 16 additions & 2 deletions internal/util/importutilv2/json/reader_test.go
@@ -17,9 +17,11 @@
 package json
 
 import (
+	"context"
 	rand2 "crypto/rand"
 	"encoding/json"
 	"fmt"
+	"io"
 	"math"
 	"math/rand"
 	"strconv"
@@ -28,11 +30,13 @@ import (
 
 	"github.com/samber/lo"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 	"golang.org/x/exp/slices"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -246,8 +250,18 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) {
 
 	jsonBytes, err := json.Marshal(rows)
 	suite.NoError(err)
-	r := strings.NewReader(string(jsonBytes))
-	reader, err := NewReader(r, schema, math.MaxInt)
+	type mockReader struct {
+		io.Reader
+		io.Closer
+		io.ReaderAt
+		io.Seeker
+	}
+	cm := mocks.NewChunkManager(suite.T())
+	cm.EXPECT().Reader(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, s string) (storage.FileReader, error) {
+		r := &mockReader{Reader: strings.NewReader(string(jsonBytes))}
+		return r, nil
+	})
+	reader, err := NewReader(context.Background(), cm, schema, "mockPath", math.MaxInt)
 	suite.NoError(err)
 
 	checkFn := func(actualInsertData *storage.InsertData, offsetBegin, expectRows int) {
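
The mockReader type satisfies storage.FileReader by embedding the io interfaces it is presumably composed of; only io.Reader is actually populated, and the nil embedded interfaces would panic if invoked, which is acceptable here because NewReader only reads. The same idiom in miniature, with illustrative names:

	// stubFile implements Read for real; Close/ReadAt/Seek are nil and would
	// panic if called. Illustrative only, not code from this commit.
	type stubFile struct {
		io.Reader
		io.Closer
		io.ReaderAt
		io.Seeker
	}
	f := &stubFile{Reader: strings.NewReader("{}")}
	_, _ = io.ReadAll(f) // safe: only Read is exercised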
51 changes: 51 additions & 0 deletions internal/util/importutilv2/mock_reader.go

(Generated mock file; diff not rendered by default.)
