Skip to content

Commit

Permalink
update import reader for binlog
Browse files Browse the repository at this point in the history
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Dec 4, 2023
1 parent 500955e commit 963891f
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 96 deletions.
42 changes: 1 addition & 41 deletions internal/util/importutilv2/binlog/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,17 @@
package binlog

import (
"context"
"fmt"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/util/merr"
)

type columnReader struct {
reader *storage.BinlogReader
fieldSchema *schemapb.FieldSchema
}

func NewColumnReader(cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (importutilv2.ColumnReader, error) {
func newColumnReader(cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (importutilv2.ColumnReader, error) {
reader, err := newBinlogReader(cm, path)
if err != nil {
return nil, err
Expand Down Expand Up @@ -59,39 +55,3 @@ func (r *columnReader) Next(_ int64) (storage.FieldData, error) {
}
return fieldData, nil
}

func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) {
result := make([]any, 0)
for {
event, err := reader.NextEventReader()
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err))
}
if event == nil {
break // end of the file
}
if event.TypeCode != et {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s",
et.String(), event.TypeCode.String()))
}
data, _, err := event.PayloadReaderInterface.GetDataFromPayload()
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err))
}
result = append(result, data.([]any)...)
}
return result, nil
}

func newBinlogReader(cm storage.ChunkManager, path string) (*storage.BinlogReader, error) {
bytes, err := cm.Read(context.TODO(), path) // TODO: fix context
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to open binlog %s", path))
}
var reader *storage.BinlogReader
reader, err = storage.NewBinlogReader(bytes)
if err != nil {
return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to create reader, binlog:%s, error:%v", path, err))
}
return reader, nil
}
6 changes: 3 additions & 3 deletions internal/util/importutilv2/binlog/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func FilterWithDelete(r *reader) (Filter, error) {
}
return func(row map[int64]interface{}) bool {
rowPk := row[pkField.GetFieldID()]
for _, pk := range r.delData.Pks {
for _, pk := range r.deleteData.Pks {
if pk.GetValue() == rowPk {
return false
}
Expand All @@ -39,9 +39,9 @@ func FilterWithDelete(r *reader) (Filter, error) {
}, nil
}

func FilterWithTimerange(r *reader) Filter {
func FilterWithTimeRange(tsStart, tsEnd uint64) Filter {
return func(row map[int64]interface{}) bool {
ts := row[common.TimeStampField].(int64)
return uint64(ts) >= r.tsBegin && uint64(ts) <= r.tsEnd
return uint64(ts) >= tsStart && uint64(ts) <= tsEnd
}
}
107 changes: 55 additions & 52 deletions internal/util/importutilv2/binlog/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package binlog

import (
"context"
"encoding/json"
"math"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -31,41 +31,53 @@ type reader struct {
cm storage.ChunkManager
schema *schemapb.CollectionSchema

delData *storage.DeleteData
insertLogs []*datapb.FieldBinlog
readIdx int
deleteData *storage.DeleteData
insertLogs map[int64][]string // fieldID -> binlogs

readIdx int
filters []Filter

tsBegin uint64
tsEnd uint64
}

func (r *reader) Init(paths []string) error {
insertLogs, deltaLogs, err := r.ListBinlogs(paths)
if err != nil {
return err
// NewReader creates a binlog import reader over the given storage paths,
// restricted to rows whose timestamps fall within [tsStart, tsEnd].
func NewReader(cm storage.ChunkManager,
	schema *schemapb.CollectionSchema,
	paths []string, tsStart, tsEnd uint64) (importutilv2.Reader, error) {
	r := &reader{
		cm:     cm,
		schema: schema,
	}
	if err := r.Init(paths, tsStart, tsEnd); err != nil {
		return nil, err
	}
	return r, nil
}

func (r *reader) Init(paths []string, tsStart, tsEnd uint64) error {
if tsStart != 0 || tsEnd != math.MaxUint64 {
r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd))
}
insertLogs, deltaLogs, err := listBinlogs(r.cm, paths)
if err != nil {
return err
}
r.filters = []Filter{
isDeleted,
FilterWithTimerange(r),
r.insertLogs = insertLogs
if len(deltaLogs) > 0 {
r.deleteData, err = r.ReadDelete(deltaLogs, tsStart, tsEnd)
if err != nil {
return err
}
deleteFilter, err := FilterWithDelete(r)
if err != nil {
return err
}
r.filters = append(r.filters, deleteFilter)
}
return nil
}

func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, error) {
func (r *reader) ReadDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage.DeleteData, error) {
deleteData := storage.NewDeleteData(nil, nil)
for _, binlog := range deltaLogs.GetBinlogs() {
path := binlog.GetLogPath()
for _, path := range deltaLogs {
reader, err := newBinlogReader(r.cm, path)
if err != nil {
return nil, err
Expand All @@ -80,48 +92,29 @@ func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData,
if err != nil {
return nil, err
}
if dl.Ts >= r.tsBegin && dl.Ts <= r.tsEnd {
if dl.Ts >= tsStart && dl.Ts <= tsEnd {
deleteData.Append(dl.Pk, dl.Ts)
}
}
}
return deleteData, nil
}

func (r *reader) ListBinlogs(paths []string) ([]*datapb.FieldBinlog, *datapb.FieldBinlog, error) {
if len(paths) < 1 {
return nil, nil, merr.WrapErrImportFailed("no insert binlogs to import")
}
_, _, err := r.cm.ListWithPrefix(context.Background(), paths[0], true)
if err != nil {
return nil, nil, err
}
// TODO: parse logPaths to fieldBinlog
if len(paths) < 2 {
return nil, nil, nil
}
_, _, err = r.cm.ListWithPrefix(context.Background(), paths[1], true)
if err != nil {
return nil, nil, err
}
return nil, nil, nil
}

func (r *reader) Next(count int64) (*storage.InsertData, error) {
insertData, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
}
if r.readIdx == len(r.insertLogs[0].GetBinlogs()) {
if r.readIdx == len(r.insertLogs[0]) {
return nil, nil
}
for _, fieldBinlog := range r.insertLogs {
field := typeutil.GetField(r.schema, fieldBinlog.GetFieldID())
for fieldID, binlogs := range r.insertLogs {
field := typeutil.GetField(r.schema, fieldID)
if field == nil {
return nil, merr.WrapErrFieldNotFound(fieldBinlog.GetFieldID())
return nil, merr.WrapErrFieldNotFound(fieldID)
}
path := fieldBinlog.GetBinlogs()[r.readIdx].GetLogPath()
cr, err := NewColumnReader(r.cm, field, path)
path := binlogs[r.readIdx]
cr, err := newColumnReader(r.cm, field, path)
if err != nil {
return nil, err
}
Expand All @@ -131,8 +124,19 @@ func (r *reader) Next(count int64) (*storage.InsertData, error) {
}
insertData.Data[field.GetFieldID()] = fieldData
}
insertData, err = r.Filter(insertData)
if err != nil {
return nil, err
}
r.readIdx++
return insertData, nil
}

res, err := storage.NewInsertData(r.schema)
func (r *reader) Filter(insertData *storage.InsertData) (*storage.InsertData, error) {
if len(r.filters) == 0 {
return insertData, nil
}
result, err := storage.NewInsertData(r.schema)
if err != nil {
return nil, err
}
Expand All @@ -143,11 +147,10 @@ func (r *reader) Next(count int64) (*storage.InsertData, error) {
continue
}
}
err = res.Append(row)
err = result.Append(row)
if err != nil {
return nil, err
}
}
r.readIdx++
return res, nil
return result, nil
}
95 changes: 95 additions & 0 deletions internal/util/importutilv2/binlog/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlog

import (
"context"
"fmt"
"path"
"sort"
"strconv"

"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/merr"
)

// readData drains every remaining event from reader, verifies each event
// carries the expected event type et, and returns the concatenated payload
// rows.
func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) {
	result := make([]any, 0)
	for {
		event, err := reader.NextEventReader()
		if err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err))
		}
		if event == nil {
			break // end of the file
		}
		if event.TypeCode != et {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s",
				et.String(), event.TypeCode.String()))
		}
		data, _, err := event.PayloadReaderInterface.GetDataFromPayload()
		if err != nil {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err))
		}
		// Guard the type assertion: a payload of an unexpected concrete type
		// must surface as an import error rather than panic the caller.
		rows, ok := data.([]any)
		if !ok {
			return nil, merr.WrapErrImportFailed(fmt.Sprintf("unexpected payload type %T in binlog, expect []any", data))
		}
		result = append(result, rows...)
	}
	return result, nil
}

