diff --git a/Makefile b/Makefile
index 13b7bc9e3ecba..5ad8b316f812f 100644
--- a/Makefile
+++ b/Makefile
@@ -333,8 +333,8 @@ mock_s3iface:
 	@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go
 
 mock_lightning:
-	@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter > br/pkg/mock/backend.go
-	@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,Rows,Row > br/pkg/mock/encode.go
+	@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
+	@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go
 
 # There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
 # but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go
index 0ccde961bb165..d595b18a8c1b3 100644
--- a/br/pkg/lightning/backend/backend.go
+++ b/br/pkg/lightning/backend/backend.go
@@ -149,8 +149,6 @@ type TargetInfoGetter interface {
 // Implementations of this interface must be goroutine safe: you can share an
 // instance and execute any method anywhere.
 type AbstractBackend interface {
-	encode.EncodingBuilder
-	TargetInfoGetter
 	// Close the connection to the backend.
 	Close()
@@ -261,26 +259,10 @@ func (be Backend) Close() {
 	be.abstract.Close()
 }
 
-func (be Backend) MakeEmptyRows() encode.Rows {
-	return be.abstract.MakeEmptyRows()
-}
-
-func (be Backend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
-	return be.abstract.NewEncoder(ctx, config)
-}
-
 func (be Backend) ShouldPostProcess() bool {
 	return be.abstract.ShouldPostProcess()
 }
 
-func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error {
-	return be.abstract.CheckRequirements(ctx, checkCtx)
-}
-
-func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
-	return be.abstract.FetchRemoteTableModels(ctx, schemaName)
-}
-
 func (be Backend) FlushAll(ctx context.Context) error {
	return be.abstract.FlushAllEngines(ctx)
 }
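Note: this patch removes `encode.EncodingBuilder` and `TargetInfoGetter` from `AbstractBackend`, so encoding and target-cluster inspection are no longer reached through `Backend`. A minimal sketch of the resulting call pattern, using the constructors this patch introduces elsewhere (the surrounding wiring is illustrative, not part of the patch):

```go
package example

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
)

func wire(ctx context.Context, db *sql.DB) error {
	// Encoding concerns now live behind encode.EncodingBuilder...
	var encBuilder encode.EncodingBuilder = tidb.NewEncodingBuilder()
	rows := encBuilder.MakeEmptyRows()
	_ = rows

	// ...and schema/requirement checks behind backend.TargetInfoGetter.
	infoGetter := tidb.NewTargetInfoGetter(db)
	_, err := infoGetter.FetchRemoteTableModels(ctx, "test")
	return err
}
```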
diff --git a/br/pkg/lightning/backend/backend_test.go b/br/pkg/lightning/backend/backend_test.go
index 5a8bc5f34b580..4b96d1fe6f0ae 100644
--- a/br/pkg/lightning/backend/backend_test.go
+++ b/br/pkg/lightning/backend/backend_test.go
@@ -21,6 +21,7 @@ import (
 type backendSuite struct {
 	controller  *gomock.Controller
 	mockBackend *mock.MockBackend
+	encBuilder  *mock.MockEncodingBuilder
 	backend     backend.Backend
 	ts          uint64
 }
@@ -32,6 +33,7 @@ func createBackendSuite(c gomock.TestReporter) *backendSuite {
 		controller:  controller,
 		mockBackend: mockBackend,
 		backend:     backend.MakeBackend(mockBackend),
+		encBuilder:  mock.NewMockEncodingBuilder(controller),
 		ts:          oracle.ComposeTS(time.Now().Unix()*1000, 0),
 	}
 }
@@ -316,8 +318,8 @@ func TestMakeEmptyRows(t *testing.T) {
 	defer s.tearDownTest()
 
 	rows := mock.NewMockRows(s.controller)
-	s.mockBackend.EXPECT().MakeEmptyRows().Return(rows)
-	require.Equal(t, rows, s.mockBackend.MakeEmptyRows())
+	s.encBuilder.EXPECT().MakeEmptyRows().Return(rows)
+	require.Equal(t, rows, s.encBuilder.MakeEmptyRows())
 }
 
 func TestNewEncoder(t *testing.T) {
@@ -328,9 +330,9 @@ func TestNewEncoder(t *testing.T) {
 	options := &encode.EncodingConfig{
 		SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890},
 	}
-	s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
+	s.encBuilder.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
 
-	realEncoder, err := s.mockBackend.NewEncoder(nil, options)
+	realEncoder, err := s.encBuilder.NewEncoder(nil, options)
 	require.Equal(t, realEncoder, encoder)
 	require.NoError(t, err)
 }
diff --git a/br/pkg/lightning/backend/kv/kv2sql.go b/br/pkg/lightning/backend/kv/kv2sql.go
index d7f087edd020e..3e41161ef2e54 100644
--- a/br/pkg/lightning/backend/kv/kv2sql.go
+++ b/br/pkg/lightning/backend/kv/kv2sql.go
@@ -29,7 +29,7 @@ import (
 type TableKVDecoder struct {
 	tbl table.Table
-	se  *session
+	se  *Session
 	// tableName is the unique table name in the form "`db`.`tbl`".
 	tableName string
 	genCols   []genCol
@@ -91,7 +91,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
 	if err != nil {
 		return err
 	}
-	iter := index.GenIndexKVIter(t.se.vars.StmtCtx, indexValues, h, nil)
+	iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
 	for iter.Valid() {
 		indexKey, _, _, err := iter.Next(indexBuffer)
 		if err != nil {
@@ -115,7 +115,7 @@ func NewTableKVDecoder(
 	options *encode.SessionOptions,
 	logger log.Logger,
 ) (*TableKVDecoder, error) {
-	se := newSession(options, logger)
+	se := NewSession(options, logger)
 	cols := tbl.Cols()
 	// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
 	recordCtx := tables.NewCommonAddRecordCtx(len(cols))
diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go
index dcfa5570e2bd8..e29e78ea66b84 100644
--- a/br/pkg/lightning/backend/kv/session.go
+++ b/br/pkg/lightning/backend/kv/session.go
@@ -54,43 +54,45 @@ func (*invalidIterator) Valid() bool {
 func (*invalidIterator) Close() {
 }
 
-type bytesBuf struct {
+// BytesBuf bytes buffer.
+type BytesBuf struct {
 	buf []byte
 	idx int
 	cap int
 }
 
-func (b *bytesBuf) add(v []byte) []byte {
+func (b *BytesBuf) add(v []byte) []byte {
 	start := b.idx
 	copy(b.buf[start:], v)
 	b.idx += len(v)
 	return b.buf[start:b.idx:b.idx]
 }
 
-func newBytesBuf(size int) *bytesBuf {
-	return &bytesBuf{
+func newBytesBuf(size int) *BytesBuf {
+	return &BytesBuf{
 		buf: manual.New(size),
 		cap: size,
 	}
 }
 
-func (b *bytesBuf) destroy() {
+func (b *BytesBuf) destroy() {
 	if b != nil {
 		manual.Free(b.buf)
 		b.buf = nil
 	}
 }
 
-type kvMemBuf struct {
+// MemBuf used to store the data in memory.
+type MemBuf struct {
 	sync.Mutex
 	kv.MemBuffer
-	buf           *bytesBuf
-	availableBufs []*bytesBuf
+	buf           *BytesBuf
+	availableBufs []*BytesBuf
 	kvPairs       *KvPairs
 	size          int
 }
 
-func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
+func (mb *MemBuf) Recycle(buf *BytesBuf) {
 	buf.idx = 0
 	buf.cap = len(buf.buf)
 	mb.Lock()
@@ -104,11 +106,11 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
 	mb.Unlock()
 }
 
-func (mb *kvMemBuf) AllocateBuf(size int) {
+func (mb *MemBuf) AllocateBuf(size int) {
 	mb.Lock()
 	size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
 	var (
-		existingBuf    *bytesBuf
+		existingBuf    *BytesBuf
 		existingBufIdx int
 	)
 	for i, buf := range mb.availableBufs {
@@ -128,16 +130,16 @@ func (mb *kvMemBuf) AllocateBuf(size int) {
 	mb.Unlock()
 }
 
-func (mb *kvMemBuf) Set(k kv.Key, v []byte) error {
+func (mb *MemBuf) Set(k kv.Key, v []byte) error {
 	kvPairs := mb.kvPairs
 	size := len(k) + len(v)
 	if mb.buf == nil || mb.buf.cap-mb.buf.idx < size {
 		if mb.buf != nil {
-			kvPairs.bytesBuf = mb.buf
+			kvPairs.BytesBuf = mb.buf
 		}
 		mb.AllocateBuf(size)
 	}
-	kvPairs.pairs = append(kvPairs.pairs, common.KvPair{
+	kvPairs.Pairs = append(kvPairs.Pairs, common.KvPair{
 		Key: mb.buf.add(k),
 		Val: mb.buf.add(v),
 	})
@@ -145,28 +147,28 @@ func (mb *kvMemBuf) Set(k kv.Key, v []byte) error {
 	return nil
 }
 
-func (mb *kvMemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
+func (mb *MemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
 	return mb.Set(k, v)
 }
 
-func (mb *kvMemBuf) Delete(k kv.Key) error {
+func (mb *MemBuf) Delete(k kv.Key) error {
 	return errors.New("unsupported operation")
 }
 
 // Release publish all modifications in the latest staging buffer to upper level.
-func (mb *kvMemBuf) Release(h kv.StagingHandle) {
+func (mb *MemBuf) Release(h kv.StagingHandle) {
 }
 
-func (mb *kvMemBuf) Staging() kv.StagingHandle {
+func (mb *MemBuf) Staging() kv.StagingHandle {
 	return 0
 }
 
-// Cleanup cleanup the resources referenced by the StagingHandle.
+// Cleanup the resources referenced by the StagingHandle.
 // If the changes are not published by `Release`, they will be discarded.
-func (mb *kvMemBuf) Cleanup(h kv.StagingHandle) {}
+func (mb *MemBuf) Cleanup(h kv.StagingHandle) {}
 
 // Size returns sum of keys and values length.
-func (mb *kvMemBuf) Size() int {
+func (mb *MemBuf) Size() int {
 	return mb.size
 }
@@ -176,11 +178,11 @@ func (t *transaction) Len() int {
 }
 
 type kvUnionStore struct {
-	kvMemBuf
+	MemBuf
 }
 
 func (s *kvUnionStore) GetMemBuffer() kv.MemBuffer {
-	return &s.kvMemBuf
+	return &s.MemBuf
 }
 
 func (s *kvUnionStore) GetIndexName(tableID, indexID int64) string {
@@ -201,7 +203,7 @@ type transaction struct {
 }
 
 func (t *transaction) GetMemBuffer() kv.MemBuffer {
-	return &t.kvUnionStore.kvMemBuf
+	return &t.kvUnionStore.MemBuf
 }
 
 func (t *transaction) Discard() {
@@ -228,7 +230,7 @@ func (t *transaction) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
 
 // Set implements the kv.Mutator interface
 func (t *transaction) Set(k kv.Key, v []byte) error {
-	return t.kvMemBuf.Set(k, v)
+	return t.MemBuf.Set(k, v)
 }
 
 // GetTableInfo implements the kv.Transaction interface.
@@ -245,24 +247,24 @@ func (t *transaction) SetAssertion(key []byte, assertion ...kv.FlagsOp) error {
 	return nil
 }
 
-// session is a trimmed down Session type which only wraps our own trimmed-down
+// Session is a trimmed down Session type which only wraps our own trimmed-down
 // transaction type and provides the session variables to the TiDB library
 // optimized for Lightning.
-type session struct {
+type Session struct {
 	sessionctx.Context
 	txn  transaction
-	vars *variable.SessionVars
+	Vars *variable.SessionVars
 	// currently, we only set `CommonAddRecordCtx`
 	values map[fmt.Stringer]interface{}
 }
 
-// NewSession creates a new trimmed down Session matching the options.
-func NewSession(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
-	return newSession(options, logger)
+// NewSessionCtx creates a new trimmed down Session matching the options.
+func NewSessionCtx(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
+	return NewSession(options, logger)
 }
 
-func newSession(options *encode.SessionOptions, logger log.Logger) *session {
-	s := &session{
+func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
+	s := &Session{
 		values: make(map[fmt.Stringer]interface{}, 1),
 	}
 	sqlMode := options.SQLMode
@@ -301,68 +303,68 @@ func newSession(options *encode.SessionOptions, logger log.Logger) *session {
 		log.ShortError(err))
 	}
 	vars.TxnCtx = nil
-	s.vars = vars
+	s.Vars = vars
 	s.txn.kvPairs = &KvPairs{}
 
 	return s
 }
 
-func (se *session) takeKvPairs() *KvPairs {
-	memBuf := &se.txn.kvMemBuf
+func (se *Session) TakeKvPairs() *KvPairs {
+	memBuf := &se.txn.MemBuf
 	pairs := memBuf.kvPairs
-	if pairs.bytesBuf != nil {
-		pairs.memBuf = memBuf
+	if pairs.BytesBuf != nil {
+		pairs.MemBuf = memBuf
 	}
-	memBuf.kvPairs = &KvPairs{pairs: make([]common.KvPair, 0, len(pairs.pairs))}
+	memBuf.kvPairs = &KvPairs{Pairs: make([]common.KvPair, 0, len(pairs.Pairs))}
 	memBuf.size = 0
 	return pairs
 }
 
 // Txn implements the sessionctx.Context interface
-func (se *session) Txn(active bool) (kv.Transaction, error) {
+func (se *Session) Txn(active bool) (kv.Transaction, error) {
 	return &se.txn, nil
 }
 
 // GetSessionVars implements the sessionctx.Context interface
-func (se *session) GetSessionVars() *variable.SessionVars {
-	return se.vars
+func (se *Session) GetSessionVars() *variable.SessionVars {
+	return se.Vars
 }
 
 // SetValue saves a value associated with this context for key.
-func (se *session) SetValue(key fmt.Stringer, value interface{}) {
+func (se *Session) SetValue(key fmt.Stringer, value interface{}) {
 	se.values[key] = value
 }
 
 // Value returns the value associated with this context for key.
-func (se *session) Value(key fmt.Stringer) interface{} {
+func (se *Session) Value(key fmt.Stringer) interface{} {
 	return se.values[key]
 }
 
 // StmtAddDirtyTableOP implements the sessionctx.Context interface
-func (se *session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}
+func (se *Session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}
 
 // GetInfoSchema implements the sessionctx.Context interface.
-func (se *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
+func (se *Session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
 	return nil
 }
 
 // GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
 // Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
-func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
+func (se *Session) GetBuiltinFunctionUsage() map[string]uint32 {
 	return make(map[string]uint32)
 }
 
 // BuiltinFunctionUsageInc implements the sessionctx.Context interface.
-func (se *session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
+func (se *Session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
 }
 
 // GetStmtStats implements the sessionctx.Context interface.
-func (se *session) GetStmtStats() *stmtstats.StatementStats {
+func (se *Session) GetStmtStats() *stmtstats.StatementStats {
 	return nil
 }
 
-func (se *session) Close() {
-	memBuf := &se.txn.kvMemBuf
+func (se *Session) Close() {
+	memBuf := &se.txn.MemBuf
 	if memBuf.buf != nil {
 		memBuf.buf.destroy()
 		memBuf.buf = nil
diff --git a/br/pkg/lightning/backend/kv/session_internal_test.go b/br/pkg/lightning/backend/kv/session_internal_test.go
index 97ebd8cc82d1b..d70f2827a7916 100644
--- a/br/pkg/lightning/backend/kv/session_internal_test.go
+++ b/br/pkg/lightning/backend/kv/session_internal_test.go
@@ -77,7 +77,7 @@ func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) {
 			},
 		},
 	} {
-		testKVMemBuf := &kvMemBuf{}
+		testKVMemBuf := &MemBuf{}
 		for _, allocSize := range tc.AllocSizes {
 			testKVMemBuf.AllocateBuf(allocSize)
 			testKVMemBuf.Recycle(testKVMemBuf.buf)
@@ -94,8 +94,8 @@ func TestKVMemBufBatchAllocAndRecycle(t *testing.T) {
 		AllocSizes                []int
 		FinalAvailableByteBufCaps []int
 	}
-	testKVMemBuf := &kvMemBuf{}
-	bBufs := []*bytesBuf{}
+	testKVMemBuf := &MemBuf{}
+	bBufs := []*BytesBuf{}
 	for i := 0; i < maxAvailableBufSize; i++ {
 		testKVMemBuf.AllocateBuf(1 * units.MiB)
 		bBufs = append(bBufs, testKVMemBuf.buf)
diff --git a/br/pkg/lightning/backend/kv/session_test.go b/br/pkg/lightning/backend/kv/session_test.go
index 5f7c220ed34e2..a037661cc3b03 100644
--- a/br/pkg/lightning/backend/kv/session_test.go
+++ b/br/pkg/lightning/backend/kv/session_test.go
@@ -25,7 +25,7 @@ import (
 )
 
 func TestSession(t *testing.T) {
-	session := kv.NewSession(&encode.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
+	session := kv.NewSessionCtx(&encode.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
 	_, err := session.Txn(true)
 	require.NoError(t, err)
 }
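Note: the trimmed-down session and its buffers are now exported from the `kv` package. `NewSessionCtx` preserves the old behavior of returning a `sessionctx.Context`, while `NewSession` returns the concrete `*Session` so callers can reach `Vars` and `TakeKvPairs`. A small sketch (option values are placeholders):

```go
package example

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
)

func demo() {
	opts := &encode.SessionOptions{SQLMode: 0, Timestamp: 1234567890}

	se := kv.NewSession(opts, log.L()) // concrete *kv.Session
	_ = se.Vars.StmtCtx                // session vars are exported now

	// KV pairs buffered in the session's MemBuf are handed off in one batch:
	pairs := se.TakeKvPairs()
	_ = pairs.Pairs // exported field on kv.KvPairs

	ctxLike := kv.NewSessionCtx(opts, log.L()) // sessionctx.Context, as before
	_, _ = ctxLike.Txn(true)
}
```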
diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go
index 0aed4727409a8..2ae589fae7bcb 100644
--- a/br/pkg/lightning/backend/kv/sql2kv.go
+++ b/br/pkg/lightning/backend/kv/sql2kv.go
@@ -58,7 +58,7 @@ type autoIDConverter func(int64) int64
 type tableKVEncoder struct {
 	tbl             table.Table
 	autoRandomColID int64
-	se              *session
+	se              *Session
 	recordCache     []types.Datum
 	genCols         []genCol
 	// convert auto id for shard rowid or auto random id base on row id generated by lightning
@@ -80,7 +80,7 @@ func NewTableKVEncoder(
 	}
 	meta := config.Table.Meta()
 	cols := config.Table.Cols()
-	se := newSession(&config.SessionOptions, config.Logger)
+	se := NewSession(&config.SessionOptions, config.Logger)
 	// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
 	recordCtx := tables.NewCommonAddRecordCtx(len(cols))
 	tables.SetAddRecordCtx(se, recordCtx)
@@ -126,7 +126,7 @@ func NewTableKVEncoder(
 
 // collectGeneratedColumns collects all expressions required to evaluate the
 // results of all generated columns. The returning slice is in evaluation order.
-func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) {
+func collectGeneratedColumns(se *Session, meta *model.TableInfo, cols []*table.Column) ([]genCol, error) {
 	hasGenCol := false
 	for _, col := range cols {
 		if col.GeneratedExpr != nil {
@@ -140,9 +140,9 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
 	}
 
 	// the expression rewriter requires a non-nil TxnCtx.
-	se.vars.TxnCtx = new(variable.TransactionContext)
+	se.Vars.TxnCtx = new(variable.TransactionContext)
 	defer func() {
-		se.vars.TxnCtx = nil
+		se.Vars.TxnCtx = nil
 	}()
 
 	// not using TableInfo2SchemaAndNames to avoid parsing all virtual generated columns again.
@@ -290,23 +290,23 @@ func logEvalGenExprFailed(logger log.Logger, row []types.Datum, colInfo *model.C
 }
 
 type KvPairs struct {
-	pairs    []common.KvPair
-	bytesBuf *bytesBuf
-	memBuf   *kvMemBuf
+	Pairs    []common.KvPair
+	BytesBuf *BytesBuf
+	MemBuf   *MemBuf
 }
 
 // MakeRowsFromKvPairs converts a KvPair slice into a Rows instance. This is
 // mainly used for testing only. The resulting Rows instance should only be used
 // for the importer backend.
 func MakeRowsFromKvPairs(pairs []common.KvPair) encode.Rows {
-	return &KvPairs{pairs: pairs}
+	return &KvPairs{Pairs: pairs}
 }
 
 // MakeRowFromKvPairs converts a KvPair slice into a Row instance. This is
 // mainly used for testing only. The resulting Row instance should only be used
 // for the importer backend.
 func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
-	return &KvPairs{pairs: pairs}
+	return &KvPairs{Pairs: pairs}
 }
 
 // KvPairsFromRows converts a Rows instance constructed from MakeRowsFromKvPairs
@@ -314,7 +314,7 @@ func MakeRowFromKvPairs(pairs []common.KvPair) encode.Row {
 // constructed in such way.
 // nolint:golint // kv.KvPairsFromRows sounds good.
 func KvPairsFromRows(rows encode.Rows) []common.KvPair {
-	return rows.(*KvPairs).pairs
+	return rows.(*KvPairs).Pairs
 }
 
 // KvPairsFromRow converts a Row instance constructed from MakeRowFromKvPairs
@@ -322,10 +322,10 @@ func KvPairsFromRows(rows encode.Rows) []common.KvPair {
 // constructed in such way.
 // nolint:golint // kv.KvPairsFromRow sounds good.
 func KvPairsFromRow(row encode.Row) []common.KvPair {
-	return row.(*KvPairs).pairs
+	return row.(*KvPairs).Pairs
 }
 
-func evaluateGeneratedColumns(se *session, record []types.Datum, cols []*table.Column, genCols []genCol) (errCol *model.ColumnInfo, err error) {
+func evaluateGeneratedColumns(se *Session, record []types.Datum, cols []*table.Column, genCols []genCol) (errCol *model.ColumnInfo, err error) {
 	mutRow := chunk.MutRowFromDatums(record)
 	for _, gc := range genCols {
 		col := cols[gc.index].ToInfo()
@@ -426,10 +426,10 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, rowID int64, columnPerm
 		)
 		return nil, errors.Trace(err)
 	}
-	kvPairs := kvcodec.se.takeKvPairs()
-	for i := 0; i < len(kvPairs.pairs); i++ {
+	kvPairs := kvcodec.se.TakeKvPairs()
+	for i := 0; i < len(kvPairs.Pairs); i++ {
 		var encoded [9]byte // The max length of encoded int64 is 9.
-		kvPairs.pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
+		kvPairs.Pairs[i].RowID = common.EncodeIntRowIDToBuf(encoded[:0], rowID)
 	}
 	kvcodec.recordCache = record[:0]
 	return kvPairs, nil
@@ -449,7 +449,7 @@ func GetEncoderIncrementalID(encoder encode.Encoder, id int64) int64 {
 }
 
 // GetEncoderSe return session.
-func GetEncoderSe(encoder encode.Encoder) *session {
+func GetEncoderSe(encoder encode.Encoder) *Session {
 	return encoder.(*tableKVEncoder).se
 }
@@ -500,7 +500,7 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
 		// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
 		value = types.GetMinValue(&col.FieldType)
 	case isBadNullValue:
-		err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx, 0)
+		err = col.HandleBadNull(&value, kvcodec.se.Vars.StmtCtx, 0)
 	default:
 		value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
 	}
@@ -523,7 +523,7 @@ func getAutoRecordID(d types.Datum, target *types.FieldType) int64 {
 
 func (kvs *KvPairs) Size() uint64 {
 	size := uint64(0)
-	for _, kv := range kvs.pairs {
+	for _, kv := range kvs.Pairs {
 		size += uint64(len(kv.Key) + len(kv.Val))
 	}
 	return size
@@ -538,22 +538,22 @@ func (kvs *KvPairs) ClassifyAndAppend(
 	dataKVs := (*data).(*KvPairs)
 	indexKVs := (*indices).(*KvPairs)
 
-	for _, kv := range kvs.pairs {
+	for _, kv := range kvs.Pairs {
 		if kv.Key[tablecodec.TableSplitKeyLen+1] == 'r' {
-			dataKVs.pairs = append(dataKVs.pairs, kv)
+			dataKVs.Pairs = append(dataKVs.Pairs, kv)
 			dataChecksum.UpdateOne(kv)
 		} else {
-			indexKVs.pairs = append(indexKVs.pairs, kv)
+			indexKVs.Pairs = append(indexKVs.Pairs, kv)
 			indexChecksum.UpdateOne(kv)
 		}
 	}
 
 	// the related buf is shared, so we only need to set it into one of the kvs so it can be released
-	if kvs.bytesBuf != nil {
-		dataKVs.bytesBuf = kvs.bytesBuf
-		dataKVs.memBuf = kvs.memBuf
-		kvs.bytesBuf = nil
-		kvs.memBuf = nil
+	if kvs.BytesBuf != nil {
+		dataKVs.BytesBuf = kvs.BytesBuf
+		dataKVs.MemBuf = kvs.MemBuf
+		kvs.BytesBuf = nil
+		kvs.MemBuf = nil
 	}
 
 	*data = dataKVs
@@ -561,17 +561,17 @@ func (kvs *KvPairs) ClassifyAndAppend(
 }
 
 func (kvs *KvPairs) SplitIntoChunks(splitSize int) []encode.Rows {
-	if len(kvs.pairs) == 0 {
+	if len(kvs.Pairs) == 0 {
 		return nil
 	}
 
 	res := make([]encode.Rows, 0, 1)
 	i := 0
 	cumSize := 0
-	for j, pair := range kvs.pairs {
+	for j, pair := range kvs.Pairs {
 		size := len(pair.Key) + len(pair.Val)
 		if i < j && cumSize+size > splitSize {
-			res = append(res, &KvPairs{pairs: kvs.pairs[i:j]})
+			res = append(res, &KvPairs{Pairs: kvs.Pairs[i:j]})
 			i = j
 			cumSize = 0
 		}
@@ -582,20 +582,20 @@ func (kvs *KvPairs) SplitIntoChunks(splitSize int) []encode.Rows {
 		res = append(res, kvs)
 	} else {
 		res = append(res, &KvPairs{
-			pairs:    kvs.pairs[i:],
-			bytesBuf: kvs.bytesBuf,
-			memBuf:   kvs.memBuf,
+			Pairs:    kvs.Pairs[i:],
+			BytesBuf: kvs.BytesBuf,
+			MemBuf:   kvs.MemBuf,
 		})
 	}
 	return res
 }
 
 func (kvs *KvPairs) Clear() encode.Rows {
-	if kvs.bytesBuf != nil {
-		kvs.memBuf.Recycle(kvs.bytesBuf)
-		kvs.bytesBuf = nil
-		kvs.memBuf = nil
+	if kvs.BytesBuf != nil {
+		kvs.MemBuf.Recycle(kvs.BytesBuf)
+		kvs.BytesBuf = nil
+		kvs.MemBuf = nil
 	}
-	kvs.pairs = kvs.pairs[:0]
+	kvs.Pairs = kvs.Pairs[:0]
 	return kvs
 }
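Note: with the `KvPairs` fields exported, code outside the `kv` package (the mocks and tests in this patch) can build and inspect row batches directly. A tiny sketch:

```go
package example

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
)

func demo() {
	// Build an encode.Rows value from raw pairs, as the tests do...
	rows := kv.MakeRowsFromKvPairs([]common.KvPair{
		{Key: []byte("k1"), Val: []byte("v1")},
	})
	// ...and round-trip back to the raw pairs.
	pairs := kv.KvPairsFromRows(rows)
	_ = pairs[0].Key
}
```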
diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 9c0f7e9f496b7..2f6fac48bbb7c 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -108,7 +108,6 @@ go_test(
        "//br/pkg/lightning/backend/encode",
        "//br/pkg/lightning/backend/kv",
        "//br/pkg/lightning/common",
-       "//br/pkg/lightning/glue",
        "//br/pkg/lightning/log",
        "//br/pkg/lightning/mydump",
        "//br/pkg/membuf",
@@ -122,7 +121,6 @@
        "//parser",
        "//parser/ast",
        "//parser/model",
-       "//parser/mysql",
        "//sessionctx/stmtctx",
        "//store/pdtypes",
        "//table/tables",
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index dfdd75fcfe790..0fbebc948c68e 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -386,13 +386,13 @@ func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.Che
 type local struct {
 	engines sync.Map // sync version of map[uuid.UUID]*Engine
 
-	pdCtl     *pdutil.PdController
-	splitCli  split.SplitClient
-	tikvCli   *tikvclient.KVStore
-	tls       *common.TLS
-	pdAddr    string
-	g         glue.Glue
-	tikvCodec tikvclient.Codec
+	pdCtl            *pdutil.PdController
+	splitCli         split.SplitClient
+	tikvCli          *tikvclient.KVStore
+	tls              *common.TLS
+	pdAddr           string
+	regionSizeGetter TableRegionSizeGetter
+	tikvCodec        tikvclient.Codec
 
 	localStoreDir string
@@ -420,9 +420,6 @@ type local struct {
 	writeLimiter StoreWriteLimiter
 	logger       log.Logger
 
-	encBuilder       encode.EncodingBuilder
-	targetInfoGetter backend.TargetInfoGetter
-
 	// When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
 	// To avoid this, we should check write stall before ingesting SSTs. Note that, we
 	// must check both leader node and followers in client side, because followers will
@@ -453,11 +450,10 @@ func NewLocalBackend(
 	ctx context.Context,
 	tls *common.TLS,
 	cfg *config.Config,
-	g glue.Glue,
+	regionSizeGetter TableRegionSizeGetter,
 	maxOpenFiles int,
 	errorMgr *errormanager.ErrorManager,
 	keyspaceName string,
-	encodingBuilder encode.EncodingBuilder,
 ) (backend.Backend, error) {
 	localFile := cfg.TikvImporter.SortedKVDir
 	rangeConcurrency := cfg.TikvImporter.RangeConcurrency
@@ -534,14 +530,14 @@ func NewLocalBackend(
 		LastAlloc = alloc
 	}
 	local := &local{
-		engines:   sync.Map{},
-		pdCtl:     pdCtl,
-		splitCli:  splitCli,
-		tikvCli:   tikvCli,
-		tls:       tls,
-		pdAddr:    cfg.TiDB.PdAddr,
-		g:         g,
-		tikvCodec: tikvCodec,
+		engines:          sync.Map{},
+		pdCtl:            pdCtl,
+		splitCli:         splitCli,
+		tikvCli:          tikvCli,
+		tls:              tls,
+		pdAddr:           cfg.TiDB.PdAddr,
+		regionSizeGetter: regionSizeGetter,
+		tikvCodec:        tikvCodec,
 
 		localStoreDir:     localFile,
 		workerConcurrency: rangeConcurrency * 2,
@@ -562,8 +558,6 @@ func NewLocalBackend(
 		bufferPool:   membuf.NewPool(membuf.WithAllocator(alloc)),
 		writeLimiter: writeLimiter,
 		logger:       log.FromContext(ctx),
-		encBuilder:       encodingBuilder,
-		targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
 		shouldCheckWriteStall: cfg.Cron.SwitchMode.Duration == 0,
 	}
 	if m, ok := metric.FromContext(ctx); ok {
@@ -1669,22 +1663,6 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err
 	return nil
 }
 
-func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
-	return local.targetInfoGetter.CheckRequirements(ctx, checkCtx)
-}
-
-func (local *local) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
-	return local.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName)
-}
-
-func (local *local) MakeEmptyRows() encode.Rows {
-	return local.encBuilder.MakeEmptyRows()
-}
-
-func (local *local) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
-	return local.encBuilder.NewEncoder(ctx, config)
-}
-
 func engineSSTDir(storeDir string, engineUUID uuid.UUID) string {
 	return filepath.Join(storeDir, engineUUID.String()+".sst")
 }
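Note: `NewLocalBackend` no longer takes a `glue.Glue` or a trailing `encode.EncodingBuilder`; the only injected dependency left for region statistics is a `TableRegionSizeGetter`. A sketch of the new call site (the maxOpenFiles and keyspace values are placeholders; TLS/config/error-manager construction is elided):

```go
package example

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/br/pkg/lightning/common"
	"github.com/pingcap/tidb/br/pkg/lightning/config"
	"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
)

func makeLocal(ctx context.Context, tls *common.TLS, cfg *config.Config,
	db *sql.DB, errorMgr *errormanager.ErrorManager) (backend.Backend, error) {
	// The region-size lookup is injected where a glue.Glue used to be,
	// and the trailing EncodingBuilder argument is gone.
	getter := &local.TableRegionSizeGetterImpl{DB: db}
	return local.NewLocalBackend(ctx, tls, cfg, getter, 1024, errorMgr, "")
}
```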
diff --git a/br/pkg/lightning/backend/local/localhelper.go b/br/pkg/lightning/backend/local/localhelper.go
index 3e50a6c1ba554..379ac0e4d9c54 100644
--- a/br/pkg/lightning/backend/local/localhelper.go
+++ b/br/pkg/lightning/backend/local/localhelper.go
@@ -64,6 +64,51 @@ var (
 	splitRetryTimes = 8
 )
 
+// TableRegionSizeGetter get table region size.
+type TableRegionSizeGetter interface {
+	GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error)
+}
+
+// TableRegionSizeGetterImpl implements TableRegionSizeGetter.
+type TableRegionSizeGetterImpl struct {
+	DB *sql.DB
+}
+
+var _ TableRegionSizeGetter = &TableRegionSizeGetterImpl{}
+
+// GetTableRegionSize implements TableRegionSizeGetter.
+func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error) {
+	if g.DB == nil {
+		return nil, errors.Errorf("db is nil")
+	}
+	exec := &common.SQLWithRetry{
+		DB:     g.DB,
+		Logger: log.FromContext(ctx),
+	}
+
+	stats := make(map[uint64]int64)
+	err := exec.Transact(ctx, "fetch region approximate sizes", func(ctx context.Context, tx *sql.Tx) error {
+		rows, err := tx.QueryContext(ctx, "SELECT REGION_ID, APPROXIMATE_SIZE FROM information_schema.TIKV_REGION_STATUS WHERE TABLE_ID = ?", tableID)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		//nolint: errcheck
+		defer rows.Close()
+		var (
+			regionID uint64
+			size     int64
+		)
+		for rows.Next() {
+			if err = rows.Scan(&regionID, &size); err != nil {
+				return errors.Trace(err)
+			}
+			stats[regionID] = size * units.MiB
+		}
+		return rows.Err()
+	})
+	return stats, errors.Trace(err)
+}
+
 // SplitAndScatterRegionInBatches splits&scatter regions in batches.
 // Too many split&scatter requests may put a lot of pressure on TiKV and PD.
 func (local *local) SplitAndScatterRegionInBatches(
@@ -101,10 +146,7 @@ func (local *local) SplitAndScatterRegionByRanges(
 		return nil
 	}
 
-	db, err := local.g.GetDB()
-	if err != nil {
-		return errors.Trace(err)
-	}
+	var err error
 	minKey := codec.EncodeBytes([]byte{}, ranges[0].start)
 	maxKey := codec.EncodeBytes([]byte{}, ranges[len(ranges)-1].end)
@@ -176,7 +218,7 @@ func (local *local) SplitAndScatterRegionByRanges(
 
 	var tableRegionStats map[uint64]int64
 	if tableInfo != nil {
-		tableRegionStats, err = fetchTableRegionSizeStats(ctx, db, tableInfo.ID)
+		tableRegionStats, err = local.regionSizeGetter.GetTableRegionSize(ctx, tableInfo.ID)
 		if err != nil {
 			log.FromContext(ctx).Warn("fetch table region size statistics failed",
 				zap.String("table", tableInfo.Name), zap.Error(err))
@@ -350,38 +392,6 @@ func (local *local) SplitAndScatterRegionByRanges(
 	return nil
 }
 
-func fetchTableRegionSizeStats(ctx context.Context, db *sql.DB, tableID int64) (map[uint64]int64, error) {
-	if db == nil {
-		return nil, errors.Errorf("db is nil")
-	}
-	exec := &common.SQLWithRetry{
-		DB:     db,
-		Logger: log.FromContext(ctx),
-	}
-
-	stats := make(map[uint64]int64)
-	err := exec.Transact(ctx, "fetch region approximate sizes", func(ctx context.Context, tx *sql.Tx) error {
-		rows, err := tx.QueryContext(ctx, "SELECT REGION_ID, APPROXIMATE_SIZE FROM information_schema.TIKV_REGION_STATUS WHERE TABLE_ID = ?", tableID)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		//nolint: errcheck
-		defer rows.Close()
-		var (
-			regionID uint64
-			size     int64
-		)
-		for rows.Next() {
-			if err = rows.Scan(&regionID, &size); err != nil {
-				return errors.Trace(err)
-			}
-			stats[regionID] = size * units.MiB
-		}
-		return rows.Err()
-	})
-	return stats, errors.Trace(err)
-}
-
 func (local *local) BatchSplitRegions(
 	ctx context.Context,
 	region *split.RegionInfo,
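Note: because the split path now depends only on the `TableRegionSizeGetter` interface, tests (and other embedders) can substitute a canned implementation instead of wiring up a `*sql.DB`. A hypothetical stub:

```go
package example

import "context"

// stubRegionSizeGetter is a hypothetical test double satisfying
// local.TableRegionSizeGetter.
type stubRegionSizeGetter struct {
	sizes map[uint64]int64
}

func (s *stubRegionSizeGetter) GetTableRegionSize(
	ctx context.Context, tableID int64,
) (map[uint64]int64, error) {
	// Return fixed region sizes regardless of the table queried.
	return s.sizes, nil
}
```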
"github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/store/pdtypes" "github.com/pingcap/tidb/tablecodec" @@ -458,9 +456,9 @@ func doTestBatchSplitRegionByRanges(ctx context.Context, t *testing.T, hook clie keys := [][]byte{[]byte(""), []byte("aay"), []byte("bba"), []byte("bbh"), []byte("cca"), []byte("")} client := initTestSplitClient(keys, hook) local := &local{ - splitCli: client, - g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), - logger: log.L(), + splitCli: client, + regionSizeGetter: &TableRegionSizeGetterImpl{}, + logger: log.L(), } // current region ranges: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, ) @@ -632,9 +630,9 @@ func TestSplitAndScatterRegionInBatches(t *testing.T) { keys := [][]byte{[]byte(""), []byte("a"), []byte("b"), []byte("")} client := initTestSplitClient(keys, nil) local := &local{ - splitCli: client, - g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), - logger: log.L(), + splitCli: client, + regionSizeGetter: &TableRegionSizeGetterImpl{}, + logger: log.L(), } ctx, cancel := context.WithCancel(context.Background()) @@ -719,9 +717,9 @@ func doTestBatchSplitByRangesWithClusteredIndex(t *testing.T, hook clientHook) { keys = append(keys, tableEndKey, []byte("")) client := initTestSplitClient(keys, hook) local := &local{ - splitCli: client, - g: glue.NewExternalTiDBGlue(nil, mysql.ModeNone), - logger: log.L(), + splitCli: client, + regionSizeGetter: &TableRegionSizeGetterImpl{}, + logger: log.L(), } ctx := context.Background() diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 31e089d1baf44..d833e3fb619d6 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -104,7 +104,7 @@ func NewEncodingBuilder() encode.EncodingBuilder { // NewEncoder creates a KV encoder. // It implements the `backend.EncodingBuilder` interface. func (b *encodingBuilder) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) { - se := kv.NewSession(&config.SessionOptions, log.FromContext(ctx)) + se := kv.NewSessionCtx(&config.SessionOptions, log.FromContext(ctx)) if config.SQLMode.HasStrictMode() { se.GetSessionVars().SkipUTF8Check = false se.GetSessionVars().SkipASCIICheck = false @@ -251,11 +251,9 @@ func (b *targetInfoGetter) CheckRequirements(ctx context.Context, _ *backend.Che } type tidbBackend struct { - db *sql.DB - onDuplicate string - errorMgr *errormanager.ErrorManager - encBuilder encode.EncodingBuilder - targetInfoGetter backend.TargetInfoGetter + db *sql.DB + onDuplicate string + errorMgr *errormanager.ErrorManager } // NewTiDBBackend creates a new TiDB backend using the given database. @@ -270,11 +268,9 @@ func NewTiDBBackend(ctx context.Context, db *sql.DB, onDuplicate string, errorMg onDuplicate = config.ReplaceOnDup } return backend.MakeBackend(&tidbBackend{ - db: db, - onDuplicate: onDuplicate, - errorMgr: errorMgr, - encBuilder: NewEncodingBuilder(), - targetInfoGetter: NewTargetInfoGetter(db), + db: db, + onDuplicate: onDuplicate, + errorMgr: errorMgr, }) } @@ -543,10 +539,6 @@ func (be *tidbBackend) Close() { // TidbManager, so we let the manager to close it. 
} -func (be *tidbBackend) MakeEmptyRows() encode.Rows { - return be.encBuilder.MakeEmptyRows() -} - func (be *tidbBackend) RetryImportDelay() time.Duration { return 0 } @@ -562,14 +554,6 @@ func (be *tidbBackend) ShouldPostProcess() bool { return true } -func (be *tidbBackend) CheckRequirements(ctx context.Context, _ *backend.CheckCtx) error { - return be.targetInfoGetter.CheckRequirements(ctx, nil) -} - -func (be *tidbBackend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) { - return be.encBuilder.NewEncoder(ctx, config) -} - func (be *tidbBackend) OpenEngine(context.Context, *backend.EngineConfig, uuid.UUID) error { return nil } @@ -752,10 +736,6 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl return nil } -func (be *tidbBackend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { - return be.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName) -} - func (be *tidbBackend) EngineFileSizes() []backend.EngineFileSize { return nil } diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go index 5bbd7c30c3615..828920a9323d1 100644 --- a/br/pkg/lightning/backend/tidb/tidb_test.go +++ b/br/pkg/lightning/backend/tidb/tidb_test.go @@ -42,10 +42,11 @@ import ( ) type mysqlSuite struct { - dbHandle *sql.DB - mockDB sqlmock.Sqlmock - backend backend.Backend - tbl table.Table + dbHandle *sql.DB + mockDB sqlmock.Sqlmock + backend backend.Backend + encBuilder encode.EncodingBuilder + tbl table.Table } func createMysqlSuite(t *testing.T) *mysqlSuite { @@ -65,7 +66,13 @@ func createMysqlSuite(t *testing.T) *mysqlSuite { tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo) require.NoError(t, err) backend := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L())) - return &mysqlSuite{dbHandle: db, mockDB: mock, backend: backend, tbl: tbl} + return &mysqlSuite{ + dbHandle: db, + mockDB: mock, + backend: backend, + encBuilder: tidb.NewEncodingBuilder(), + tbl: tbl, + } } func (s *mysqlSuite) TearDownTest(t *testing.T) { @@ -86,9 +93,9 @@ func TestWriteRowsReplaceOnDup(t *testing.T) { engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) - dataRows := s.backend.MakeEmptyRows() + dataRows := s.encBuilder.MakeEmptyRows() dataChecksum := verification.MakeKVChecksum(0, 0, 0) - indexRows := s.backend.MakeEmptyRows() + indexRows := s.encBuilder.MakeEmptyRows() indexChecksum := verification.MakeKVChecksum(0, 0, 0) cols := s.tbl.Cols() @@ -100,7 +107,7 @@ func TestWriteRowsReplaceOnDup(t *testing.T) { // skip column a,c due to ignore-columns perms[0] = -1 perms[2] = -1 - encoder, err := s.backend.NewEncoder(context.Background(), &encode.EncodingConfig{ + encoder, err := s.encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{ SessionOptions: encode.SessionOptions{SQLMode: 0, Timestamp: 1234567890}, Table: s.tbl, Logger: logger, @@ -145,16 +152,17 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) { ctx := context.Background() logger := log.L() + encBuilder := tidb.NewEncodingBuilder() ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, config.IgnoreOnDup, errormanager.New(nil, config.NewConfig(), logger)) engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) require.NoError(t, err) - dataRows := ignoreBackend.MakeEmptyRows() + dataRows := encBuilder.MakeEmptyRows() dataChecksum 
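Note: for the TiDB backend, encoder construction now goes through the standalone builder rather than the backend value. A sketch (the session options here are placeholders):

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/table"
)

func newEncoder(ctx context.Context, tbl table.Table) (encode.Encoder, error) {
	encBuilder := tidb.NewEncodingBuilder()
	return encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
		SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeStrictAllTables},
		Table:          tbl,
		Logger:         log.L(),
	})
}
```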
diff --git a/br/pkg/lightning/backend/tidb/tidb_test.go b/br/pkg/lightning/backend/tidb/tidb_test.go
index 5bbd7c30c3615..828920a9323d1 100644
--- a/br/pkg/lightning/backend/tidb/tidb_test.go
+++ b/br/pkg/lightning/backend/tidb/tidb_test.go
@@ -42,10 +42,11 @@ import (
 )
 
 type mysqlSuite struct {
-	dbHandle *sql.DB
-	mockDB   sqlmock.Sqlmock
-	backend  backend.Backend
-	tbl      table.Table
+	dbHandle   *sql.DB
+	mockDB     sqlmock.Sqlmock
+	backend    backend.Backend
+	encBuilder encode.EncodingBuilder
+	tbl        table.Table
 }
 
 func createMysqlSuite(t *testing.T) *mysqlSuite {
@@ -65,7 +66,13 @@ func createMysqlSuite(t *testing.T) *mysqlSuite {
 	tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), tblInfo)
 	require.NoError(t, err)
 	backend := tidb.NewTiDBBackend(context.Background(), db, config.ReplaceOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
-	return &mysqlSuite{dbHandle: db, mockDB: mock, backend: backend, tbl: tbl}
+	return &mysqlSuite{
+		dbHandle:   db,
+		mockDB:     mock,
+		backend:    backend,
+		encBuilder: tidb.NewEncodingBuilder(),
+		tbl:        tbl,
+	}
 }
 
 func (s *mysqlSuite) TearDownTest(t *testing.T) {
@@ -86,9 +93,9 @@ func TestWriteRowsReplaceOnDup(t *testing.T) {
 	engine, err := s.backend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
 
-	dataRows := s.backend.MakeEmptyRows()
+	dataRows := s.encBuilder.MakeEmptyRows()
 	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
-	indexRows := s.backend.MakeEmptyRows()
+	indexRows := s.encBuilder.MakeEmptyRows()
 	indexChecksum := verification.MakeKVChecksum(0, 0, 0)
 
 	cols := s.tbl.Cols()
@@ -100,7 +107,7 @@ func TestWriteRowsReplaceOnDup(t *testing.T) {
 	// skip column a,c due to ignore-columns
 	perms[0] = -1
 	perms[2] = -1
-	encoder, err := s.backend.NewEncoder(context.Background(), &encode.EncodingConfig{
+	encoder, err := s.encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
 		SessionOptions: encode.SessionOptions{SQLMode: 0, Timestamp: 1234567890},
 		Table:          s.tbl,
 		Logger:         logger,
@@ -145,16 +152,17 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
 
 	ctx := context.Background()
 	logger := log.L()
+	encBuilder := tidb.NewEncodingBuilder()
 	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, config.IgnoreOnDup, errormanager.New(nil, config.NewConfig(), logger))
 	engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
 
-	dataRows := ignoreBackend.MakeEmptyRows()
+	dataRows := encBuilder.MakeEmptyRows()
 	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
-	indexRows := ignoreBackend.MakeEmptyRows()
+	indexRows := encBuilder.MakeEmptyRows()
 	indexChecksum := verification.MakeKVChecksum(0, 0, 0)
 
-	encoder, err := ignoreBackend.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
+	encoder, err := encBuilder.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
 	require.NoError(t, err)
 	row, err := encoder.Encode([]types.Datum{
 		types.NewIntDatum(1),
@@ -170,7 +178,7 @@ func TestWriteRowsIgnoreOnDup(t *testing.T) {
 	require.NoError(t, err)
 
 	// test encode rows with _tidb_rowid
-	encoder, err = ignoreBackend.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
+	encoder, err = encBuilder.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
 	require.NoError(t, err)
 	rowWithID, err := encoder.Encode([]types.Datum{
 		types.NewIntDatum(1),
@@ -191,16 +199,17 @@ func TestWriteRowsErrorOnDup(t *testing.T) {
 
 	ctx := context.Background()
 	logger := log.L()
+	encBuilder := tidb.NewEncodingBuilder()
 	ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), logger))
 	engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
 
-	dataRows := ignoreBackend.MakeEmptyRows()
+	dataRows := encBuilder.MakeEmptyRows()
 	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
-	indexRows := ignoreBackend.MakeEmptyRows()
+	indexRows := encBuilder.MakeEmptyRows()
 	indexChecksum := verification.MakeKVChecksum(0, 0, 0)
 
-	encoder, err := ignoreBackend.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
+	encoder, err := encBuilder.NewEncoder(ctx, &encode.EncodingConfig{Table: s.tbl, Logger: logger})
 	require.NoError(t, err)
 	row, err := encoder.Encode([]types.Datum{
 		types.NewIntDatum(1),
@@ -236,9 +245,9 @@ func testStrictMode(t *testing.T) {
 
 	ctx := context.Background()
 
-	bk := tidb.NewTiDBBackend(ctx, s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
+	encBuilder := tidb.NewEncodingBuilder()
 	logger := log.L()
-	encoder, err := bk.NewEncoder(ctx, &encode.EncodingConfig{
+	encoder, err := encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
 		SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeStrictAllTables},
 		Table:          tbl,
 		Logger:         log.L(),
@@ -257,7 +266,7 @@ func testStrictMode(t *testing.T) {
 	require.Regexp(t, `incorrect utf8 value .* for column s0$`, err.Error())
 
 	// oepn a new encode because column count changed.
-	encoder, err = bk.NewEncoder(ctx, &encode.EncodingConfig{
+	encoder, err = encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
 		SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeStrictAllTables},
 		Table:          tbl,
 		Logger:         logger,
@@ -283,8 +292,8 @@ func TestFetchRemoteTableModels_3_x(t *testing.T) {
 			AddRow("t", "id", "int(10)", "", "auto_increment"))
 	s.mockDB.ExpectCommit()
 
-	bk := tidb.NewTiDBBackend(context.Background(), s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
-	tableInfos, err := bk.FetchRemoteTableModels(context.Background(), "test")
+	targetInfoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
+	tableInfos, err := targetInfoGetter.FetchRemoteTableModels(context.Background(), "test")
 	require.NoError(t, err)
 	ft := types.FieldType{}
 	ft.SetFlag(mysql.AutoIncrementFlag)
@@ -320,8 +329,8 @@ func TestFetchRemoteTableModels_4_0(t *testing.T) {
 			AddRow("test", "t", "id", int64(1)))
 	s.mockDB.ExpectCommit()
 
-	bk := tidb.NewTiDBBackend(context.Background(), s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
-	tableInfos, err := bk.FetchRemoteTableModels(context.Background(), "test")
+	targetInfoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
+	tableInfos, err := targetInfoGetter.FetchRemoteTableModels(context.Background(), "test")
 	require.NoError(t, err)
 	ft := types.FieldType{}
 	ft.SetFlag(mysql.AutoIncrementFlag | mysql.UnsignedFlag)
@@ -357,8 +366,8 @@ func TestFetchRemoteTableModels_4_x_auto_increment(t *testing.T) {
 			AddRow("test", "t", "id", int64(1), "AUTO_INCREMENT"))
 	s.mockDB.ExpectCommit()
 
-	bk := tidb.NewTiDBBackend(context.Background(), s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
-	tableInfos, err := bk.FetchRemoteTableModels(context.Background(), "test")
+	targetInfoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
+	tableInfos, err := targetInfoGetter.FetchRemoteTableModels(context.Background(), "test")
 	require.NoError(t, err)
 	ft := types.FieldType{}
 	ft.SetFlag(mysql.AutoIncrementFlag)
@@ -394,8 +403,8 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
 			AddRow("test", "t", "id", int64(1), "AUTO_RANDOM"))
 	s.mockDB.ExpectCommit()
 
-	bk := tidb.NewTiDBBackend(context.Background(), s.dbHandle, config.ErrorOnDup, errormanager.New(nil, config.NewConfig(), log.L()))
-	tableInfos, err := bk.FetchRemoteTableModels(context.Background(), "test")
+	targetInfoGetter := tidb.NewTargetInfoGetter(s.dbHandle)
+	tableInfos, err := targetInfoGetter.FetchRemoteTableModels(context.Background(), "test")
 	require.NoError(t, err)
 	ft := types.FieldType{}
 	ft.SetFlag(mysql.PriKeyFlag)
@@ -432,7 +441,8 @@ func TestWriteRowsErrorNoRetry(t *testing.T) {
 	ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, config.ErrorOnDup,
 		errormanager.New(s.dbHandle, &config.Config{}, log.L()),
 	)
-	dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
+	encBuilder := tidb.NewEncodingBuilder()
+	dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl)
 	ctx := context.Background()
 	engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
@@ -499,7 +509,8 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
 			},
 		}, log.L()),
 	)
-	dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
+	encBuilder := tidb.NewEncodingBuilder()
+	dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl)
 	ctx := context.Background()
 	engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
@@ -554,7 +565,8 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
 			},
 		}, log.L()),
 	)
-	dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
+	encBuilder := tidb.NewEncodingBuilder()
+	dataRows := encodeRowsTiDB(t, encBuilder, ignoreBackend, s.tbl)
 	ctx := context.Background()
 	engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
 	require.NoError(t, err)
@@ -567,10 +579,10 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
 	require.Nil(t, st)
 }
 
-func encodeRowsTiDB(t *testing.T, b backend.Backend, tbl table.Table) encode.Rows {
-	dataRows := b.MakeEmptyRows()
+func encodeRowsTiDB(t *testing.T, encBuilder encode.EncodingBuilder, b backend.Backend, tbl table.Table) encode.Rows {
+	dataRows := encBuilder.MakeEmptyRows()
 	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
-	indexRows := b.MakeEmptyRows()
+	indexRows := encBuilder.MakeEmptyRows()
 	indexChecksum := verification.MakeKVChecksum(0, 0, 0)
 
 	rowCases := []struct {
@@ -613,7 +625,7 @@ func encodeRowsTiDB(t *testing.T, b backend.Backend, tbl table.Table) encode.Row
 		},
 	}
 	for _, rc := range rowCases {
-		encoder, err := b.NewEncoder(context.Background(), &encode.EncodingConfig{
+		encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
 			Path:   rc.path,
 			Table:  tbl,
 			Logger: log.L(),
@@ -628,7 +640,7 @@ func encodeRowsTiDB(t *testing.T, b backend.Backend, tbl table.Table) encode.Row
 	for i := 0; i < 15; i++ {
 		rawRow = append(rawRow, types.NewIntDatum(0))
 	}
-	encoder, err := b.NewEncoder(context.Background(), &encode.EncodingConfig{
+	encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{
 		Path:   "12.csv",
 		Table:  tbl,
 		Logger: log.L(),
diff --git a/br/pkg/lightning/common/common.go b/br/pkg/lightning/common/common.go
index 493d90c352793..f0c6f90befa5e 100644
--- a/br/pkg/lightning/common/common.go
+++ b/br/pkg/lightning/common/common.go
@@ -27,6 +27,26 @@ const (
 	IndexEngineID = -1
 )
 
+// DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system
+// variables from downstream which may affect KV encode result. The values record the default
+// values if missing.
+var DefaultImportantVariables = map[string]string{
+	"max_allowed_packet":      "67108864",
+	"div_precision_increment": "4",
+	"time_zone":               "SYSTEM",
+	"lc_time_names":           "en_US",
+	"default_week_format":     "0",
+	"block_encryption_mode":   "aes-128-ecb",
+	"group_concat_max_len":    "1024",
+}
+
+// DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system
+// variables from downstream in local/importer backend. The values record the default
+// values if missing.
+var DefaultImportVariablesTiDB = map[string]string{
+	"tidb_row_format_version": "1",
+}
+
 func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
 	alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
 	if err != nil {
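Note: `ObtainImportantVariables` (changed at the bottom of this patch) builds a `SHOW VARIABLES` statement from the keys of these maps. A standalone approximation of that query assembly, not the actual function:

```go
package example

import (
	"sort"
	"strings"
)

var defaults = map[string]string{ // subset of DefaultImportantVariables
	"max_allowed_packet": "67108864",
	"time_zone":          "SYSTEM",
}

func buildQuery() string {
	names := make([]string, 0, len(defaults))
	for k := range defaults {
		names = append(names, k)
	}
	sort.Strings(names) // map iteration order is random; sort for stability
	var query strings.Builder
	query.WriteString("SHOW VARIABLES WHERE Variable_name IN ('")
	query.WriteString(strings.Join(names, "','"))
	query.WriteString("')")
	return query.String()
}
```

(The real function iterates the map directly; the sort here is only to make the sketch deterministic.)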
diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go
index 7d22b6473b130..63c7a81d5ac10 100644
--- a/br/pkg/lightning/importer/chunk_process.go
+++ b/br/pkg/lightning/importer/chunk_process.go
@@ -151,7 +151,7 @@ func (cr *chunkProcessor) process(
 		zap.Stringer("path", &cr.chunk.Key),
 	)
 	// Create the encoder.
-	kvEncoder, err := rc.backend.NewEncoder(ctx, &encode.EncodingConfig{
+	kvEncoder, err := rc.encBuilder.NewEncoder(ctx, &encode.EncodingConfig{
 		SessionOptions: encode.SessionOptions{
 			SQLMode:   rc.cfg.TiDB.SQLMode,
 			Timestamp: cr.chunk.Timestamp,
@@ -400,8 +400,8 @@ func (cr *chunkProcessor) deliverLoop(
 		zap.String("task", "deliver"),
 	)
 	// Fetch enough KV pairs from the source.
-	dataKVs := rc.backend.MakeEmptyRows()
-	indexKVs := rc.backend.MakeEmptyRows()
+	dataKVs := rc.encBuilder.MakeEmptyRows()
+	indexKVs := rc.encBuilder.MakeEmptyRows()
 
 	dataSynced := true
 	hasMoreKVs := true
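Note: the chunk processor now draws empty row batches and encoders from `rc.encBuilder` instead of `rc.backend`. The core of the encode/deliver handoff, reduced to a sketch (checksum and engine plumbing elided; this assumes the `encode.Row.ClassifyAndAppend` signature that `kv.KvPairs` implements above):

```go
package example

import (
	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/br/pkg/lightning/verification"
)

func deliverOnce(encBuilder encode.EncodingBuilder, row encode.Row) {
	dataKVs := encBuilder.MakeEmptyRows()
	indexKVs := encBuilder.MakeEmptyRows()
	dataChecksum := verification.MakeKVChecksum(0, 0, 0)
	indexChecksum := verification.MakeKVChecksum(0, 0, 0)

	// Each encoded row is split into data KVs and index KVs, updating
	// the running checksums along the way.
	row.ClassifyAndAppend(&dataKVs, &dataChecksum, &indexKVs, &indexChecksum)
}
```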
diff --git a/br/pkg/lightning/importer/chunk_process_test.go b/br/pkg/lightning/importer/chunk_process_test.go
index dbb75c97398c5..b5b8a16543b71 100644
--- a/br/pkg/lightning/importer/chunk_process_test.go
+++ b/br/pkg/lightning/importer/chunk_process_test.go
@@ -97,9 +97,10 @@ func (s *chunkRestoreSuite) TestDeliverLoopCancel() {
 	controller := gomock.NewController(s.T())
 	defer controller.Finish()
 	mockBackend := mock.NewMockBackend(controller)
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()
+	mockEncBuilder := mock.NewMockEncodingBuilder(controller)
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()
 
-	rc := &Controller{backend: backend.MakeBackend(mockBackend)}
+	rc := &Controller{backend: backend.MakeBackend(mockBackend), encBuilder: mockEncBuilder}
 	ctx, cancel := context.WithCancel(context.Background())
 	kvsCh := make(chan []deliveredKVs)
 	go cancel()
@@ -115,10 +116,11 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() {
 	controller := gomock.NewController(s.T())
 	defer controller.Finish()
 	mockBackend := mock.NewMockBackend(controller)
+	mockEncBuilder := mock.NewMockEncodingBuilder(controller)
 	importer := backend.MakeBackend(mockBackend)
 
 	mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes()
 	mockWriter := mock.NewMockEngineWriter(controller)
 	mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes()
 	mockWriter.EXPECT().
@@ -139,7 +141,7 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData() {
 
 	cfg := &config.Config{}
 	saveCpCh := make(chan saveCp, 16)
-	rc := &Controller{cfg: cfg, backend: importer, saveCpCh: saveCpCh}
+	rc := &Controller{cfg: cfg, backend: importer, saveCpCh: saveCpCh, encBuilder: mockEncBuilder}
 
 	var wg sync.WaitGroup
 	wg.Add(1)
@@ -171,11 +173,12 @@ func (s *chunkRestoreSuite) TestDeliverLoop() {
 	defer controller.Finish()
 	mockBackend := mock.NewMockBackend(controller)
 	importer := backend.MakeBackend(mockBackend)
+	mockEncBuilder := mock.NewMockEncodingBuilder(controller)
 
 	mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
 	// avoid return the same object at each call
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
 	mockWriter := mock.NewMockEngineWriter(controller)
 	mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes()
 	mockWriter.EXPECT().IsSynced().Return(true).AnyTimes()
@@ -248,7 +251,7 @@ func (s *chunkRestoreSuite) TestDeliverLoop() {
 	}()
 
 	cfg := &config.Config{}
-	rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer}
+	rc := &Controller{cfg: cfg, saveCpCh: saveCpCh, backend: importer, encBuilder: mockEncBuilder}
 
 	_, err = s.cr.deliverLoop(ctx, kvsCh, s.tr, 0, dataWriter, indexWriter, rc)
 	require.NoError(s.T(), err)
@@ -513,7 +516,8 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch() {
 	kvsCh := make(chan []deliveredKVs, 2)
 	deliverCompleteCh := make(chan deliverResult)
 
-	kvEncoder, err := tidb.NewTiDBBackend(ctx, nil, config.ReplaceOnDup, errorMgr).NewEncoder(
+	encodingBuilder := tidb.NewEncodingBuilder()
+	kvEncoder, err := encodingBuilder.NewEncoder(
 		ctx,
 		&encode.EncodingConfig{
 			SessionOptions: encode.SessionOptions{
@@ -611,12 +615,8 @@ func (s *chunkRestoreSuite) testEncodeLoopIgnoreColumnsCSV(
 	kvsCh := make(chan []deliveredKVs, 2)
 	deliverCompleteCh := make(chan deliverResult)
 
-	kvEncoder, err := tidb.NewTiDBBackend(
-		ctx,
-		nil,
-		config.ReplaceOnDup,
-		errormanager.New(nil, config.NewConfig(), log.L()),
-	).NewEncoder(
+	encodingBuilder := tidb.NewEncodingBuilder()
+	kvEncoder, err := encodingBuilder.NewEncoder(
 		ctx,
 		&encode.EncodingConfig{
 			SessionOptions: encode.SessionOptions{
@@ -660,14 +660,15 @@ func (s *chunkRestoreSuite) TestRestore() {
 	defer controller.Finish()
 	mockBackend := mock.NewMockBackend(controller)
 	importer := backend.MakeBackend(mockBackend)
+	mockEncBuilder := mock.NewMockEncodingBuilder(controller)
 
 	mockBackend.EXPECT().OpenEngine(ctx, gomock.Any(), gomock.Any()).Return(nil).Times(2)
 	// avoid return the same object at each call
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
-	mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
+	mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).Times(1)
 	mockWriter := mock.NewMockEngineWriter(controller)
 	mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), gomock.Any()).Return(mockWriter, nil).AnyTimes()
-	mockBackend.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).Return(mockEncoder{}, nil).Times(1)
+	mockEncBuilder.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).Return(mockEncoder{}, nil).Times(1)
 	mockWriter.EXPECT().IsSynced().Return(true).AnyTimes()
 	mockWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
@@ -683,10 +684,11 @@ func (s *chunkRestoreSuite) TestRestore() {
 	saveCpCh := make(chan saveCp, 16)
 	err = s.cr.process(ctx, s.tr, 0, dataWriter, indexWriter, &Controller{
-		cfg:      s.cfg,
-		saveCpCh: saveCpCh,
-		backend:  importer,
-		pauser:   DeliverPauser,
+		cfg:        s.cfg,
+		saveCpCh:   saveCpCh,
+		backend:    importer,
+		pauser:     DeliverPauser,
+		encBuilder: mockEncBuilder,
 	})
 	require.NoError(s.T(), err)
 	require.Len(s.T(), saveCpCh, 2)
diff --git a/br/pkg/lightning/importer/get_pre_info_test.go b/br/pkg/lightning/importer/get_pre_info_test.go
index f9bb1831f97d4..05c018c5f876b 100644
--- a/br/pkg/lightning/importer/get_pre_info_test.go
+++ b/br/pkg/lightning/importer/get_pre_info_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/DATA-DOG/go-sqlmock"
 	mysql_sql_driver "github.com/go-sql-driver/mysql"
 	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
 	"github.com/pingcap/tidb/br/pkg/lightning/importer/mock"
 	ropts "github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
@@ -604,7 +605,7 @@ func TestGetPreInfoSampleSource(t *testing.T) {
 	}
 	for _, subTest := range subTests {
 		require.NoError(t, mockSrc.GetStorage().WriteFile(ctx, dataFileName, subTest.Data))
-		sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, defaultImportantVariables)
+		sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, common.DefaultImportantVariables)
 		require.NoError(t, err)
 		t.Logf("%v, %v", sampledIndexRatio, isRowOrderedFromSample)
 		require.Greater(t, sampledIndexRatio, 1.0)
@@ -698,7 +699,7 @@ func TestGetPreInfoSampleSourceCompressed(t *testing.T) {
 	}
 	for _, subTest := range subTests {
 		require.NoError(t, mockSrc.GetStorage().WriteFile(ctx, dataFileName, subTest.Data))
-		sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, defaultImportantVariables)
+		sampledIndexRatio, isRowOrderedFromSample, err := ig.sampleDataFromTable(ctx, "db01", mdTblMeta, dbInfos["db01"].Tables["tbl01"].Core, nil, common.DefaultImportantVariables)
 		require.NoError(t, err)
 		t.Logf("%v, %v", sampledIndexRatio, isRowOrderedFromSample)
 		require.Greater(t, sampledIndexRatio, 1.0)
diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go
index 86c795107b566..55a7b815d098d 100644
--- a/br/pkg/lightning/importer/import.go
+++ b/br/pkg/lightning/importer/import.go
@@ -230,6 +230,7 @@ type Controller struct {
 	preInfoGetter       PreImportInfoGetter
 	precheckItemBuilder *PrecheckItemBuilder
+	encBuilder          encode.EncodingBuilder
 
 	keyspaceName string
 }
@@ -329,10 +330,12 @@ func NewImportControllerWithPauser(
 		return nil, common.ErrInitErrManager.Wrap(err).GenWithStackByArgs()
 	}
 
-	var backend backend.Backend
+	var encodingBuilder encode.EncodingBuilder
+	var backendObj backend.Backend
 	switch cfg.TikvImporter.Backend {
 	case config.BackendTiDB:
-		backend = tidb.NewTiDBBackend(ctx, db, cfg.TikvImporter.OnDuplicate, errorMgr)
+		encodingBuilder = tidb.NewEncodingBuilder()
+		backendObj = tidb.NewTiDBBackend(ctx, db, cfg.TikvImporter.OnDuplicate, errorMgr)
 	case config.BackendLocal:
 		var rLimit local.Rlim_t
 		rLimit, err = local.GetSystemRLimit()
@@ -356,8 +359,11 @@ func NewImportControllerWithPauser(
 			}
 		}
 
-		encodingBuilder := local.NewEncodingBuilder(ctx)
-		backend, err = local.NewLocalBackend(ctx, tls, cfg, p.Glue, maxOpenFiles, errorMgr, p.KeyspaceName, encodingBuilder)
+		encodingBuilder = local.NewEncodingBuilder(ctx)
+		regionSizeGetter := &local.TableRegionSizeGetterImpl{
+			DB: db,
+		}
+		backendObj, err = local.NewLocalBackend(ctx, tls, cfg, regionSizeGetter, maxOpenFiles, errorMgr, p.KeyspaceName)
 		if err != nil {
 			return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
 		}
@@ -387,12 +393,19 @@ func NewImportControllerWithPauser(
 	default:
 		metaBuilder = noopMetaMgrBuilder{}
 	}
+
+	var wrapper backend.TargetInfoGetter
+	if cfg.TikvImporter.Backend == config.BackendLocal {
+		wrapper = local.NewTargetInfoGetter(tls, p.Glue, cfg.TiDB.PdAddr)
+	} else {
+		wrapper = tidb.NewTargetInfoGetter(db)
+	}
 	ioWorkers := worker.NewPool(ctx, cfg.App.IOConcurrency, "io")
 	targetInfoGetter := &TargetInfoGetterImpl{
 		cfg:          cfg,
 		targetDBGlue: p.Glue,
 		tls:          tls,
-		backend:      backend,
+		backend:      wrapper,
 	}
 	preInfoGetter, err := NewPreImportInfoGetter(
 		cfg,
@@ -400,7 +413,7 @@ func NewImportControllerWithPauser(
 		p.DumpFileStorage,
 		targetInfoGetter,
 		ioWorkers,
-		backend,
+		encodingBuilder,
 	)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -420,9 +433,9 @@ func NewImportControllerWithPauser(
 		ioWorkers:     ioWorkers,
 		checksumWorks: worker.NewPool(ctx, cfg.TiDB.ChecksumTableConcurrency, "checksum"),
 		pauser:        p.Pauser,
-		backend:       backend,
+		backend:       backendObj,
 		tidbGlue:      p.Glue,
-		sysVars:       defaultImportantVariables,
+		sysVars:       common.DefaultImportantVariables,
 		tls:           tls,
 		checkTemplate: NewSimpleTemplate(),
@@ -444,6 +457,7 @@ func NewImportControllerWithPauser(
 		preInfoGetter:       preInfoGetter,
 		precheckItemBuilder: preCheckBuilder,
+		encBuilder:          encodingBuilder,
 
 		keyspaceName: p.KeyspaceName,
 	}
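Note: after this change `NewImportControllerWithPauser` assembles three independent collaborators where a single `Backend` value used to serve. The TiDB-backend arm of the switch, condensed into a sketch (the error manager is nil here purely for brevity):

```go
package example

import (
	"context"
	"database/sql"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
)

func buildTiDBCollaborators(ctx context.Context, db *sql.DB) (
	encode.EncodingBuilder, backend.Backend, backend.TargetInfoGetter,
) {
	encodingBuilder := tidb.NewEncodingBuilder()               // encoding
	backendObj := tidb.NewTiDBBackend(ctx, db, "replace", nil) // engine I/O
	infoGetter := tidb.NewTargetInfoGetter(db)                 // target inspection
	return encodingBuilder, backendObj, infoGetter
}
```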
Return(s.tableInfos, nil) mockBackend.EXPECT().Close() theBackend := backend.MakeBackend(mockBackend) s.rc.backend = theBackend - s.targetInfoGetter.backend = theBackend + s.targetInfoGetter.backend = mockTargetInfoGetter mockDB, sqlMock, err := sqlmock.New() require.NoError(s.T(), err) diff --git a/br/pkg/lightning/importer/table_import_test.go b/br/pkg/lightning/importer/table_import_test.go index 1052bf9853e18..2cebb4e5b0b86 100644 --- a/br/pkg/lightning/importer/table_import_test.go +++ b/br/pkg/lightning/importer/table_import_test.go @@ -376,6 +376,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { ctx := context.Background() ctrl := gomock.NewController(s.T()) mockBackend := mock.NewMockBackend(ctrl) + mockEncBuilder := mock.NewMockEncodingBuilder(ctrl) rc := &Controller{ cfg: s.cfg, pauser: DeliverPauser, @@ -385,6 +386,7 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { backend: backend.MakeBackend(mockBackend), errorSummaries: makeErrorSummaries(log.L()), saveCpCh: make(chan saveCp, 1), + encBuilder: mockEncBuilder, } defer close(rc.saveCpCh) go func() { @@ -403,14 +405,14 @@ func (s *tableRestoreSuite) TestRestoreEngineFailed() { require.NoError(s.T(), err) _, indexUUID := backend.MakeUUID("`db`.`table`", -1) _, dataUUID := backend.MakeUUID("`db`.`table`", 0) - realBackend := tidb.NewTiDBBackend(ctx, nil, "replace", nil) + realEncBuilder := tidb.NewEncodingBuilder() mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mockBackend.EXPECT().CloseEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - mockBackend.EXPECT().NewEncoder(gomock.Any(), gomock.Any()). - Return(realBackend.NewEncoder(ctx, &encode.EncodingConfig{Table: tbl})). + mockEncBuilder.EXPECT().NewEncoder(gomock.Any(), gomock.Any()). + Return(realEncBuilder.NewEncoder(ctx, &encode.EncodingConfig{Table: tbl})). AnyTimes() - mockBackend.EXPECT().MakeEmptyRows().Return(realBackend.MakeEmptyRows()).AnyTimes() + mockEncBuilder.EXPECT().MakeEmptyRows().Return(realEncBuilder.MakeEmptyRows()).AnyTimes() mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), dataUUID).Return(noop.Writer{}, nil) mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), indexUUID). 
Return(nil, errors.New("mock open index local writer failed")) @@ -986,6 +988,7 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics() { errorMgr: errormanager.New(nil, cfg, log.L()), taskMgr: noopTaskMetaMgr{}, preInfoGetter: preInfoGetter, + encBuilder: tidb.NewEncodingBuilder(), } go func() { for scp := range chptCh { @@ -1420,13 +1423,13 @@ func (s *tableRestoreSuite) TestEstimate() { ctx := context.Background() controller := gomock.NewController(s.T()) defer controller.Finish() - mockBackend := mock.NewMockBackend(controller) + mockEncBuilder := mock.NewMockEncodingBuilder(controller) idAlloc := kv.NewPanickingAllocators(0) tbl, err := tables.TableFromMeta(idAlloc, s.tableInfo.Core) require.NoError(s.T(), err) - mockBackend.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() - mockBackend.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).Return(kv.NewTableKVEncoder(&encode.EncodingConfig{ + mockEncBuilder.EXPECT().MakeEmptyRows().Return(kv.MakeRowsFromKvPairs(nil)).AnyTimes() + mockEncBuilder.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).Return(kv.NewTableKVEncoder(&encode.EncodingConfig{ Table: tbl, SessionOptions: encode.SessionOptions{ SQLMode: s.cfg.TiDB.SQLMode, @@ -1435,7 +1438,6 @@ func (s *tableRestoreSuite) TestEstimate() { }, Logger: log.L(), }, nil)).AnyTimes() - importer := backend.MakeBackend(mockBackend) dbMetas := []*mydump.MDDatabaseMeta{ { @@ -1452,7 +1454,7 @@ func (s *tableRestoreSuite) TestEstimate() { preInfoGetter := &PreImportInfoGetterImpl{ cfg: s.cfg, srcStorage: s.store, - encBuilder: importer, + encBuilder: mockEncBuilder, ioWorkers: ioWorkers, dbMetas: dbMetas, targetInfoGetter: mockTarget, diff --git a/br/pkg/lightning/importer/tidb.go b/br/pkg/lightning/importer/tidb.go index 3cf45340e8e35..97d5e1d63cff6 100644 --- a/br/pkg/lightning/importer/tidb.go +++ b/br/pkg/lightning/importer/tidb.go @@ -39,26 +39,6 @@ import ( "golang.org/x/exp/maps" ) -// defaultImportantVariables is used in ObtainImportantVariables to retrieve the system -// variables from downstream which may affect KV encode result. The values record the default -// values if missing. -var defaultImportantVariables = map[string]string{ - "max_allowed_packet": "67108864", - "div_precision_increment": "4", - "time_zone": "SYSTEM", - "lc_time_names": "en_US", - "default_week_format": "0", - "block_encryption_mode": "aes-128-ecb", - "group_concat_max_len": "1024", -} - -// defaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system -// variables from downstream in local/importer backend. The values record the default -// values if missing. -var defaultImportVariablesTiDB = map[string]string{ - "tidb_row_format_version": "1", -} - type TiDBManager struct { db *sql.DB parser *parser.Parser @@ -275,7 +255,7 @@ func ObtainImportantVariables(ctx context.Context, g glue.SQLExecutor, needTiDBV var query strings.Builder query.WriteString("SHOW VARIABLES WHERE Variable_name IN ('") first := true - for k := range defaultImportantVariables { + for k := range common.DefaultImportantVariables { if first { first = false } else { @@ -284,7 +264,7 @@ func ObtainImportantVariables(ctx context.Context, g glue.SQLExecutor, needTiDBV query.WriteString(k) } if needTiDBVars { - for k := range defaultImportVariablesTiDB { + for k := range common.DefaultImportVariablesTiDB { query.WriteString("','") query.WriteString(k) } @@ -297,7 +277,7 @@ func ObtainImportantVariables(ctx context.Context, g glue.SQLExecutor, needTiDBV } // convert result into a map. 
fill in any missing variables with default values. - result := make(map[string]string, len(defaultImportantVariables)+len(defaultImportVariablesTiDB)) + result := make(map[string]string, len(common.DefaultImportantVariables)+len(common.DefaultImportVariablesTiDB)) for _, kv := range kvs { result[kv[0]] = kv[1] } @@ -309,9 +289,9 @@ func ObtainImportantVariables(ctx context.Context, g glue.SQLExecutor, needTiDBV } } } - setDefaultValue(result, defaultImportantVariables) + setDefaultValue(result, common.DefaultImportantVariables) if needTiDBVars { - setDefaultValue(result, defaultImportVariablesTiDB) + setDefaultValue(result, common.DefaultImportVariablesTiDB) } return result diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index 713682a9f247a..35d4158546bd3 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/pingcap/tidb/br/pkg/lightning/backend (interfaces: AbstractBackend,EngineWriter) +// Source: github.com/pingcap/tidb/br/pkg/lightning/backend (interfaces: AbstractBackend,EngineWriter,TargetInfoGetter) // Package mock is a generated GoMock package. package mock @@ -41,20 +41,6 @@ func (m *MockBackend) EXPECT() *MockBackendMockRecorder { return m.recorder } -// CheckRequirements mocks base method. -func (m *MockBackend) CheckRequirements(arg0 context.Context, arg1 *backend.CheckCtx) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CheckRequirements", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// CheckRequirements indicates an expected call of CheckRequirements. -func (mr *MockBackendMockRecorder) CheckRequirements(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockBackend)(nil).CheckRequirements), arg0, arg1) -} - // CleanupEngine mocks base method. func (m *MockBackend) CleanupEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() @@ -139,21 +125,6 @@ func (mr *MockBackendMockRecorder) EngineFileSizes() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EngineFileSizes", reflect.TypeOf((*MockBackend)(nil).EngineFileSizes)) } -// FetchRemoteTableModels mocks base method. -func (m *MockBackend) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchRemoteTableModels", arg0, arg1) - ret0, _ := ret[0].([]*model.TableInfo) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// FetchRemoteTableModels indicates an expected call of FetchRemoteTableModels. -func (mr *MockBackendMockRecorder) FetchRemoteTableModels(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteTableModels", reflect.TypeOf((*MockBackend)(nil).FetchRemoteTableModels), arg0, arg1) -} - // FlushAllEngines mocks base method. func (m *MockBackend) FlushAllEngines(arg0 context.Context) error { m.ctrl.T.Helper() @@ -211,35 +182,6 @@ func (mr *MockBackendMockRecorder) LocalWriter(arg0, arg1, arg2 interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalWriter", reflect.TypeOf((*MockBackend)(nil).LocalWriter), arg0, arg1, arg2) } -// MakeEmptyRows mocks base method. 
-func (m *MockBackend) MakeEmptyRows() encode.Rows { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MakeEmptyRows") - ret0, _ := ret[0].(encode.Rows) - return ret0 -} - -// MakeEmptyRows indicates an expected call of MakeEmptyRows. -func (mr *MockBackendMockRecorder) MakeEmptyRows() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeEmptyRows", reflect.TypeOf((*MockBackend)(nil).MakeEmptyRows)) -} - -// NewEncoder mocks base method. -func (m *MockBackend) NewEncoder(arg0 context.Context, arg1 *encode.EncodingConfig) (encode.Encoder, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "NewEncoder", arg0, arg1) - ret0, _ := ret[0].(encode.Encoder) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// NewEncoder indicates an expected call of NewEncoder. -func (mr *MockBackendMockRecorder) NewEncoder(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewEncoder", reflect.TypeOf((*MockBackend)(nil).NewEncoder), arg0, arg1) -} - // OpenEngine mocks base method. func (m *MockBackend) OpenEngine(arg0 context.Context, arg1 *backend.EngineConfig, arg2 uuid.UUID) error { m.ctrl.T.Helper() @@ -389,3 +331,55 @@ func (mr *MockEngineWriterMockRecorder) IsSynced() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSynced", reflect.TypeOf((*MockEngineWriter)(nil).IsSynced)) } + +// MockTargetInfoGetter is a mock of TargetInfoGetter interface. +type MockTargetInfoGetter struct { + ctrl *gomock.Controller + recorder *MockTargetInfoGetterMockRecorder +} + +// MockTargetInfoGetterMockRecorder is the mock recorder for MockTargetInfoGetter. +type MockTargetInfoGetterMockRecorder struct { + mock *MockTargetInfoGetter +} + +// NewMockTargetInfoGetter creates a new mock instance. +func NewMockTargetInfoGetter(ctrl *gomock.Controller) *MockTargetInfoGetter { + mock := &MockTargetInfoGetter{ctrl: ctrl} + mock.recorder = &MockTargetInfoGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTargetInfoGetter) EXPECT() *MockTargetInfoGetterMockRecorder { + return m.recorder +} + +// CheckRequirements mocks base method. +func (m *MockTargetInfoGetter) CheckRequirements(arg0 context.Context, arg1 *backend.CheckCtx) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckRequirements", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// CheckRequirements indicates an expected call of CheckRequirements. +func (mr *MockTargetInfoGetterMockRecorder) CheckRequirements(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockTargetInfoGetter)(nil).CheckRequirements), arg0, arg1) +} + +// FetchRemoteTableModels mocks base method. +func (m *MockTargetInfoGetter) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchRemoteTableModels", arg0, arg1) + ret0, _ := ret[0].([]*model.TableInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchRemoteTableModels indicates an expected call of FetchRemoteTableModels. 
+func (mr *MockTargetInfoGetterMockRecorder) FetchRemoteTableModels(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteTableModels", reflect.TypeOf((*MockTargetInfoGetter)(nil).FetchRemoteTableModels), arg0, arg1) +} diff --git a/br/pkg/mock/encode.go b/br/pkg/mock/encode.go index ee503ac4c0e43..a8b31b068d446 100644 --- a/br/pkg/mock/encode.go +++ b/br/pkg/mock/encode.go @@ -1,10 +1,11 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/pingcap/tidb/br/pkg/lightning/backend/encode (interfaces: Encoder,Rows,Row) +// Source: github.com/pingcap/tidb/br/pkg/lightning/backend/encode (interfaces: Encoder,EncodingBuilder,Rows,Row) // Package mock is a generated GoMock package. package mock import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -63,6 +64,58 @@ func (mr *MockEncoderMockRecorder) Encode(arg0, arg1, arg2, arg3 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Encode", reflect.TypeOf((*MockEncoder)(nil).Encode), arg0, arg1, arg2, arg3) } +// MockEncodingBuilder is a mock of EncodingBuilder interface. +type MockEncodingBuilder struct { + ctrl *gomock.Controller + recorder *MockEncodingBuilderMockRecorder +} + +// MockEncodingBuilderMockRecorder is the mock recorder for MockEncodingBuilder. +type MockEncodingBuilderMockRecorder struct { + mock *MockEncodingBuilder +} + +// NewMockEncodingBuilder creates a new mock instance. +func NewMockEncodingBuilder(ctrl *gomock.Controller) *MockEncodingBuilder { + mock := &MockEncodingBuilder{ctrl: ctrl} + mock.recorder = &MockEncodingBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEncodingBuilder) EXPECT() *MockEncodingBuilderMockRecorder { + return m.recorder +} + +// MakeEmptyRows mocks base method. +func (m *MockEncodingBuilder) MakeEmptyRows() encode.Rows { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MakeEmptyRows") + ret0, _ := ret[0].(encode.Rows) + return ret0 +} + +// MakeEmptyRows indicates an expected call of MakeEmptyRows. +func (mr *MockEncodingBuilderMockRecorder) MakeEmptyRows() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeEmptyRows", reflect.TypeOf((*MockEncodingBuilder)(nil).MakeEmptyRows)) +} + +// NewEncoder mocks base method. +func (m *MockEncodingBuilder) NewEncoder(arg0 context.Context, arg1 *encode.EncodingConfig) (encode.Encoder, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewEncoder", arg0, arg1) + ret0, _ := ret[0].(encode.Encoder) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// NewEncoder indicates an expected call of NewEncoder. +func (mr *MockEncodingBuilderMockRecorder) NewEncoder(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewEncoder", reflect.TypeOf((*MockEncodingBuilder)(nil).NewEncoder), arg0, arg1) +} + // MockRows is a mock of Rows interface. 
type MockRows struct { ctrl *gomock.Controller diff --git a/ddl/ingest/backend_mgr.go b/ddl/ingest/backend_mgr.go index e6e6b030fb3bc..9b70f9d4da93f 100644 --- a/ddl/ingest/backend_mgr.go +++ b/ddl/ingest/backend_mgr.go @@ -89,8 +89,14 @@ func createLocalBackend(ctx context.Context, cfg *Config, glue glue.Glue) (backe logutil.BgLogger().Info("[ddl-ingest] create local backend for adding index", zap.String("keyspaceName", cfg.KeyspaceName)) errorMgr := errormanager.New(nil, cfg.Lightning, log.Logger{Logger: logutil.BgLogger()}) - encodingBuilder := local.NewEncodingBuilder(ctx) - return local.NewLocalBackend(ctx, tls, cfg.Lightning, glue, int(LitRLimit), errorMgr, cfg.KeyspaceName, encodingBuilder) + db, err := glue.GetDB() + if err != nil { + return backend.Backend{}, err + } + regionSizeGetter := &local.TableRegionSizeGetterImpl{ + DB: db, + } + return local.NewLocalBackend(ctx, tls, cfg.Lightning, regionSizeGetter, int(LitRLimit), errorMgr, cfg.KeyspaceName) } func newBackendContext(ctx context.Context, jobID int64, be *backend.Backend, diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 1d5374a6d3e4f..4b05eb4299a4d 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -99,7 +99,6 @@ go_library( deps = [ "//bindinfo", "//br/pkg/glue", - "//br/pkg/lightning/log", "//br/pkg/lightning/mydump", "//br/pkg/storage", "//br/pkg/task", @@ -181,7 +180,6 @@ go_library( "//util/gcutil", "//util/hack", "//util/hint", - "//util/intest", "//util/keydecoder", "//util/kvcache", "//util/logutil", @@ -356,7 +354,6 @@ go_test( flaky = True, shard_count = 50, deps = [ - "//br/pkg/errors", "//config", "//ddl", "//ddl/placement", diff --git a/executor/importer/BUILD.bazel b/executor/importer/BUILD.bazel index 609ee7aef7103..190398c7c5d82 100644 --- a/executor/importer/BUILD.bazel +++ b/executor/importer/BUILD.bazel @@ -6,15 +6,26 @@ go_library( importpath = "github.com/pingcap/tidb/executor/importer", visibility = ["//visibility:public"], deps = [ + "//br/pkg/lightning/common", "//br/pkg/lightning/config", + "//br/pkg/lightning/log", + "//br/pkg/lightning/mydump", + "//br/pkg/storage", "//parser/ast", "//parser/mysql", + "//parser/terror", "//planner/core", "//sessionctx", "//table", "//util/chunk", "//util/dbterror", "//util/dbterror/exeerrors", + "//util/intest", + "//util/logutil", + "//util/stringutil", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_log//:log", + "@org_uber_go_zap//:zap", ], ) @@ -25,6 +36,7 @@ go_test( embed = [":importer"], flaky = True, deps = [ + "//br/pkg/errors", "//br/pkg/lightning/config", "//expression", "//parser", @@ -32,6 +44,7 @@ go_test( "//planner/core", "//util/dbterror/exeerrors", "//util/mock", + "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/executor/importer/import.go b/executor/importer/import.go index cb84fbcfb9643..1a734958ec359 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -15,20 +15,34 @@ package importer import ( + "context" "fmt" + "io" "math" + "path/filepath" "runtime" "strings" + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/lightning/config" + litlog "github.com/pingcap/tidb/br/pkg/lightning/log" + "github.com/pingcap/tidb/br/pkg/lightning/mydump" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/parser/terror" plannercore 
"github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/dbterror/exeerrors" + "github.com/pingcap/tidb/util/intest" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" ) const ( @@ -40,8 +54,9 @@ const ( LoadDataFormatParquet = "parquet" // LogicalImportMode represents the import mode is SQL-like. - LogicalImportMode = "logical" // tidb backend - physicalImportMode = "physical" // local backend + LogicalImportMode = "logical" + // PhysicalImportMode represents the import mode is KV-like. + PhysicalImportMode = "physical" unlimitedWriteSpeed = config.ByteSize(math.MaxInt64) minDiskQuota = config.ByteSize(10 << 30) // 10GiB minWriteSpeed = config.ByteSize(1 << 10) // 1KiB/s @@ -94,28 +109,38 @@ type FieldMapping struct { UserVar *ast.VariableExpr } +// LoadDataReaderInfo provides information for a data reader of LOAD DATA. +type LoadDataReaderInfo struct { + // Opener can be called at needed to get a io.ReadSeekCloser. It will only + // be called once. + Opener func(ctx context.Context) (io.ReadSeekCloser, error) + // Remote is not nil only if load from cloud storage. + Remote *mydump.SourceFileMeta +} + // LoadDataController load data controller. // todo: need a better name type LoadDataController struct { - Path string - + FileLocRef ast.FileLocRefTp + Path string Format string ColumnsAndUserVars []*ast.ColumnNameOrUserVar ColumnAssignments []*ast.Assignment OnDuplicate ast.OnDuplicateKeyHandlingType - Table table.Table - SchemaName string + Table table.Table + DBName string + DBID int64 // how input field(or input column) from data file is mapped, either to a column or variable. // if there's NO column list clause in load data statement, then it's table's columns // else it's user defined list. - fieldMappings []*FieldMapping - // see InsertValues.insertColumns + FieldMappings []*FieldMapping + // see InsertValues.InsertColumns // todo: our behavior is different with mysql. such as for table t(a,b) // - "...(a,a) set a=100" is allowed in mysql, but not in tidb // - "...(a,b) set b=100" will set b=100 in mysql, but in tidb the set is ignored. - insertColumns []*table.Column + InsertColumns []*table.Column // used for DELIMITED DATA format FieldNullDef []string @@ -124,17 +149,42 @@ type LoadDataController struct { IgnoreLines uint64 // import options - importMode string + ImportMode string diskQuota config.ByteSize checksum config.PostOpLevel addIndex bool analyze config.PostOpLevel threadCnt int64 - batchSize int64 + BatchSize int64 maxWriteSpeed config.ByteSize // per second splitFile bool maxRecordedErrors int64 // -1 means record all error Detached bool + + logger *zap.Logger + sqlMode mysql.SQLMode + importantSysVars map[string]string + dataStore storage.ExternalStorage + dataFiles []*mydump.SourceFileMeta +} + +func getImportantSysVars(sctx sessionctx.Context) map[string]string { + res := map[string]string{} + for k, defVal := range common.DefaultImportantVariables { + if val, ok := sctx.GetSessionVars().GetSystemVar(k); ok { + res[k] = val + } else { + res[k] = defVal + } + } + for k, defVal := range common.DefaultImportVariablesTiDB { + if val, ok := sctx.GetSessionVars().GetSystemVar(k); ok { + res[k] = val + } else { + res[k] = defVal + } + } + return res } // NewLoadDataController create new controller. 
@@ -146,15 +196,22 @@ func NewLoadDataController(sctx sessionctx.Context, plan *plannercore.LoadData,
 		// without FORMAT 'xxx' clause, default to DELIMITED DATA
 		format = LoadDataFormatDelimitedData
 	}
+	fullTableName := common.UniqueTable(plan.Table.Schema.L, plan.Table.Name.L)
 	c := &LoadDataController{
+		FileLocRef:         plan.FileLocRef,
 		Path:               plan.Path,
 		Format:             format,
 		ColumnsAndUserVars: plan.ColumnsAndUserVars,
 		ColumnAssignments:  plan.ColumnAssignments,
 		OnDuplicate:        plan.OnDuplicate,
-		SchemaName:         plan.Table.Schema.O,
+		DBName:             plan.Table.Schema.O,
+		DBID:               plan.Table.DBInfo.ID,
 		Table:              tbl,
 		LineFieldsInfo:     plannercore.NewLineFieldsInfo(plan.FieldsInfo, plan.LinesInfo),
+
+		logger:           log.L().With(zap.String("table", fullTableName)),
+		sqlMode:          sctx.GetSessionVars().SQLMode,
+		importantSysVars: getImportantSysVars(sctx),
 	}
 	if err := c.initFieldParams(plan); err != nil {
 		return nil, err
@@ -178,6 +235,11 @@ func (e *LoadDataController) initFieldParams(plan *plannercore.LoadData) error {
 		return exeerrors.ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.Format)
 	}
 
+	if e.FileLocRef == ast.FileLocClient && e.Format == LoadDataFormatParquet {
+		// the parquet parser needs to seek around, which is not supported for client-local files
+		return exeerrors.ErrLoadParquetFromLocal
+	}
+
 	if e.Format != LoadDataFormatDelimitedData {
 		if plan.FieldsInfo != nil || plan.LinesInfo != nil || plan.IgnoreLines != nil {
 			return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs(fmt.Sprintf("cannot specify FIELDS ... or LINES ... or IGNORE N LINES for format '%s'", e.Format))
@@ -236,13 +298,13 @@ func (e *LoadDataController) initDefaultOptions() {
 		threadCnt = int(math.Max(1, float64(threadCnt)*0.75))
 	}
 
-	e.importMode = LogicalImportMode
+	e.ImportMode = LogicalImportMode
 	_ = e.diskQuota.UnmarshalText([]byte("50GiB")) // todo confirm with pm
 	e.checksum = config.OpLevelRequired
 	e.addIndex = true
 	e.analyze = config.OpLevelOptional
 	e.threadCnt = int64(threadCnt)
-	e.batchSize = 1000
+	e.BatchSize = 1000
 	e.maxWriteSpeed = unlimitedWriteSpeed
 	e.splitFile = false
 	e.maxRecordedErrors = 100
@@ -278,17 +340,17 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl
 			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
 		}
 		v = strings.ToLower(v)
-		if v != LogicalImportMode && v != physicalImportMode {
+		if v != LogicalImportMode && v != PhysicalImportMode {
 			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
 		}
-		e.importMode = v
+		e.ImportMode = v
 	}
 
-	if e.importMode == LogicalImportMode {
+	if e.ImportMode == LogicalImportMode {
 		// some options are only allowed in physical mode
 		for _, opt := range specifiedOptions {
 			if _, ok := optionsForPhysicalImport[opt.Name]; ok {
-				return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.importMode)
+				return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.ImportMode)
 			}
 		}
 	}
@@ -338,8 +400,8 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl
 		}
 	}
 	if opt, ok := specifiedOptions[batchSizeOption]; ok {
-		e.batchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
-		if err != nil || isNull || e.batchSize < 0 {
+		e.BatchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
+		if err != nil || isNull || e.BatchSize < 0 {
 			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
 		}
 	}
@@ -404,7 +466,7 @@ func (e *LoadDataController) initFieldMappings() []string {
 			fieldMapping := &FieldMapping{
 				Column: v,
 			}
-			e.fieldMappings = append(e.fieldMappings, fieldMapping)
+			e.FieldMappings = append(e.FieldMappings, fieldMapping)
 			columns = append(columns, v.Name.O)
 		}
 
@@ -425,7 +487,7 @@ func (e *LoadDataController) initFieldMappings() []string {
 			Column:  column,
 			UserVar: v.UserVar,
 		}
-		e.fieldMappings = append(e.fieldMappings, fieldMapping)
+		e.FieldMappings = append(e.FieldMappings, fieldMapping)
 	}
 
 	return columns
@@ -452,11 +514,11 @@ func (e *LoadDataController) initLoadColumns(columnNames []string) error {
 	for _, col := range cols {
 		if !col.IsGenerated() {
 			// todo: should report error here, since in reorderColumns we report error if len(cols) != len(columnNames)
-			e.insertColumns = append(e.insertColumns, col)
+			e.InsertColumns = append(e.InsertColumns, col)
 		}
 	}
 
-	// e.insertColumns is appended according to the original tables' column sequence.
+	// e.InsertColumns is appended according to the original tables' column sequence.
 	// We have to reorder it to follow the user-specified column order which is shown in the columnNames.
 	if err = e.reorderColumns(columnNames); err != nil {
 		return err
@@ -471,10 +533,10 @@ func (e *LoadDataController) initLoadColumns(columnNames []string) error {
 	return nil
 }
 
-// reorderColumns reorder the e.insertColumns according to the order of columnNames
-// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name.
+// reorderColumns reorders e.InsertColumns according to the order of columnNames.
+// Note: we must ensure a one-to-one mapping between e.InsertColumns and columnNames in terms of column name.
 func (e *LoadDataController) reorderColumns(columnNames []string) error {
-	cols := e.insertColumns
+	cols := e.InsertColumns
 
 	if len(cols) != len(columnNames) {
 		return exeerrors.ErrColumnsNotMatched
@@ -496,30 +558,14 @@ func (e *LoadDataController) reorderColumns(columnNames []string) error {
 		reorderedColumns[idx] = col
 	}
 
-	e.insertColumns = reorderedColumns
+	e.InsertColumns = reorderedColumns
 
 	return nil
 }
 
-// GetInsertColumns get column list need to insert into target table.
-// this list include all columns and in the same order as in fieldMappings and ColumnAssignments
-func (e *LoadDataController) GetInsertColumns() []*table.Column {
-	return e.insertColumns
-}
-
-// GetFieldMapping get field mapping.
-func (e *LoadDataController) GetFieldMapping() []*FieldMapping {
-	return e.fieldMappings
-}
-
 // GetFieldCount get field count.
 func (e *LoadDataController) GetFieldCount() int {
-	return len(e.fieldMappings)
-}
-
-// GetBatchSize get batch size.
-func (e *LoadDataController) GetBatchSize() int64 {
-	return e.batchSize
+	return len(e.FieldMappings)
 }
 
 // GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
@@ -540,3 +586,194 @@ func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig {
 		UnescapedQuote: true,
 	}
 }
+
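reorderColumns above enforces a strict name-based, one-to-one correspondence between e.InsertColumns and the user-specified column list before reordering. A stand-alone sketch of that contract under simplified types (plain strings rather than *table.Column; the lowercase normalization is illustrative, not the real comparison rule):

package main

import (
	"fmt"
	"strings"
)

// reorder returns cols rearranged to follow names, failing unless the two
// sets match one-to-one by (normalized) name, as reorderColumns requires.
func reorder(cols, names []string) ([]string, error) {
	if len(cols) != len(names) {
		return nil, fmt.Errorf("columns not matched")
	}
	byName := make(map[string]string, len(cols))
	for _, c := range cols {
		byName[strings.ToLower(c)] = c
	}
	out := make([]string, 0, len(names))
	for _, n := range names {
		c, ok := byName[strings.ToLower(n)]
		if !ok {
			return nil, fmt.Errorf("columns not matched")
		}
		out = append(out, c)
	}
	return out, nil
}

func main() {
	got, err := reorder([]string{"a", "b", "c"}, []string{"c", "a", "b"})
	fmt.Println(got, err) // [c a b] <nil>
}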
+// InitDataFiles initializes the data store and the list of data files.
+func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
+	u, err2 := storage.ParseRawURL(e.Path)
+	if err2 != nil {
+		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(err2.Error())
+	}
+	path := strings.Trim(u.Path, "/")
+	u.Path = ""
+	b, err2 := storage.ParseBackendFromURL(u, nil)
+	if err2 != nil {
+		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(GetMsgFromBRError(err2))
+	}
+	if b.GetLocal() != nil {
+		return exeerrors.ErrLoadDataFromServerDisk.GenWithStackByArgs(e.Path)
+	}
+	// try to find a pattern error in advance
+	_, err2 = filepath.Match(stringutil.EscapeGlobExceptAsterisk(path), "")
+	if err2 != nil {
+		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("Glob pattern error: " + err2.Error())
+	}
+
+	opt := &storage.ExternalStorageOptions{}
+	if intest.InTest {
+		opt.NoCredentials = true
+	}
+	s, err := storage.New(ctx, b, opt)
+	if err != nil {
+		return exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(GetMsgFromBRError(err))
+	}
+
+	dataFiles := []*mydump.SourceFileMeta{}
+	idx := strings.IndexByte(path, '*')
+	// simple path when the INFILE represents one file
+	if idx == -1 {
+		fileReader, err2 := s.Open(ctx, path)
+		if err2 != nil {
+			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
+		}
+		defer func() {
+			terror.Log(fileReader.Close())
+		}()
+		size, err3 := fileReader.Seek(0, io.SeekEnd)
+		if err3 != nil {
+			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err3), "failed to read file size by seek in LOAD DATA")
+		}
+		dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+			Path:     path,
+			FileSize: size,
+		})
+	} else {
+		commonPrefix := path[:idx]
+		// we only support '*'; to reuse the glob library, escape the rest of the path manually
+		escapedPath := stringutil.EscapeGlobExceptAsterisk(path)
+		err = s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix},
+			func(remotePath string, size int64) error {
+				// pattern errors are already checked in LoadDataExec.Next
+				//nolint: errcheck
+				match, _ := filepath.Match(escapedPath, remotePath)
+				if !match {
+					return nil
+				}
+				dataFiles = append(dataFiles, &mydump.SourceFileMeta{
+					Path:     remotePath,
+					FileSize: size,
+				})
+				return nil
+			})
+		if err != nil {
+			return err
+		}
+	}
+
+	e.dataStore = s
+	e.dataFiles = dataFiles
+	return nil
+}
+
+// GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.
+func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
+	result := make([]LoadDataReaderInfo, 0, len(e.dataFiles))
+	for i := range e.dataFiles {
+		f := e.dataFiles[i]
+		result = append(result, LoadDataReaderInfo{
+			Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
+				fileReader, err2 := e.dataStore.Open(ctx, f.Path)
+				if err2 != nil {
+					return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(GetMsgFromBRError(err2), "Please check the INFILE path is correct")
+				}
+				return fileReader, nil
+			},
+			Remote: f,
+		})
+	}
+	return result
+}
+
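InitDataFiles above honors exactly one wildcard, '*', and reuses filepath.Match for everything else by escaping the remaining glob metacharacters in the user's path first. A stand-alone sketch of that trick; escapeExceptAsterisk is an illustrative stand-in for stringutil.EscapeGlobExceptAsterisk, not its actual implementation:

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// escapeExceptAsterisk backslash-escapes glob metacharacters other than '*',
// so filepath.Match treats them as literal characters.
func escapeExceptAsterisk(s string) string {
	var b strings.Builder
	for _, r := range s {
		if strings.ContainsRune("?[]\\", r) {
			b.WriteRune('\\')
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	// the user's path contains a literal '?', which an unescaped
	// filepath.Match would treat as a single-character wildcard
	pattern := escapeExceptAsterisk("data/t?.*.csv")
	for _, p := range []string{"data/t?.1.csv", "data/tX.1.csv"} {
		ok, _ := filepath.Match(pattern, p)
		fmt.Println(p, ok) // true, then false: only '*' stays a wildcard
	}
}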
+// GetParser returns a parser for the data file.
+func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (
+	parser mydump.Parser, err error) {
+	reader, err2 := dataFileInfo.Opener(ctx)
+	if err2 != nil {
+		return nil, err2
+	}
+	defer func() {
+		if err != nil {
+			if err3 := reader.Close(); err3 != nil {
+				e.logger.Warn("failed to close reader", zap.Error(err3))
+			}
+		}
+	}()
+	switch e.Format {
+	case LoadDataFormatDelimitedData:
+		// CSV-like
+		parser, err = mydump.NewCSVParser(
+			ctx,
+			e.GenerateCSVConfig(),
+			reader,
+			LoadDataReadBlockSize,
+			nil,
+			false,
+			// TODO: support charset conversion
+			nil)
+	case LoadDataFormatSQLDump:
+		parser = mydump.NewChunkParser(
+			ctx,
+			e.sqlMode,
+			reader,
+			LoadDataReadBlockSize,
+			nil,
+		)
+	case LoadDataFormatParquet:
+		parser, err = mydump.NewParquetParser(
+			ctx,
+			e.dataStore,
+			reader,
+			dataFileInfo.Remote.Path,
+		)
+	}
+	if err != nil {
+		return nil, exeerrors.ErrLoadDataWrongFormatConfig.GenWithStack(err.Error())
+	}
+	parser.SetLogger(litlog.Logger{Logger: logutil.Logger(ctx)})
+
+	// handle IGNORE N LINES
+	ignoreOneLineFn := parser.ReadRow
+	if csvParser, ok := parser.(*mydump.CSVParser); ok {
+		ignoreOneLineFn = func() error {
+			_, _, err3 := csvParser.ReadUntilTerminator()
+			return err3
+		}
+	}
+
+	ignoreLineCnt := e.IgnoreLines
+	for ignoreLineCnt > 0 {
+		err = ignoreOneLineFn()
+		if err != nil {
+			if errors.Cause(err) == io.EOF {
+				return parser, nil
+			}
+			return nil, err
+		}
+
+		ignoreLineCnt--
+	}
+	return parser, nil
+}
+
+// PhysicalImport does a physical import.
+func (e *LoadDataController) PhysicalImport(ctx context.Context) (int64, error) {
+	// todo: implement it
+	return 0, nil
+}
+
+// GetMsgFromBRError gets the message from a BR error.
+// TODO: add GetMsg() to errors package to replace this function.
+// see TestGetMsgFromBRError for more details.
+func GetMsgFromBRError(err error) string { + if err == nil { + return "" + } + if berr, ok := err.(*errors.Error); ok { + return berr.GetMsg() + } + raw := err.Error() + berrMsg := errors.Cause(err).Error() + if len(raw) <= len(berrMsg)+len(": ") { + return raw + } + return raw[:len(raw)-len(berrMsg)-len(": ")] +} diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index ae13cfa8799f7..84e541b4a0a81 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -20,6 +20,8 @@ import ( "runtime" "testing" + "github.com/pingcap/errors" + berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/lightning/config" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser" @@ -33,13 +35,13 @@ import ( func TestInitDefaultOptions(t *testing.T) { e := LoadDataController{} e.initDefaultOptions() - require.Equal(t, LogicalImportMode, e.importMode) + require.Equal(t, LogicalImportMode, e.ImportMode) require.Equal(t, config.ByteSize(50<<30), e.diskQuota) require.Equal(t, config.OpLevelRequired, e.checksum) require.Equal(t, true, e.addIndex) require.Equal(t, config.OpLevelOptional, e.analyze) require.Equal(t, int64(runtime.NumCPU()), e.threadCnt) - require.Equal(t, int64(1000), e.batchSize) + require.Equal(t, int64(1000), e.BatchSize) require.Equal(t, unlimitedWriteSpeed, e.maxWriteSpeed) require.Equal(t, false, e.splitFile) require.Equal(t, int64(100), e.maxRecordedErrors) @@ -158,13 +160,13 @@ func TestInitOptions(t *testing.T) { require.NoError(t, err, sql) err = e.initOptions(ctx, convertOptions(stmt.(*ast.LoadDataStmt).Options)) require.NoError(t, err, sql) - require.Equal(t, physicalImportMode, e.importMode, sql) + require.Equal(t, PhysicalImportMode, e.ImportMode, sql) require.Equal(t, config.ByteSize(100<<30), e.diskQuota, sql) require.Equal(t, config.OpLevelOptional, e.checksum, sql) require.False(t, e.addIndex, sql) require.Equal(t, config.OpLevelRequired, e.analyze, sql) require.Equal(t, int64(runtime.NumCPU()), e.threadCnt, sql) - require.Equal(t, int64(2000), e.batchSize, sql) + require.Equal(t, int64(2000), e.BatchSize, sql) require.Equal(t, config.ByteSize(200<<20), e.maxWriteSpeed, sql) require.True(t, e.splitFile, sql) require.Equal(t, int64(123), e.maxRecordedErrors, sql) @@ -182,3 +184,12 @@ func TestAdjustOptions(t *testing.T) { require.Equal(t, int64(runtime.NumCPU()), e.threadCnt) require.Equal(t, minWriteSpeed, e.maxWriteSpeed) } + +func TestGetMsgFromBRError(t *testing.T) { + var berr error = berrors.ErrStorageInvalidConfig + require.Equal(t, "[BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "invalid external storage config", GetMsgFromBRError(berr)) + berr = errors.Annotatef(berr, "some message about error reason") + require.Equal(t, "some message about error reason: [BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) + require.Equal(t, "some message about error reason", GetMsgFromBRError(berr)) +} diff --git a/executor/load_data.go b/executor/load_data.go index 3c8e2402bf7ac..2fe21cdfb4b93 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -18,17 +18,13 @@ import ( "context" "fmt" "io" - "path/filepath" "strings" "sync/atomic" "time" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - backup "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/tidb/br/pkg/lightning/log" "github.com/pingcap/tidb/br/pkg/lightning/mydump" - 
"github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/executor/asyncloaddata" "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/expression" @@ -44,10 +40,8 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/dbterror/exeerrors" - "github.com/pingcap/tidb/util/intest" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/syncutil" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -89,25 +83,7 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { switch e.FileLocRef { case ast.FileLocServerOrRemote: - u, err2 := storage.ParseRawURL(e.loadDataWorker.GetInfilePath()) - if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(err2.Error()) - } - path := strings.Trim(u.Path, "/") - u.Path = "" - b, err2 := storage.ParseBackendFromURL(u, nil) - if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(getMsgFromBRError(err2)) - } - if b.GetLocal() != nil { - return exeerrors.ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.GetInfilePath()) - } - // try to find pattern error in advance - _, err2 = filepath.Match(stringutil.EscapeGlobExceptAsterisk(path), "") - if err2 != nil { - return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("Glob pattern error: " + err2.Error()) - } - jobID, err2 := e.loadFromRemote(ctx, b, path) + jobID, err2 := e.loadDataWorker.loadRemote(ctx) if err2 != nil { return err2 } @@ -132,89 +108,6 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { return nil } -func (e *LoadDataExec) loadFromRemote( - ctx context.Context, - b *backup.StorageBackend, - path string, -) (int64, error) { - opt := &storage.ExternalStorageOptions{} - if intest.InTest { - opt.NoCredentials = true - } - s, err := storage.New(ctx, b, opt) - if err != nil { - return 0, exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(getMsgFromBRError(err)) - } - - idx := strings.IndexByte(path, '*') - // simple path when the INFILE represent one file - if idx == -1 { - opener := func(ctx context.Context) (io.ReadSeekCloser, error) { - fileReader, err2 := s.Open(ctx, path) - if err2 != nil { - return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err2), "Please check the INFILE path is correct") - } - return fileReader, nil - } - - // try to read the file size to report progress. Don't fail the main load - // if this fails to tolerate transient errors. 
- filesize := int64(-1) - reader, err2 := opener(ctx) - if err2 == nil { - size, err3 := reader.Seek(0, io.SeekEnd) - if err3 != nil { - logutil.Logger(ctx).Warn("failed to read file size by seek in LOAD DATA", - zap.Error(err3)) - } else { - filesize = size - } - terror.Log(reader.Close()) - } - - return e.loadDataWorker.Load(ctx, []LoadDataReaderInfo{{ - Opener: opener, - Remote: &loadRemoteInfo{store: s, path: path, size: filesize}, - }}) - } - - // when the INFILE represent multiple files - readerInfos := make([]LoadDataReaderInfo, 0, 8) - commonPrefix := path[:idx] - // we only support '*', in order to reuse glob library manually escape the path - escapedPath := stringutil.EscapeGlobExceptAsterisk(path) - - err = s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix}, - func(remotePath string, size int64) error { - // we have checked in LoadDataExec.Next - //nolint: errcheck - match, _ := filepath.Match(escapedPath, remotePath) - if !match { - return nil - } - readerInfos = append(readerInfos, LoadDataReaderInfo{ - Opener: func(ctx context.Context) (io.ReadSeekCloser, error) { - fileReader, err2 := s.Open(ctx, remotePath) - if err2 != nil { - return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err2), "Please check the INFILE path is correct") - } - return fileReader, nil - }, - Remote: &loadRemoteInfo{ - store: s, - path: remotePath, - size: size, - }, - }) - return nil - }) - if err != nil { - return 0, err - } - - return e.loadDataWorker.Load(ctx, readerInfos) -} - // Close implements the Executor Close interface. func (e *LoadDataExec) Close() error { if e.runtimeStats != nil && e.loadDataWorker != nil && e.loadDataWorker.stats != nil { @@ -242,12 +135,6 @@ type commitTask struct { scannedFileSize int64 } -type loadRemoteInfo struct { - store storage.ExternalStorage - path string - size int64 -} - // LoadDataWorker does a LOAD DATA job. type LoadDataWorker struct { *InsertValues @@ -326,7 +213,7 @@ func NewLoadDataWorker( GenExprs: plan.GenCols.Exprs, isLoadData: true, txnInUse: syncutil.Mutex{}, - maxRowsInBatch: uint64(controller.GetBatchSize()), + maxRowsInBatch: uint64(controller.BatchSize), } restrictive := sctx.GetSessionVars().SQLMode.HasStrictMode() && plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore @@ -356,7 +243,7 @@ func NewLoadDataWorker( } func (e *LoadDataWorker) initInsertValues() error { - e.insertColumns = e.controller.GetInsertColumns() + e.insertColumns = e.controller.InsertColumns e.rowLen = len(e.insertColumns) for _, col := range e.insertColumns { @@ -372,19 +259,23 @@ func (e *LoadDataWorker) initInsertValues() error { return nil } -// LoadDataReaderInfo provides information for a data reader of LOAD DATA. -type LoadDataReaderInfo struct { - // Opener can be called at needed to get a io.ReadSeekCloser. It will only - // be called once. - Opener func(ctx context.Context) (io.ReadSeekCloser, error) - // Remote is not nil only if load from cloud storage. - Remote *loadRemoteInfo +func (e *LoadDataWorker) loadRemote(ctx context.Context) (int64, error) { + if err2 := e.controller.InitDataFiles(ctx); err2 != nil { + return 0, err2 + } + + if e.controller.ImportMode == importer.PhysicalImportMode { + return e.controller.PhysicalImport(ctx) + } + + dataReaderInfos := e.controller.GetLoadDataReaderInfos() + return e.Load(ctx, dataReaderInfos) } // Load reads from readerInfos and do load data job. 
func (e *LoadDataWorker) Load( ctx context.Context, - readerInfos []LoadDataReaderInfo, + readerInfos []importer.LoadDataReaderInfo, ) (int64, error) { var ( jobID int64 @@ -403,7 +294,7 @@ func (e *LoadDataWorker) Load( ctx, sqlExec, e.GetInfilePath(), - e.controller.SchemaName, + e.controller.DBName, e.table.Meta().Name.O, importer.LogicalImportMode, e.Ctx.GetSessionVars().User.String(), @@ -427,7 +318,7 @@ func (e *LoadDataWorker) Load( func (e *LoadDataWorker) doLoad( ctx context.Context, - readerInfos []LoadDataReaderInfo, + readerInfos []importer.LoadDataReaderInfo, jobID int64, ) (err error) { defer func() { @@ -486,7 +377,7 @@ func (e *LoadDataWorker) doLoad( hasErr = true break } - totalFilesize += readerInfo.Remote.size + totalFilesize += readerInfo.Remote.FileSize } if !hasErr { progress.SourceFileSize = totalFilesize @@ -511,21 +402,14 @@ func (e *LoadDataWorker) doLoad( } // processStream goroutine. - var parser mydump.Parser group.Go(func() error { for _, info := range readerInfos { - reader, err2 := info.Opener(ctx) - if err2 != nil { - return err2 - } - - parser, err2 = e.buildParser(ctx, reader, info.Remote) + dataParser, err2 := e.controller.GetParser(ctx, info) if err2 != nil { - terror.Log(reader.Close()) return err2 } - err2 = e.processStream(groupCtx, parser, reader) - terror.Log(reader.Close()) + err2 = e.processStream(groupCtx, dataParser) + terror.Log(dataParser.Close()) if err2 != nil { return err2 } @@ -579,77 +463,11 @@ func (e *LoadDataWorker) doLoad( return err } -func (e *LoadDataWorker) buildParser( - ctx context.Context, - reader io.ReadSeekCloser, - remote *loadRemoteInfo, -) (parser mydump.Parser, err error) { - switch e.controller.Format { - case importer.LoadDataFormatDelimitedData: - // CSV-like - parser, err = mydump.NewCSVParser( - ctx, - e.controller.GenerateCSVConfig(), - reader, - importer.LoadDataReadBlockSize, - nil, - false, - // TODO: support charset conversion - nil) - case importer.LoadDataFormatSQLDump: - parser = mydump.NewChunkParser( - ctx, - e.Ctx.GetSessionVars().SQLMode, - reader, - importer.LoadDataReadBlockSize, - nil, - ) - case importer.LoadDataFormatParquet: - if remote == nil { - return nil, exeerrors.ErrLoadParquetFromLocal - } - parser, err = mydump.NewParquetParser( - ctx, - remote.store, - reader, - remote.path, - ) - } - if err != nil { - return nil, exeerrors.ErrLoadDataWrongFormatConfig.GenWithStack(err.Error()) - } - parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)}) - - // handle IGNORE N LINES - ignoreOneLineFn := parser.ReadRow - if csvParser, ok := parser.(*mydump.CSVParser); ok { - ignoreOneLineFn = func() error { - _, _, err := csvParser.ReadUntilTerminator() - return err - } - } - - ignoreLineCnt := e.controller.IgnoreLines - for ignoreLineCnt > 0 { - err = ignoreOneLineFn() - if err != nil { - if errors.Cause(err) == io.EOF { - return parser, nil - } - return nil, err - } - - ignoreLineCnt-- - } - return parser, nil -} - // processStream process input stream from parser. When returns nil, it means // all data is read. 
func (e *LoadDataWorker) processStream( ctx context.Context, parser mydump.Parser, - seeker io.Seeker, ) (err error) { defer func() { r := recover() @@ -679,7 +497,7 @@ func (e *LoadDataWorker) processStream( } TrySendTask: - currScannedSize, err = seeker.Seek(0, io.SeekCurrent) + currScannedSize, err = parser.ScannedPos() if err != nil && !loggedError { loggedError = true logutil.Logger(ctx).Error(" LOAD DATA failed to read current file offset by seek", @@ -934,7 +752,7 @@ func (e *LoadDataWorker) parserData2TableData( } } - fieldMappings := e.controller.GetFieldMapping() + fieldMappings := e.controller.FieldMappings for i := 0; i < len(fieldMappings); i++ { if i >= len(parserData) { if fieldMappings[i].Column == nil { diff --git a/executor/loaddatatest/load_data_test.go b/executor/loaddatatest/load_data_test.go index 7563a59f5ca8d..a4b5321b9d906 100644 --- a/executor/loaddatatest/load_data_test.go +++ b/executor/loaddatatest/load_data_test.go @@ -99,6 +99,8 @@ func TestLoadDataInitParam(t *testing.T) { exeerrors.ErrLoadDataUnsupportedFormat) require.ErrorIs(t, tk.ExecToErr("load data infile '/a' format 'aaa' into table load_data_test"), exeerrors.ErrLoadDataUnsupportedFormat) + require.ErrorIs(t, tk.ExecToErr("load data local infile '/a' format 'parquet' into table load_data_test"), + exeerrors.ErrLoadParquetFromLocal) require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'sql file' into table load_data_test fields terminated by 'a'"), "cannot specify FIELDS ... or LINES") require.ErrorContains(t, tk.ExecToErr("load data infile '/a' format 'parquet' into table load_data_test fields terminated by 'a'"), @@ -142,7 +144,7 @@ func TestLoadDataInitParam(t *testing.T) { []string{"NULL"}, false) // positive case - require.NoError(t, tk.ExecToErr("load data local infile '/a' format 'parquet' into table load_data_test")) + require.NoError(t, tk.ExecToErr("load data local infile '/a' format 'sql file' into table load_data_test")) ctx.SetValue(executor.LoadDataVarKey, nil) require.NoError(t, tk.ExecToErr("load data local infile '/a' into table load_data_test fields terminated by 'a'")) ctx.SetValue(executor.LoadDataVarKey, nil) diff --git a/executor/utils.go b/executor/utils.go index 4997eb9d277a8..47fe32a93aa68 100644 --- a/executor/utils.go +++ b/executor/utils.go @@ -16,8 +16,6 @@ package executor import ( "strings" - - "github.com/pingcap/errors" ) // SetFromString constructs a slice of strings from a comma separated string. @@ -94,20 +92,3 @@ func (b *batchRetrieverHelper) nextBatch(retrieveRange func(start, end int) erro } return nil } - -// TODO: add GetMsg() to errors package to replace this function. -// see TestGetMsgFromBRError for more details. 
-func getMsgFromBRError(err error) string { - if err == nil { - return "" - } - if berr, ok := err.(*errors.Error); ok { - return berr.GetMsg() - } - raw := err.Error() - berrMsg := errors.Cause(err).Error() - if len(raw) <= len(berrMsg)+len(": ") { - return raw - } - return raw[:len(raw)-len(berrMsg)-len(": ")] -} diff --git a/executor/utils_test.go b/executor/utils_test.go index 36501b19d9d6a..8afcddb16d781 100644 --- a/executor/utils_test.go +++ b/executor/utils_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/pingcap/errors" - berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/types" "github.com/stretchr/testify/require" @@ -97,15 +96,6 @@ func TestBatchRetrieverHelper(t *testing.T) { require.Equal(t, rangeEnds, []int{10}) } -func TestGetMsgFromBRError(t *testing.T) { - var berr error = berrors.ErrStorageInvalidConfig - require.Equal(t, "[BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) - require.Equal(t, "invalid external storage config", getMsgFromBRError(berr)) - berr = errors.Annotatef(berr, "some message about error reason") - require.Equal(t, "some message about error reason: [BR:ExternalStorage:ErrStorageInvalidConfig]invalid external storage config", berr.Error()) - require.Equal(t, "some message about error reason", getMsgFromBRError(berr)) -} - func TestEqualDatumsAsBinary(t *testing.T) { tests := []struct { a []interface{} diff --git a/server/BUILD.bazel b/server/BUILD.bazel index af93cefec6fe8..df40b50f44dff 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//domain/infosync", "//errno", "//executor", + "//executor/importer", "//expression", "//extension", "//infoschema", diff --git a/server/conn.go b/server/conn.go index 6f6735a936c26..dd1e8c1ab123e 100644 --- a/server/conn.go +++ b/server/conn.go @@ -60,6 +60,7 @@ import ( "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/executor/importer" "github.com/pingcap/tidb/extension" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -1623,7 +1624,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataWorker *execut }() ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err = loadDataWorker.Load(ctx, []executor.LoadDataReaderInfo{{ + _, err = loadDataWorker.Load(ctx, []importer.LoadDataReaderInfo{{ Opener: func(_ context.Context) (io.ReadSeekCloser, error) { return executor.NewSimpleSeekerOnReadCloser(r), nil }}})
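With loadFromRemote gone, every input source now reaches the worker as an importer.LoadDataReaderInfo whose Opener yields an io.ReadSeekCloser, as the server/conn.go hunk above shows for the client-local path. A minimal caller-side sketch of that contract; the in-memory reader stands in for the client packet stream, and the local struct mirrors the shape of the exported type rather than importing it:

package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
)

// memSeeker adapts a bytes.Reader into an io.ReadSeekCloser.
type memSeeker struct{ *bytes.Reader }

func (memSeeker) Close() error { return nil }

// readerInfo mirrors the shape of importer.LoadDataReaderInfo.
type readerInfo struct {
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
}

func main() {
	info := readerInfo{
		Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
			return memSeeker{bytes.NewReader([]byte("1,a\n2,b\n"))}, nil
		},
	}
	r, err := info.Opener(context.Background())
	if err != nil {
		panic(err)
	}
	defer r.Close()
	data, _ := io.ReadAll(r)
	fmt.Printf("%q\n", data) // "1,a\n2,b\n"
}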
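processStream likewise dropped its io.Seeker parameter because progress is now read from parser.ScannedPos(), per the hunk above. A stand-alone sketch of why a byte-counting parser makes the second handle on the stream redundant; countingReader is an illustrative stand-in, not the mydump implementation:

package main

import (
	"fmt"
	"io"
	"strings"
)

// countingReader tracks how many bytes have been consumed, so a caller can
// report progress without seeking the underlying stream.
type countingReader struct {
	r io.Reader
	n int64
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	return n, err
}

// ScannedPos mirrors the parser-side accessor used for progress reporting.
func (c *countingReader) ScannedPos() (int64, error) { return c.n, nil }

func main() {
	cr := &countingReader{r: strings.NewReader("1,a\n2,b\n")}
	buf := make([]byte, 4)
	for {
		if _, err := cr.Read(buf); err == io.EOF {
			break
		}
	}
	pos, _ := cr.ScannedPos()
	fmt.Println(pos) // 8
}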