Skip to content

Commit

Permalink
lightning: add lint rule & fix existing lightning lint err (#42733)
Browse files Browse the repository at this point in the history
ref #40499
  • Loading branch information
D3Hunter authored Apr 3, 2023
1 parent 71fa840 commit 9cf69a9
Show file tree
Hide file tree
Showing 57 changed files with 635 additions and 181 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ errdoc:tools/bin/errdoc-gen
lint:tools/bin/revive
@echo "linting"
@tools/bin/revive -formatter friendly -config tools/check/revive.toml $(FILES_TIDB_TESTS)
@tools/bin/revive -formatter friendly -config tools/check/revive.toml ./br/pkg/lightning/...

license:
bazel $(BAZEL_GLOBAL_CONFIG) run $(BAZEL_CMD_CONFIG) \
Expand Down
19 changes: 19 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger
)
}

// MakeUUID generates a UUID for the engine and a tag for the engine.
func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {
tag := makeTag(tableName, engineID)
engineUUID := uuid.NewSHA1(engineNamespace, []byte(tag))
Expand All @@ -84,6 +85,7 @@ func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {

var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")

// EngineFileSize represents the size of an engine on disk and in memory.
type EngineFileSize struct {
// UUID is the engine's UUID.
UUID uuid.UUID
Expand Down Expand Up @@ -246,27 +248,33 @@ type ClosedEngine struct {
engine
}

// LocalEngineWriter is a thread-local writer for writing rows into a single engine.
type LocalEngineWriter struct {
writer EngineWriter
tableName string
}

// MakeBackend creates a new Backend from an AbstractBackend.
func MakeBackend(ab AbstractBackend) Backend {
return Backend{abstract: ab}
}

// Close the connection to the backend.
func (be Backend) Close() {
be.abstract.Close()
}

// ShouldPostProcess returns whether KV-specific post-processing should be
func (be Backend) ShouldPostProcess() bool {
return be.abstract.ShouldPostProcess()
}

// FlushAll flushes all opened engines.
func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// TotalMemoryConsume returns the total memory consumed by the backend.
func (be Backend) TotalMemoryConsume() int64 {
return be.abstract.TotalMemoryConsume()
}
Expand Down Expand Up @@ -362,14 +370,17 @@ func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableNam
}, nil
}

// CollectLocalDuplicateRows collects duplicate rows from the local backend.
func (be Backend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
}

// CollectRemoteDuplicateRows collects duplicate rows from the remote backend.
func (be Backend) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (bool, error) {
return be.abstract.CollectRemoteDuplicateRows(ctx, tbl, tableName, opts)
}

// ResolveDuplicateRows resolves duplicate rows from the backend.
func (be Backend) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error {
return be.abstract.ResolveDuplicateRows(ctx, tbl, tableName, algorithm)
}
Expand All @@ -390,6 +401,7 @@ func (engine *OpenedEngine) Flush(ctx context.Context) error {
return engine.backend.FlushEngine(ctx, engine.uuid)
}

// LocalWriter returns a writer that writes to the local backend.
func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterConfig) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, cfg, engine.uuid)
if err != nil {
Expand All @@ -398,6 +410,7 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
}

// TotalMemoryConsume returns the total memory consumed by the engine.
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
return engine.engine.backend.TotalMemoryConsume()
}
Expand All @@ -407,10 +420,12 @@ func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string,
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
}

// Close closes the engine and returns the status of the engine.
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
return w.writer.Close(ctx)
}

// IsSynced returns whether the engine is synced.
func (w *LocalEngineWriter) IsSynced() bool {
return w.writer.IsSynced()
}
Expand Down Expand Up @@ -480,14 +495,17 @@ func (engine *ClosedEngine) Cleanup(ctx context.Context) error {
return err
}

// Logger returns the logger for the engine.
func (engine *ClosedEngine) Logger() log.Logger {
return engine.logger
}

// ChunkFlushStatus is the status of a chunk flush.
type ChunkFlushStatus interface {
Flushed() bool
}