// newBinlogReader downloads the binlog at path through cm and wraps the raw
// bytes in a storage.BinlogReader.
func newBinlogReader(cm storage.ChunkManager, path string) (*storage.BinlogReader, error) {
	bytes, err := cm.Read(context.TODO(), path) // TODO: dyh, resolve context, and checks if the error is a retryable error
	if err != nil {
		// Include the underlying error: the original message dropped it,
		// which made open failures (auth, missing object, timeout) opaque.
		return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to open binlog %s, error: %v", path, err))
	}
	reader, err := storage.NewBinlogReader(bytes)
	if err != nil {
		return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to create reader, binlog:%s, error:%v", path, err))
	}
	return reader, nil
}

// listBinlogs lists binlog files under the given prefixes. paths[0] is the
// insert-log prefix (required); paths[1], when present, is the delta-log
// prefix. It returns the insert logs grouped by field ID (each group sorted
// so reads are deterministic) and the sorted list of delta-log paths.
func listBinlogs(cm storage.ChunkManager, paths []string) (map[int64][]string, []string, error) {
	if len(paths) < 1 {
		return nil, nil, merr.WrapErrImportFailed("no insert binlogs to import")
	}
	insertLogPaths, _, err := cm.ListWithPrefix(context.Background(), paths[0], true)
	if err != nil {
		return nil, nil, err
	}
	insertLogs := make(map[int64][]string)
	for _, logPath := range insertLogPaths {
		// Insert logs are laid out as .../<fieldID>/<logName>; the parent
		// directory name is the field ID.
		fieldPath := path.Dir(logPath)
		fieldStrID := path.Base(fieldPath)
		fieldID, err := strconv.ParseInt(fieldStrID, 10, 64)
		if err != nil {
			// Include the offending path so a bad layout is diagnosable.
			return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse field id from log %s, error: %v", logPath, err))
		}
		insertLogs[fieldID] = append(insertLogs[fieldID], logPath)
	}
	// Sort within each field so the i-th binlog of every field belongs to
	// the same batch when the fields are read side by side.
	for _, logs := range insertLogs {
		sort.Strings(logs)
	}
	if len(paths) < 2 {
		return insertLogs, nil, nil
	}
	deltaLogs, _, err := cm.ListWithPrefix(context.Background(), paths[1], true)
	if err != nil {
		return nil, nil, err
	}
	// Sort delta logs as well so deletes are read in a deterministic order.
	sort.Strings(deltaLogs)
	return insertLogs, deltaLogs, nil
}
Loading

0 comments on commit 963891f

Please sign in to comment.