Skip to content

Commit

Permalink
lightning: refactor for reuse part2 (#42626)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter authored Mar 28, 2023
1 parent bf470aa commit 976148d
Show file tree
Hide file tree
Showing 36 changed files with 776 additions and 690 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ mock_s3iface:
@mockgen -package mock github.com/aws/aws-sdk-go/service/s3/s3iface S3API > br/pkg/mock/s3iface.go

mock_lightning:
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,Rows,Row > br/pkg/mock/encode.go
@mockgen -package mock -mock_names AbstractBackend=MockBackend github.com/pingcap/tidb/br/pkg/lightning/backend AbstractBackend,EngineWriter,TargetInfoGetter > br/pkg/mock/backend.go
@mockgen -package mock github.com/pingcap/tidb/br/pkg/lightning/backend/encode Encoder,EncodingBuilder,Rows,Row > br/pkg/mock/encode.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
18 changes: 0 additions & 18 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ type TargetInfoGetter interface {
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
encode.EncodingBuilder
TargetInfoGetter
// Close the connection to the backend.
Close()

Expand Down Expand Up @@ -261,26 +259,10 @@ func (be Backend) Close() {
be.abstract.Close()
}

func (be Backend) MakeEmptyRows() encode.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(ctx context.Context, config *encode.EncodingConfig) (encode.Encoder, error) {
return be.abstract.NewEncoder(ctx, config)
}

func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

func (be Backend) CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error {
return be.abstract.CheckRequirements(ctx, checkCtx)
}

func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}
Expand Down
10 changes: 6 additions & 4 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
encBuilder *mock.MockEncodingBuilder
backend backend.Backend
ts uint64
}
Expand All @@ -32,6 +33,7 @@ func createBackendSuite(c gomock.TestReporter) *backendSuite {
controller: controller,
mockBackend: mockBackend,
backend: backend.MakeBackend(mockBackend),
encBuilder: mock.NewMockEncodingBuilder(controller),
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
}
}
Expand Down Expand Up @@ -316,8 +318,8 @@ func TestMakeEmptyRows(t *testing.T) {
defer s.tearDownTest()

rows := mock.NewMockRows(s.controller)
s.mockBackend.EXPECT().MakeEmptyRows().Return(rows)
require.Equal(t, rows, s.mockBackend.MakeEmptyRows())
s.encBuilder.EXPECT().MakeEmptyRows().Return(rows)
require.Equal(t, rows, s.encBuilder.MakeEmptyRows())
}

