From 963891f427fc0310a7f55d99ba307d8d9e30b0f4 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 4 Dec 2023 17:05:45 +0800 Subject: [PATCH] update import reader for binlog Signed-off-by: bigsheeper --- .../util/importutilv2/binlog/column_reader.go | 42 +------ internal/util/importutilv2/binlog/filter.go | 6 +- internal/util/importutilv2/binlog/reader.go | 107 +++++++++--------- internal/util/importutilv2/binlog/util.go | 95 ++++++++++++++++ internal/util/importutilv2/option.go | 76 +++++++++++++ internal/util/importutilv2/reader.go | 16 +++ 6 files changed, 246 insertions(+), 96 deletions(-) create mode 100644 internal/util/importutilv2/binlog/util.go create mode 100644 internal/util/importutilv2/option.go diff --git a/internal/util/importutilv2/binlog/column_reader.go b/internal/util/importutilv2/binlog/column_reader.go index ebe7552ba5421..fb1b494f50c64 100644 --- a/internal/util/importutilv2/binlog/column_reader.go +++ b/internal/util/importutilv2/binlog/column_reader.go @@ -17,13 +17,9 @@ package binlog import ( - "context" - "fmt" - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" - "github.com/milvus-io/milvus/pkg/util/merr" ) type columnReader struct { @@ -31,7 +27,7 @@ type columnReader struct { fieldSchema *schemapb.FieldSchema } -func NewColumnReader(cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (importutilv2.ColumnReader, error) { +func newColumnReader(cm storage.ChunkManager, fieldSchema *schemapb.FieldSchema, path string) (importutilv2.ColumnReader, error) { reader, err := newBinlogReader(cm, path) if err != nil { return nil, err @@ -59,39 +55,3 @@ func (r *columnReader) Next(_ int64) (storage.FieldData, error) { } return fieldData, nil } - -func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) { - result := make([]any, 0) - for { - event, err := reader.NextEventReader() - if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err)) - } - if event == nil { - break // end of the file - } - if event.TypeCode != et { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s", - et.String(), event.TypeCode.String())) - } - data, _, err := event.PayloadReaderInterface.GetDataFromPayload() - if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err)) - } - result = append(result, data.([]any)...) - } - return result, nil -} - -func newBinlogReader(cm storage.ChunkManager, path string) (*storage.BinlogReader, error) { - bytes, err := cm.Read(context.TODO(), path) // TODO: fix context - if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to open binlog %s", path)) - } - var reader *storage.BinlogReader - reader, err = storage.NewBinlogReader(bytes) - if err != nil { - return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to create reader, binlog:%s, error:%v", path, err)) - } - return reader, nil -} diff --git a/internal/util/importutilv2/binlog/filter.go b/internal/util/importutilv2/binlog/filter.go index 50904481b617a..06a586234d3c7 100644 --- a/internal/util/importutilv2/binlog/filter.go +++ b/internal/util/importutilv2/binlog/filter.go @@ -30,7 +30,7 @@ func FilterWithDelete(r *reader) (Filter, error) { } return func(row map[int64]interface{}) bool { rowPk := row[pkField.GetFieldID()] - for _, pk := range r.delData.Pks { + for _, pk := range r.deleteData.Pks { if pk.GetValue() == rowPk { return false } @@ -39,9 +39,9 @@ func FilterWithDelete(r *reader) (Filter, error) { }, nil } -func FilterWithTimerange(r *reader) Filter { +func FilterWithTimeRange(tsStart, tsEnd uint64) Filter { return func(row map[int64]interface{}) bool { ts := row[common.TimeStampField].(int64) - return uint64(ts) >= r.tsBegin && uint64(ts) <= r.tsEnd + return uint64(ts) >= tsStart && uint64(ts) <= tsEnd } } diff --git a/internal/util/importutilv2/binlog/reader.go b/internal/util/importutilv2/binlog/reader.go index 92cc50f6b2744..a57408288660f 100644 --- a/internal/util/importutilv2/binlog/reader.go +++ b/internal/util/importutilv2/binlog/reader.go @@ -17,12 +17,12 @@ package binlog import ( - "context" "encoding/json" + "math" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -31,41 +31,53 @@ type reader struct { cm storage.ChunkManager schema *schemapb.CollectionSchema - delData *storage.DeleteData - insertLogs []*datapb.FieldBinlog - readIdx int + deleteData *storage.DeleteData + insertLogs map[int64][]string // fieldID -> binlogs + readIdx int filters []Filter - - tsBegin uint64 - tsEnd uint64 } -func (r *reader) Init(paths []string) error { - insertLogs, deltaLogs, err := r.ListBinlogs(paths) - if err != nil { - return err +func NewReader(cm storage.ChunkManager, + schema *schemapb.CollectionSchema, + paths []string, tsStart, tsEnd uint64) (importutilv2.Reader, error) { + r := &reader{ + cm: cm, + schema: schema, } - r.delData, err = r.ReadDelta(deltaLogs) + err := r.Init(paths, tsStart, tsEnd) if err != nil { - return err + return nil, err } - r.insertLogs = insertLogs - isDeleted, err := FilterWithDelete(r) + return r, nil +} + +func (r *reader) Init(paths []string, tsStart, tsEnd uint64) error { + if tsStart != 0 || tsEnd != math.MaxUint64 { + r.filters = append(r.filters, FilterWithTimeRange(tsStart, tsEnd)) + } + insertLogs, deltaLogs, err := listBinlogs(r.cm, paths) if err != nil { return err } - r.filters = []Filter{ - isDeleted, - FilterWithTimerange(r), + r.insertLogs = insertLogs + if len(deltaLogs) > 0 { + r.deleteData, err = r.ReadDelete(deltaLogs, tsStart, tsEnd) + if err != nil { + return err + } + deleteFilter, err := FilterWithDelete(r) + if err != nil { + return err + } + r.filters = append(r.filters, deleteFilter) } return nil } -func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, error) { +func (r *reader) ReadDelete(deltaLogs []string, tsStart, tsEnd uint64) (*storage.DeleteData, error) { deleteData := storage.NewDeleteData(nil, nil) - for _, binlog := range deltaLogs.GetBinlogs() { - path := binlog.GetLogPath() + for _, path := range deltaLogs { reader, err := newBinlogReader(r.cm, path) if err != nil { return nil, err @@ -80,7 +92,7 @@ func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, if err != nil { return nil, err } - if dl.Ts >= r.tsBegin && dl.Ts <= r.tsEnd { + if dl.Ts >= tsStart && dl.Ts <= tsEnd { deleteData.Append(dl.Pk, dl.Ts) } } @@ -88,40 +100,21 @@ func (r *reader) ReadDelta(deltaLogs *datapb.FieldBinlog) (*storage.DeleteData, return deleteData, nil } -func (r *reader) ListBinlogs(paths []string) ([]*datapb.FieldBinlog, *datapb.FieldBinlog, error) { - if len(paths) < 1 { - return nil, nil, merr.WrapErrImportFailed("no insert binlogs to import") - } - _, _, err := r.cm.ListWithPrefix(context.Background(), paths[0], true) - if err != nil { - return nil, nil, err - } - // TODO: parse logPaths to fieldBinlog - if len(paths) < 2 { - return nil, nil, nil - } - _, _, err = r.cm.ListWithPrefix(context.Background(), paths[1], true) - if err != nil { - return nil, nil, err - } - return nil, nil, nil -} - func (r *reader) Next(count int64) (*storage.InsertData, error) { insertData, err := storage.NewInsertData(r.schema) if err != nil { return nil, err } - if r.readIdx == len(r.insertLogs[0].GetBinlogs()) { + if r.readIdx == len(r.insertLogs[0]) { return nil, nil } - for _, fieldBinlog := range r.insertLogs { - field := typeutil.GetField(r.schema, fieldBinlog.GetFieldID()) + for fieldID, binlogs := range r.insertLogs { + field := typeutil.GetField(r.schema, fieldID) if field == nil { - return nil, merr.WrapErrFieldNotFound(fieldBinlog.GetFieldID()) + return nil, merr.WrapErrFieldNotFound(fieldID) } - path := fieldBinlog.GetBinlogs()[r.readIdx].GetLogPath() - cr, err := NewColumnReader(r.cm, field, path) + path := binlogs[r.readIdx] + cr, err := newColumnReader(r.cm, field, path) if err != nil { return nil, err } @@ -131,8 +124,19 @@ func (r *reader) Next(count int64) (*storage.InsertData, error) { } insertData.Data[field.GetFieldID()] = fieldData } + insertData, err = r.Filter(insertData) + if err != nil { + return nil, err + } + r.readIdx++ + return insertData, nil +} - res, err := storage.NewInsertData(r.schema) +func (r *reader) Filter(insertData *storage.InsertData) (*storage.InsertData, error) { + if len(r.filters) == 0 { + return insertData, nil + } + result, err := storage.NewInsertData(r.schema) if err != nil { return nil, err } @@ -143,11 +147,10 @@ func (r *reader) Next(count int64) (*storage.InsertData, error) { continue } } - err = res.Append(row) + err = result.Append(row) if err != nil { return nil, err } } - r.readIdx++ - return res, nil + return result, nil } diff --git a/internal/util/importutilv2/binlog/util.go b/internal/util/importutilv2/binlog/util.go new file mode 100644 index 0000000000000..131ee1178e8c1 --- /dev/null +++ b/internal/util/importutilv2/binlog/util.go @@ -0,0 +1,95 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package binlog + +import ( + "context" + "fmt" + "path" + "sort" + "strconv" + + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func readData(reader *storage.BinlogReader, et storage.EventTypeCode) ([]any, error) { + result := make([]any, 0) + for { + event, err := reader.NextEventReader() + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to iterate events reader, error: %v", err)) + } + if event == nil { + break // end of the file + } + if event.TypeCode != et { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("wrong binlog type, expect:%s, actual:%s", + et.String(), event.TypeCode.String())) + } + data, _, err := event.PayloadReaderInterface.GetDataFromPayload() + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to read data, error: %v", err)) + } + result = append(result, data.([]any)...) + } + return result, nil +} + +func newBinlogReader(cm storage.ChunkManager, path string) (*storage.BinlogReader, error) { + bytes, err := cm.Read(context.TODO(), path) // TODO: dyh, resolve context, and checks if the error is a retryable error + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to open binlog %s", path)) + } + var reader *storage.BinlogReader + reader, err = storage.NewBinlogReader(bytes) + if err != nil { + return nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to create reader, binlog:%s, error:%v", path, err)) + } + return reader, nil +} + +func listBinlogs(cm storage.ChunkManager, paths []string) (map[int64][]string, []string, error) { + if len(paths) < 1 { + return nil, nil, merr.WrapErrImportFailed("no insert binlogs to import") + } + insertLogPaths, _, err := cm.ListWithPrefix(context.Background(), paths[0], true) + if err != nil { + return nil, nil, err + } + insertLogs := make(map[int64][]string) + for _, logPath := range insertLogPaths { + fieldPath := path.Dir(logPath) + fieldStrID := path.Base(fieldPath) + fieldID, err := strconv.ParseInt(fieldStrID, 10, 64) + if err != nil { + return nil, nil, merr.WrapErrImportFailed(fmt.Sprintf("failed to parse field id from log, error: %v", err)) + } + insertLogs[fieldID] = append(insertLogs[fieldID], logPath) + } + for _, v := range insertLogs { + sort.Strings(v) + } + if len(paths) < 2 { + return insertLogs, nil, nil + } + deltaLogs, _, err := cm.ListWithPrefix(context.Background(), paths[1], true) + if err != nil { + return nil, nil, err + } + return insertLogs, deltaLogs, nil +} diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go new file mode 100644 index 0000000000000..72f3d4f4c10ca --- /dev/null +++ b/internal/util/importutilv2/option.go @@ -0,0 +1,76 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importutilv2 + +import ( + "fmt" + "math" + "strconv" + "strings" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +const ( + StartTs = "start_ts" + EndTs = "end_ts" + BackupFlag = "backup" +) + +type Options []*commonpb.KeyValuePair + +func ParseTimeRange(options Options) (uint64, uint64, error) { + var tsStart uint64 + var tsEnd uint64 + importOptions := funcutil.KeyValuePair2Map(options) + value, ok := importOptions[StartTs] + if ok { + pTs, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, 0, merr.WrapErrImportFailed(fmt.Sprintf("parse start_ts failed, err=%s", err)) + } + tsStart = tsoutil.ComposeTS(pTs, 0) + } else { + tsStart = 0 + } + value, ok = importOptions[EndTs] + if ok { + pTs, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, 0, merr.WrapErrImportFailed(fmt.Sprintf("parse end_ts failed, err=%s", err)) + } + tsEnd = tsoutil.ComposeTS(pTs, 0) + } else { + tsEnd = math.MaxUint64 + } + if tsStart > tsEnd { + return 0, 0, merr.WrapErrImportFailed( + fmt.Sprintf("start_ts shouldn't be larger than end_ts, start_ts:%d, end_ts:%d", tsStart, tsEnd)) + } + return tsStart, tsEnd, nil +} + +func IsBackup(options Options) bool { + isBackup, err := funcutil.GetAttrByKeyFromRepeatedKV(BackupFlag, options) + if err != nil || strings.ToLower(isBackup) != "true" { + return false + } + return true +} diff --git a/internal/util/importutilv2/reader.go b/internal/util/importutilv2/reader.go index 00483d1bd90af..f8a45209d6495 100644 --- a/internal/util/importutilv2/reader.go +++ b/internal/util/importutilv2/reader.go @@ -17,7 +17,9 @@ package importutilv2 import ( + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/binlog" ) type Reader interface { @@ -32,3 +34,17 @@ type ColumnReader interface { Next(count int64) (storage.FieldData, error) } + +func NewReader(cm storage.ChunkManager, + schema *schemapb.CollectionSchema, + paths []string, options Options) (Reader, error) { + if IsBackup(options) { + tsStart, tsEnd, err := ParseTimeRange(options) + if err != nil { + return nil, err + } + return binlog.NewReader(cm, schema, paths, tsStart, tsEnd) + } + + return nil, nil +}