// EngineWriter is the interface for writing data to an engine.
type EngineWriter interface {
AppendRows(
ctx context.Context,
Expand All @@ -499,6 +517,7 @@ type EngineWriter interface {
Close(ctx context.Context) (ChunkFlushStatus, error)
}

// GetEngineUUID returns the engine UUID.
func (engine *OpenedEngine) GetEngineUUID() uuid.UUID {
return engine.uuid
}
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/types"
)

// EncodingConfig is the configuration for the encoding backend.
type EncodingConfig struct {
SessionOptions
Path string // path of data file
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (e *BaseKVEncoder) GetOrCreateRecord() []types.Datum {
}

// Record2KV converts a row into a KV pair.
func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*KvPairs, error) {
func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*Pairs, error) {
_, err := e.Table.AddRecord(e.SessionCtx, record)
if err != nil {
e.logger.Error("kv encode failed",
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/types"
)

// TableKVDecoder is a KVDecoder that decodes the key-value pairs of a table.
type TableKVDecoder struct {
tbl table.Table
se *Session
Expand All @@ -35,14 +36,17 @@ type TableKVDecoder struct {
genCols []GeneratedCol
}

// Name implements KVDecoder.Name.
func (t *TableKVDecoder) Name() string {
return t.tableName
}

// DecodeHandleFromRowKey implements KVDecoder.DecodeHandleFromRowKey.
func (t *TableKVDecoder) DecodeHandleFromRowKey(key []byte) (kv.Handle, error) {
return tablecodec.DecodeRowKey(key)
}

// DecodeHandleFromIndex implements KVDecoder.DecodeHandleFromIndex.
func (t *TableKVDecoder) DecodeHandleFromIndex(indexInfo *model.IndexInfo, key []byte, value []byte) (kv.Handle, error) {
cols := tables.BuildRowcodecColInfoForIndexColumns(indexInfo, t.tbl.Meta())
return tablecodec.DecodeIndexHandle(key, value, len(cols))
Expand All @@ -53,6 +57,7 @@ func (t *TableKVDecoder) DecodeRawRowData(h kv.Handle, value []byte) ([]types.Da
return tables.DecodeRawRowData(t.se, t.tbl.Meta(), h, t.tbl.Cols(), value)
}

// DecodeRawRowDataAsStr decodes raw row data into a string.
func (t *TableKVDecoder) DecodeRawRowDataAsStr(h kv.Handle, value []byte) (res string) {
row, _, err := t.DecodeRawRowData(h, value)
if err == nil {
Expand Down Expand Up @@ -109,6 +114,7 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
return nil
}

// NewTableKVDecoder creates a new TableKVDecoder.
func NewTableKVDecoder(
tbl table.Table,
tableName string,
Expand Down
24 changes: 20 additions & 4 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ type MemBuf struct {
kv.MemBuffer
buf *BytesBuf
availableBufs []*BytesBuf
kvPairs *KvPairs
kvPairs *Pairs
size int
}

// Recycle recycles the byte buffer.
func (mb *MemBuf) Recycle(buf *BytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
Expand All @@ -106,6 +107,7 @@ func (mb *MemBuf) Recycle(buf *BytesBuf) {
mb.Unlock()
}

// AllocateBuf allocates a byte buffer.
func (mb *MemBuf) AllocateBuf(size int) {
mb.Lock()
size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
Expand All @@ -130,6 +132,7 @@ func (mb *MemBuf) AllocateBuf(size int) {
mb.Unlock()
}

// Set sets the key-value pair.
func (mb *MemBuf) Set(k kv.Key, v []byte) error {
kvPairs := mb.kvPairs
size := len(k) + len(v)
Expand All @@ -147,10 +150,12 @@ func (mb *MemBuf) Set(k kv.Key, v []byte) error {
return nil
}

// SetWithFlags implements the kv.MemBuffer interface.
func (mb *MemBuf) SetWithFlags(k kv.Key, v []byte, ops ...kv.FlagsOp) error {
return mb.Set(k, v)
}

// Delete implements the kv.MemBuffer interface.
func (mb *MemBuf) Delete(k kv.Key) error {
return errors.New("unsupported operation")
}
Expand All @@ -159,6 +164,7 @@ func (mb *MemBuf) Delete(k kv.Key) error {
func (mb *MemBuf) Release(h kv.StagingHandle) {
}

// Staging creates a new staging buffer.
func (mb *MemBuf) Staging() kv.StagingHandle {
return 0
}
Expand All @@ -181,17 +187,21 @@ type kvUnionStore struct {
MemBuf
}

// GetMemBuffer implements the kv.UnionStore interface.
func (s *kvUnionStore) GetMemBuffer() kv.MemBuffer {
return &s.MemBuf
}

// GetIndexName implements the kv.UnionStore interface.
func (s *kvUnionStore) GetIndexName(tableID, indexID int64) string {
panic("Unsupported Operation")
}

// CacheIndexName implements the kv.UnionStore interface.
func (s *kvUnionStore) CacheIndexName(tableID, indexID int64, name string) {
}

// CacheTableInfo implements the kv.UnionStore interface.
func (s *kvUnionStore) CacheTableInfo(id int64, info *model.TableInfo) {
}

Expand All @@ -202,14 +212,17 @@ type transaction struct {
kvUnionStore
}

// GetMemBuffer implements the kv.Transaction interface.
func (t *transaction) GetMemBuffer() kv.MemBuffer {
return &t.kvUnionStore.MemBuf
}

// Discard implements the kv.Transaction interface.
func (t *transaction) Discard() {
// do nothing
}

// Flush implements the kv.Transaction interface.
func (t *transaction) Flush() (int, error) {
// do nothing
return 0, nil
Expand Down Expand Up @@ -263,6 +276,7 @@ func NewSessionCtx(options *encode.SessionOptions, logger log.Logger) sessionctx
return NewSession(options, logger)
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
s := &Session{
values: make(map[fmt.Stringer]interface{}, 1),
Expand Down Expand Up @@ -304,18 +318,19 @@ func NewSession(options *encode.SessionOptions, logger log.Logger) *Session {
}
vars.TxnCtx = nil
s.Vars = vars
s.txn.kvPairs = &KvPairs{}
s.txn.kvPairs = &Pairs{}

return s
}

func (se *Session) TakeKvPairs() *KvPairs {
// TakeKvPairs returns the current Pairs and resets the buffer.
func (se *Session) TakeKvPairs() *Pairs {
memBuf := &se.txn.MemBuf
pairs := memBuf.kvPairs
if pairs.BytesBuf != nil {
pairs.MemBuf = memBuf
}
memBuf.kvPairs = &KvPairs{Pairs: make([]common.KvPair, 0, len(pairs.Pairs))}
memBuf.kvPairs = &Pairs{Pairs: make([]common.KvPair, 0, len(pairs.Pairs))}
memBuf.size = 0
return pairs
}
Expand Down Expand Up @@ -363,6 +378,7 @@ func (se *Session) GetStmtStats() *stmtstats.StatementStats {
return nil
}

// Close implements the sessionctx.Context interface
func (se *Session) Close() {
memBuf := &se.txn.MemBuf
if memBuf.buf != nil {
Expand Down
Loading

0 comments on commit 9cf69a9

Please sign in to comment.