func TestNewEncoder(t *testing.T) {
Expand All @@ -328,9 +330,9 @@ func TestNewEncoder(t *testing.T) {
options := &encode.EncodingConfig{
SessionOptions: encode.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890},
}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
s.encBuilder.EXPECT().NewEncoder(nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, options)
realEncoder, err := s.encBuilder.NewEncoder(nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

type TableKVDecoder struct {
tbl table.Table
se *session
se *Session
// tableName is the unique table name in the form "`db`.`tbl`".
tableName string
genCols []genCol
Expand Down Expand Up @@ -91,7 +91,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
if err != nil {
return err
}
iter := index.GenIndexKVIter(t.se.vars.StmtCtx, indexValues, h, nil)
iter := index.GenIndexKVIter(t.se.Vars.StmtCtx, indexValues, h, nil)
for iter.Valid() {
indexKey, _, _, err := iter.Next(indexBuffer)
if err != nil {
Expand All @@ -115,7 +115,7 @@ func NewTableKVDecoder(
options *encode.SessionOptions,
logger log.Logger,
) (*TableKVDecoder, error) {
se := newSession(options, logger)
se := NewSession(options, logger)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
Expand Down
104 changes: 53 additions & 51 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,43 +54,45 @@ func (*invalidIterator) Valid() bool {
func (*invalidIterator) Close() {
}

type bytesBuf struct {
// BytesBuf bytes buffer.
type BytesBuf struct {
buf []byte
idx int
cap int
}

func (b *bytesBuf) add(v []byte) []byte {
func (b *BytesBuf) add(v []byte) []byte {
start := b.idx
copy(b.buf[start:], v)
b.idx += len(v)
return b.buf[start:b.idx:b.idx]
}

func newBytesBuf(size int) *bytesBuf {
return &bytesBuf{
func newBytesBuf(size int) *BytesBuf {
return &BytesBuf{
buf: manual.New(size),
cap: size,
}
}

func (b *bytesBuf) destroy() {
func (b *BytesBuf) destroy() {
if b != nil {
manual.Free(b.buf)
b.buf = nil
}
}

type kvMemBuf struct {
// MemBuf used to store the data in memory.
type MemBuf struct {
sync.Mutex
kv.MemBuffer
buf *bytesBuf
availableBufs []*bytesBuf
buf *BytesBuf
availableBufs []*BytesBuf
kvPairs *KvPairs
size int
}

func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
func (mb *MemBuf) Recycle(buf *BytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
mb.Lock()
Expand All @@ -104,11 +106,11 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
mb.Unlock()
}

func (mb *kvMemBuf) AllocateBuf(size int) {
func (mb *MemBuf) AllocateBuf(size int) {
mb.Lock()
size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
var (
existingBuf *bytesBuf
existingBuf *BytesBuf
existingBufIdx int
)
for i, buf := range mb.availableBufs {
Expand All @@ -128,45 +130,45 @@ func (mb *kvMemBuf) AllocateBuf(size int) {
mb.Unlock()
}

func (mb *kvMemBuf) Set(k kv.Key, v []byte) error {
func (mb *MemBuf) Set(k kv.Key, v []byte) error {
kvPairs := mb.kvPairs
size := len(k) + len(v)
if mb.buf == nil || mb.buf.cap-mb.buf.idx < size {
if mb.buf != nil {
kvPairs.bytesBuf = mb.buf
kvPairs.BytesBuf = mb.buf
}
mb.AllocateBuf(size)
}
kvPairs.pairs = append(kvPairs.pairs, common.KvPair{
kvPairs.Pairs = append(kvPairs.Pairs, common.KvPair{
Key: mb.buf.add(k),
Val: mb.buf.add(v),
})
mb.size += size
return nil
}

func (mb *kvMemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
func (mb *MemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
return mb.Set(k, v)
}

func (mb *kvMemBuf) Delete(k kv.Key) error {
func (mb *MemBuf) Delete(k kv.Key) error {
return errors.New("unsupported operation")
}

// Release publish all modifications in the latest staging buffer to upper level.
func (mb *kvMemBuf) Release(h kv.StagingHandle) {
func (mb *MemBuf) Release(h kv.StagingHandle) {
}

func (mb *kvMemBuf) Staging() kv.StagingHandle {
func (mb *MemBuf) Staging() kv.StagingHandle {
return 0
}

// Cleanup cleanup the resources referenced by the StagingHandle.
// Cleanup the resources referenced by the StagingHandle.
// If the changes are not published by `Release`, they will be discarded.
func (mb *kvMemBuf) Cleanup(h kv.StagingHandle) {}
func (mb *MemBuf) Cleanup(h kv.StagingHandle) {}

// Size returns sum of keys and values length.
func (mb *kvMemBuf) Size() int {
func (mb *MemBuf) Size() int {
return mb.size
}

Expand All @@ -176,11 +178,11 @@ func (t *transaction) Len() int {
}

type kvUnionStore struct {
kvMemBuf
MemBuf
}

func (s *kvUnionStore) GetMemBuffer() kv.MemBuffer {
return &s.kvMemBuf
return &s.MemBuf
}

func (s *kvUnionStore) GetIndexName(tableID, indexID int64) string {
Expand All @@ -201,7 +203,7 @@ type transaction struct {
}

func (t *transaction) GetMemBuffer() kv.MemBuffer {
return &t.kvUnionStore.kvMemBuf
return &t.kvUnionStore.MemBuf
}

func (t *transaction) Discard() {
Expand All @@ -228,7 +230,7 @@ func (t *transaction) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {

// Set implements the kv.Mutator interface
func (t *transaction) Set(k kv.Key, v []byte) error {
return t.kvMemBuf.Set(k, v)
return t.MemBuf.Set(k, v)
}

// GetTableInfo implements the kv.Transaction interface.
Expand All @@ -245,24 +247,24 @@ func (t *transaction) SetAssertion(key []byte, assertion ...kv.FlagsOp) error {
return nil
}

// session is a trimmed down Session type which only wraps our own trimmed-down
// Session is a trimmed down Session type which only wraps our own trimmed-down
// transaction type and provides the session variables to the TiDB library
// optimized for Lightning.
type session struct {
type Session struct {
sessionctx.Context
txn transaction
vars *variable.SessionVars
Vars *variable.SessionVars
// currently, we only set `CommonAddRecordCtx`
values map[fmt.Stringer]interface{}
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
return newSession(options, logger)
// NewSessionCtx creates a new trimmed down Session matching the options.
func NewSessionCtx(options *encode.SessionOptions, logger log.Logger) sessionctx.Context {
return NewSession(options, logger)
}

func newSession(options *encode.SessionOptions, logger log.Logger) *session {
s := &session{
func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
s := &Session{
values: make(map[fmt.Stringer]interface{}, 1),
}
sqlMode := options.SQLMode
Expand Down Expand Up @@ -301,68 +303,68 @@ func newSession(options *encode.SessionOptions, logger log.Logger) *session {
log.ShortError(err))
}
vars.TxnCtx = nil
s.vars = vars
s.Vars = vars
s.txn.kvPairs = &KvPairs{}

return s
}

func (se *session) takeKvPairs() *KvPairs {
memBuf := &se.txn.kvMemBuf
func (se *Session) TakeKvPairs() *KvPairs {
memBuf := &se.txn.MemBuf
pairs := memBuf.kvPairs
if pairs.bytesBuf != nil {
pairs.memBuf = memBuf
if pairs.BytesBuf != nil {
pairs.MemBuf = memBuf
}
memBuf.kvPairs = &KvPairs{pairs: make([]common.KvPair, 0, len(pairs.pairs))}
memBuf.kvPairs = &KvPairs{Pairs: make([]common.KvPair, 0, len(pairs.Pairs))}
memBuf.size = 0
return pairs
}

// Txn implements the sessionctx.Context interface
func (se *session) Txn(active bool) (kv.Transaction, error) {
func (se *Session) Txn(active bool) (kv.Transaction, error) {
return &se.txn, nil
}

// GetSessionVars implements the sessionctx.Context interface
func (se *session) GetSessionVars() *variable.SessionVars {
return se.vars
func (se *Session) GetSessionVars() *variable.SessionVars {
return se.Vars
}

// SetValue saves a value associated with this context for key.
func (se *session) SetValue(key fmt.Stringer, value interface{}) {
func (se *Session) SetValue(key fmt.Stringer, value interface{}) {
se.values[key] = value
}

// Value returns the value associated with this context for key.
func (se *session) Value(key fmt.Stringer) interface{} {
func (se *Session) Value(key fmt.Stringer) interface{} {
return se.values[key]
}

// StmtAddDirtyTableOP implements the sessionctx.Context interface
func (se *session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}
func (se *Session) StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle) {}

// GetInfoSchema implements the sessionctx.Context interface.
func (se *session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
func (se *Session) GetInfoSchema() sessionctx.InfoschemaMetaVersion {
return nil
}

// GetBuiltinFunctionUsage returns the BuiltinFunctionUsage of current Context, which is not thread safe.
// Use primitive map type to prevent circular import. Should convert it to telemetry.BuiltinFunctionUsage before using.
func (se *session) GetBuiltinFunctionUsage() map[string]uint32 {
func (se *Session) GetBuiltinFunctionUsage() map[string]uint32 {
return make(map[string]uint32)
}

// BuiltinFunctionUsageInc implements the sessionctx.Context interface.
func (se *session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
func (se *Session) BuiltinFunctionUsageInc(scalarFuncSigName string) {
}

// GetStmtStats implements the sessionctx.Context interface.
func (se *session) GetStmtStats() *stmtstats.StatementStats {
func (se *Session) GetStmtStats() *stmtstats.StatementStats {
return nil
}

func (se *session) Close() {
memBuf := &se.txn.kvMemBuf
func (se *Session) Close() {
memBuf := &se.txn.MemBuf
if memBuf.buf != nil {
memBuf.buf.destroy()
memBuf.buf = nil
Expand Down
Loading

0 comments on commit 976148d

Please sign in to comment.