Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add import reader for binlog #28910

Merged
merged 44 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
436ac4d
feat: Add import reader for binlog
bigsheeper Dec 1, 2023
500955e
update
bigsheeper Dec 1, 2023
963891f
update import reader for binlog
bigsheeper Dec 4, 2023
0fb4c5d
update reader
bigsheeper Dec 5, 2023
dc9f7d4
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
e6d590e
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
cdce1ad
resolve import cycle
bigsheeper Dec 5, 2023
cffeda2
code format
bigsheeper Dec 5, 2023
868688f
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
691a44a
update
bigsheeper Dec 5, 2023
f2e4285
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
b96c030
update test
bigsheeper Dec 5, 2023
14eb29a
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
61998d3
update
bigsheeper Dec 5, 2023
c5fb921
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 5, 2023
9938ef8
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 6, 2023
7e2f1cf
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 8, 2023
bf0edeb
update test
bigsheeper Dec 11, 2023
3c0a72b
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 13, 2023
20e19f0
update
bigsheeper Dec 13, 2023
4dd1bd9
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 13, 2023
0fbdf44
ut pass
bigsheeper Dec 13, 2023
6fb9e86
ut pass2
bigsheeper Dec 14, 2023
cfb06ed
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 14, 2023
08bcf59
update
bigsheeper Dec 14, 2023
5ccad15
all test pass
bigsheeper Dec 14, 2023
1f0875e
all test pass 2
bigsheeper Dec 14, 2023
2d4f86f
update
bigsheeper Dec 14, 2023
7e28b25
all test pass 3
bigsheeper Dec 14, 2023
74300f1
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 14, 2023
4679afc
update
bigsheeper Dec 14, 2023
10f4199
update
bigsheeper Dec 14, 2023
9815f1e
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 16, 2023
6fb736f
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 19, 2023
4061245
update
bigsheeper Dec 19, 2023
f8b9ea3
resolve conflicts
bigsheeper Dec 25, 2023
32a5496
readPKs
bigsheeper Dec 25, 2023
ef0e987
update
bigsheeper Dec 25, 2023
76c896c
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Dec 26, 2023
4e1f23d
fix conflicts
bigsheeper Jan 2, 2024
5d3282d
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
43ed85a
update after code review
bigsheeper Jan 4, 2024
26bb258
Merge branch 'master' of https://github.com/milvus-io/milvus into 231…
bigsheeper Jan 4, 2024
1bd6e13
remove fieldReader api
bigsheeper Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Dec 4, 2023
commit 500955eaf7a2b8e93ccf704ac5c62f75dba88cd8
6 changes: 5 additions & 1 deletion internal/storage/insert_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ func (i *InsertData) Append(row map[FieldID]interface{}) error {
}

func (i *InsertData) GetRow(idx int) map[FieldID]interface{} {
return nil
res := make(map[FieldID]interface{})
for field, data := range i.Data {
res[field] = data.GetRow(idx)
}
return res
}

// FieldData defines field data interface
Expand Down
1 change: 1 addition & 0 deletions internal/util/importutilv2/binlog/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package binlog
import (
"context"
"fmt"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
Expand Down
4 changes: 2 additions & 2 deletions internal/util/importutilv2/binlog/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

type Filter func(row map[int64]interface{}) bool

func FilterWithDelete(r *Reader) (Filter, error) {
func FilterWithDelete(r *reader) (Filter, error) {
pkField, err := typeutil.GetPrimaryFieldSchema(r.schema)
if err != nil {
return nil, err
Expand All @@ -39,7 +39,7 @@ func FilterWithDelete(r *Reader) (Filter, error) {
}, nil
}

func FilterWithTimerange(r *Reader) Filter {
func FilterWithTimerange(r *reader) Filter {
return func(row map[int64]interface{}) bool {
ts := row[common.TimeStampField].(int64)
return uint64(ts) >= r.tsBegin && uint64(ts) <= r.tsEnd
Expand Down
12 changes: 6 additions & 6 deletions internal/util/importutilv2/binlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package binlog
import (
"context"
"encoding/json"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type Reader struct {
type reader struct {
cm storage.ChunkManager
schema *schemapb.CollectionSchema

Expand All @@ -40,7 +41,7 @@ type Reader struct {
tsEnd uint64
}

func (r *Reader) Init(paths []string) error {
func (r *reader) Init(paths []string) error {
insertLogs, deltaLogs, err := r.ListBinlogs(paths)
if err != nil {
return err
Expand All @@ -61,7 +62,7 @@ func (r *Reader) Init(paths []string) error {
return nil
}

func (r *Reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, error) {
func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, error) {
deleteData := storage.NewDeleteData(nil, nil)
for _, binlog := range deltaLogs.GetBinlogs() {
path := binlog.GetLogPath()
Expand All @@ -87,7 +88,7 @@ func (r *Reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData,
return deleteData, nil
}

func (r *Reader) ListBinlogs(paths []string) ([]*datapb.FieldBinlog, *datapb.FieldBinlog, error) {
func (r *reader) ListBinlogs(paths []string) ([]*datapb.FieldBinlog, *datapb.FieldBinlog, error) {
if len(paths) < 1 {
return nil, nil, merr.WrapErrImportFailed("no insert binlogs to import")
}
Expand All @@ -106,7 +107,7 @@ func (r *Reader) ListBinlogs(paths []string) ([]*datapb.FieldBinlog, *datapb.Fie
return nil, nil, nil
}

func (r *Reader) Next(count int64) (*storage.InsertData, error) {
func (r *reader) Next(count int64) (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
Expand Down Expand Up @@ -147,7 +148,6 @@ func (r *Reader) Next(count int64) (*storage.InsertData, error) {
return nil, err
}
}

r.readIdx++
return res, nil
}
8 changes: 5 additions & 3 deletions internal/util/importutilv2/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
)

type Reader interface {
//ReadStats()
//Close()
// ReadStats()
// Close()

Next(count int64) (*storage.InsertData, error)
}

type ColumnReader interface {
bigsheeper marked this conversation as resolved.
Show resolved Hide resolved
//Close()
// Close()

Next(count int64) (storage.FieldData, error)
}