lightning: refactor for reuse part2 #42626

Merged
merged 21 commits on Mar 28, 2023

refactor
D3Hunter committed Mar 28, 2023
commit 692d664aca7b645d6dc6f010cf1c571fd1ba3e43
18 changes: 10 additions & 8 deletions br/pkg/lightning/backend/kv/session.go
@@ -54,6 +54,7 @@ func (*invalidIterator) Valid() bool {
func (*invalidIterator) Close() {
}

+// BytesBuf bytes buffer.
type BytesBuf struct {
buf []byte
idx int
@@ -81,6 +82,7 @@ func (b *BytesBuf) destroy() {
}
}

+// MemBuf used to store the data in memory.
type MemBuf struct {
sync.Mutex
kv.MemBuffer
@@ -161,7 +163,7 @@ func (mb *MemBuf) Staging() kv.StagingHandle {
return 0
}

-// Cleanup cleanup the resources referenced by the StagingHandle.
+// Cleanup the resources referenced by the StagingHandle.
// If the changes are not published by `Release`, they will be discarded.
func (mb *MemBuf) Cleanup(h kv.StagingHandle) {}
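
A note on the no-op Cleanup above: this trimmed-down buffer never stages writes under separate handles (Staging always returns 0), so there is nothing to discard. For readers unfamiliar with the staging contract the comment refers to, here is a minimal, self-contained sketch of the Staging/Release/Cleanup protocol; stagingBuf and writeBatch are illustrative stand-ins, not TiDB's actual kv.MemBuffer:

```go
package main

import "errors"

// stagingBuf is a toy stand-in for TiDB's kv.MemBuffer staging protocol:
// Staging opens a checkpoint, Release publishes everything staged since
// that checkpoint, and Cleanup discards it instead.
type stagingBuf struct {
	rows []string
}

func (b *stagingBuf) Staging() int    { return len(b.rows) }      // handle = checkpoint index
func (b *stagingBuf) Release(h int)   {}                          // staged rows stay in place
func (b *stagingBuf) Cleanup(h int)   { b.rows = b.rows[:h] }     // drop rows staged after h
func (b *stagingBuf) Set(row string)  { b.rows = append(b.rows, row) }

func writeBatch(b *stagingBuf, batch []string) error {
	h := b.Staging()
	for _, r := range batch {
		if r == "" {
			b.Cleanup(h) // discard the partial batch, as the comment describes
			return errors.New("bad row")
		}
		b.Set(r)
	}
	b.Release(h)
	return nil
}

func main() {
	b := &stagingBuf{}
	_ = writeBatch(b, []string{"a", "b"}) // published
	_ = writeBatch(b, []string{"c", ""})  // fails: "c" is discarded too
	println(len(b.rows))                  // 2
}
```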

@@ -175,33 +177,33 @@ func (t *transaction) Len() int {
return t.GetMemBuffer().Len()
}

-type UnionStore struct {
+type kvUnionStore struct {
MemBuf
}

-func (s *UnionStore) GetMemBuffer() kv.MemBuffer {
+func (s *kvUnionStore) GetMemBuffer() kv.MemBuffer {
return &s.MemBuf
}

-func (s *UnionStore) GetIndexName(tableID, indexID int64) string {
+func (s *kvUnionStore) GetIndexName(tableID, indexID int64) string {
panic("Unsupported Operation")
}

-func (s *UnionStore) CacheIndexName(tableID, indexID int64, name string) {
+func (s *kvUnionStore) CacheIndexName(tableID, indexID int64, name string) {
}

-func (s *UnionStore) CacheTableInfo(id int64, info *model.TableInfo) {
+func (s *kvUnionStore) CacheTableInfo(id int64, info *model.TableInfo) {
}

// transaction is a trimmed down Transaction type which only supports adding a
// new KV pair.
type transaction struct {
kv.Transaction
-UnionStore
+kvUnionStore
}

func (t *transaction) GetMemBuffer() kv.MemBuffer {
-return &t.UnionStore.MemBuf
+return &t.kvUnionStore.MemBuf
}

func (t *transaction) Discard() {
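
The UnionStore to kvUnionStore rename above narrows visibility without changing behavior: Go promotes methods and fields through embedded structs, so transaction exposes the same surface as before while code outside the package can no longer name the union-store type. A minimal sketch of the mechanism, using illustrative types rather than the real TiDB ones:

```go
package main

import "fmt"

// memBuf stands in for the package-private buffer type.
type memBuf struct{ data map[string]string }

func (m *memBuf) Set(k, v string) { m.data[k] = v }

// kvUnionStore is unexported: external packages can no longer declare
// or name it, yet its methods still promote through embedding.
type kvUnionStore struct {
	memBuf
}

// transaction embeds kvUnionStore, so Set promotes two levels up.
type transaction struct {
	kvUnionStore
}

func main() {
	t := &transaction{kvUnionStore{memBuf{data: map[string]string{}}}}
	t.Set("k", "v")          // resolves to t.kvUnionStore.memBuf.Set
	fmt.Println(t.data["k"]) // "v": fields promote the same way
}
```
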
67 changes: 28 additions & 39 deletions executor/importer/import.go
@@ -56,7 +56,7 @@ const (

// LogicalImportMode represents the import mode is SQL-like.
LogicalImportMode = "logical" // tidb backend
-physicalImportMode = "physical" // local backend
+PhysicalImportMode = "physical" // local backend
unlimitedWriteSpeed = config.ByteSize(math.MaxInt64)
minDiskQuota = config.ByteSize(10 << 30) // 10GiB
minWriteSpeed = config.ByteSize(1 << 10) // 1KiB/s
@@ -134,12 +134,12 @@ type LoadDataController struct {
// how input field(or input column) from data file is mapped, either to a column or variable.
// if there's NO column list clause in load data statement, then it's table's columns
// else it's user defined list.
-fieldMappings []*FieldMapping
-// see InsertValues.insertColumns
+FieldMappings []*FieldMapping
+// see InsertValues.InsertColumns
// todo: our behavior is different with mysql. such as for table t(a,b)
// - "...(a,a) set a=100" is allowed in mysql, but not in tidb
// - "...(a,b) set b=100" will set b=100 in mysql, but in tidb the set is ignored.
-insertColumns []*table.Column
+InsertColumns []*table.Column

// used for DELIMITED DATA format
FieldNullDef []string
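
The field-mapping comment above is easy to misread, so here is a hedged, self-contained sketch of the rule it states; buildFieldMappings is a hypothetical distillation for illustration, not the actual initFieldMappings implementation:

```go
package main

import (
	"fmt"
	"strings"
)

// FieldMapping mirrors the shape of importer.FieldMapping for this sketch.
type FieldMapping struct {
	Column  string // table column name; empty when the field feeds a variable
	UserVar string // user-variable name; empty when the field feeds a column
}

// buildFieldMappings: with no column list in the LOAD DATA statement,
// every table column maps 1:1 to an input field; with a list, each entry
// is either a column name or a user variable.
func buildFieldMappings(tableCols, columnList []string) []FieldMapping {
	if len(columnList) == 0 {
		mappings := make([]FieldMapping, 0, len(tableCols))
		for _, c := range tableCols {
			mappings = append(mappings, FieldMapping{Column: c})
		}
		return mappings
	}
	mappings := make([]FieldMapping, 0, len(columnList))
	for _, item := range columnList {
		if strings.HasPrefix(item, "@") {
			mappings = append(mappings, FieldMapping{UserVar: item})
		} else {
			mappings = append(mappings, FieldMapping{Column: item})
		}
	}
	return mappings
}

func main() {
	// LOAD DATA ... (a, @dummy, b): the second input field goes to a variable.
	fmt.Println(buildFieldMappings([]string{"a", "b", "c"}, []string{"a", "@dummy", "b"}))
}
```
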
@@ -148,13 +148,13 @@
IgnoreLines uint64

// import options
-importMode string
+ImportMode string
diskQuota config.ByteSize
checksum config.PostOpLevel
addIndex bool
analyze config.PostOpLevel
threadCnt int64
-batchSize int64
+BatchSize int64
maxWriteSpeed config.ByteSize // per second
splitFile bool
maxRecordedErrors int64 // -1 means record all error
@@ -296,13 +296,13 @@ func (e *LoadDataController) initDefaultOptions() {
threadCnt = int(math.Max(1, float64(threadCnt)*0.75))
}

-e.importMode = LogicalImportMode
+e.ImportMode = LogicalImportMode
_ = e.diskQuota.UnmarshalText([]byte("50GiB")) // todo confirm with pm
e.checksum = config.OpLevelRequired
e.addIndex = true
e.analyze = config.OpLevelOptional
e.threadCnt = int64(threadCnt)
-e.batchSize = 1000
+e.BatchSize = 1000
e.maxWriteSpeed = unlimitedWriteSpeed
e.splitFile = false
e.maxRecordedErrors = 100
@@ -338,17 +338,17 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
v = strings.ToLower(v)
-if v != LogicalImportMode && v != physicalImportMode {
+if v != LogicalImportMode && v != PhysicalImportMode {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
-e.importMode = v
+e.ImportMode = v
}

-if e.importMode == LogicalImportMode {
+if e.ImportMode == LogicalImportMode {
// some options are only allowed in physical mode
for _, opt := range specifiedOptions {
if _, ok := optionsForPhysicalImport[opt.Name]; ok {
-return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.importMode)
+return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(opt.Name, e.ImportMode)
}
}
}
@@ -398,8 +398,8 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl
}
}
if opt, ok := specifiedOptions[batchSizeOption]; ok {
-e.batchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
-if err != nil || isNull || e.batchSize < 0 {
+e.BatchSize, isNull, err = opt.Value.EvalInt(seCtx, chunk.Row{})
+if err != nil || isNull || e.BatchSize < 0 {
return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
}
}
@@ -464,7 +464,7 @@ func (e *LoadDataController) initFieldMappings() []string {
fieldMapping := &FieldMapping{
Column: v,
}
-e.fieldMappings = append(e.fieldMappings, fieldMapping)
+e.FieldMappings = append(e.FieldMappings, fieldMapping)
columns = append(columns, v.Name.O)
}

@@ -485,7 +485,7 @@ func (e *LoadDataController) initFieldMappings() []string {
Column: column,
UserVar: v.UserVar,
}
-e.fieldMappings = append(e.fieldMappings, fieldMapping)
+e.FieldMappings = append(e.FieldMappings, fieldMapping)
}

return columns
@@ -512,11 +512,11 @@ func (e *LoadDataController) initLoadColumns(columnNames []string) error {
for _, col := range cols {
if !col.IsGenerated() {
// todo: should report error here, since in reorderColumns we report error if en(cols) != len(columnNames)
-e.insertColumns = append(e.insertColumns, col)
+e.InsertColumns = append(e.InsertColumns, col)
}
}

-// e.insertColumns is appended according to the original tables' column sequence.
+// e.InsertColumns is appended according to the original tables' column sequence.
// We have to reorder it to follow the use-specified column order which is shown in the columnNames.
if err = e.reorderColumns(columnNames); err != nil {
return err
@@ -531,10 +531,10 @@ func (e *LoadDataController) initLoadColumns(columnNames []string) error {
return nil
}

-// reorderColumns reorder the e.insertColumns according to the order of columnNames
-// Note: We must ensure there must be one-to-one mapping between e.insertColumns and columnNames in terms of column name.
+// reorderColumns reorder the e.InsertColumns according to the order of columnNames
+// Note: We must ensure there must be one-to-one mapping between e.InsertColumns and columnNames in terms of column name.
func (e *LoadDataController) reorderColumns(columnNames []string) error {
-cols := e.insertColumns
+cols := e.InsertColumns

if len(cols) != len(columnNames) {
return exeerrors.ErrColumnsNotMatched
@@ -556,30 +556,14 @@ func (e *LoadDataController) reorderColumns(columnNames []string) error {
reorderedColumns[idx] = col
}

-e.insertColumns = reorderedColumns
+e.InsertColumns = reorderedColumns

return nil
}

-// GetInsertColumns get column list need to insert into target table.
-// this list include all columns and in the same order as in fieldMappings and ColumnAssignments
-func (e *LoadDataController) GetInsertColumns() []*table.Column {
-return e.insertColumns
-}
-
-// GetFieldMapping get field mapping.
-func (e *LoadDataController) GetFieldMapping() []*FieldMapping {
-return e.fieldMappings
-}
-
// GetFieldCount get field count.
func (e *LoadDataController) GetFieldCount() int {
-return len(e.fieldMappings)
-}
-
-// GetBatchSize get batch size.
-func (e *LoadDataController) GetBatchSize() int64 {
-return e.batchSize
+return len(e.FieldMappings)
}

// GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
@@ -765,6 +749,11 @@ func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo *LoadDa
return parser, nil
}

+func (e *LoadDataController) Import(ctx context.Context) (int64, error) {
+// todo: implement it
+return 0, nil
+}
+
// GetMsgFromBRError get msg from BR error.
// TODO: add GetMsg() to errors package to replace this function.
// see TestGetMsgFromBRError for more details.
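
The getters removed above (GetInsertColumns, GetFieldMapping, GetBatchSize) were pure field accessors, so the refactor exports the fields and keeps only GetFieldCount, which derives a value. A sketch of the before/after calling convention, using a pared-down stand-in for the real controller:

```go
package main

import "fmt"

// FieldMapping and LoadDataController are pared-down stand-ins for the
// real importer types; only the fields touched by this diff appear.
type FieldMapping struct{ Column string }

type LoadDataController struct {
	ImportMode    string
	BatchSize     int64
	FieldMappings []*FieldMapping
	InsertColumns []string
}

// GetFieldCount survives the refactor because it computes a value
// rather than exposing a field verbatim.
func (e *LoadDataController) GetFieldCount() int {
	return len(e.FieldMappings)
}

func main() {
	c := &LoadDataController{ImportMode: "physical", BatchSize: 1000}
	// Before: c.GetBatchSize() and c.GetFieldMapping().
	// After: read the exported fields directly.
	fmt.Println(c.ImportMode, c.BatchSize, c.GetFieldCount())
}
```
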
8 changes: 4 additions & 4 deletions executor/importer/import_test.go
@@ -35,13 +35,13 @@ import (
func TestInitDefaultOptions(t *testing.T) {
e := LoadDataController{}
e.initDefaultOptions()
-require.Equal(t, LogicalImportMode, e.importMode)
+require.Equal(t, LogicalImportMode, e.ImportMode)
require.Equal(t, config.ByteSize(50<<30), e.diskQuota)
require.Equal(t, config.OpLevelRequired, e.checksum)
require.Equal(t, true, e.addIndex)
require.Equal(t, config.OpLevelOptional, e.analyze)
require.Equal(t, int64(runtime.NumCPU()), e.threadCnt)
-require.Equal(t, int64(1000), e.batchSize)
+require.Equal(t, int64(1000), e.BatchSize)
require.Equal(t, unlimitedWriteSpeed, e.maxWriteSpeed)
require.Equal(t, false, e.splitFile)
require.Equal(t, int64(100), e.maxRecordedErrors)
@@ -160,13 +160,13 @@ func TestInitOptions(t *testing.T) {
require.NoError(t, err, sql)
err = e.initOptions(ctx, convertOptions(stmt.(*ast.LoadDataStmt).Options))
require.NoError(t, err, sql)
-require.Equal(t, physicalImportMode, e.importMode, sql)
+require.Equal(t, PhysicalImportMode, e.ImportMode, sql)
require.Equal(t, config.ByteSize(100<<30), e.diskQuota, sql)
require.Equal(t, config.OpLevelOptional, e.checksum, sql)
require.False(t, e.addIndex, sql)
require.Equal(t, config.OpLevelRequired, e.analyze, sql)
require.Equal(t, int64(runtime.NumCPU()), e.threadCnt, sql)
-require.Equal(t, int64(2000), e.batchSize, sql)
+require.Equal(t, int64(2000), e.BatchSize, sql)
require.Equal(t, config.ByteSize(200<<20), e.maxWriteSpeed, sql)
require.True(t, e.splitFile, sql)
require.Equal(t, int64(123), e.maxRecordedErrors, sql)
11 changes: 7 additions & 4 deletions executor/load_data.go
@@ -213,7 +213,7 @@ func NewLoadDataWorker(
GenExprs: plan.GenCols.Exprs,
isLoadData: true,
txnInUse: syncutil.Mutex{},
-maxRowsInBatch: uint64(controller.GetBatchSize()),
+maxRowsInBatch: uint64(controller.BatchSize),
}
restrictive := sctx.GetSessionVars().SQLMode.HasStrictMode() &&
plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore
@@ -243,7 +243,7 @@
}

func (e *LoadDataWorker) initInsertValues() error {
-e.insertColumns = e.controller.GetInsertColumns()
+e.insertColumns = e.controller.InsertColumns
e.rowLen = len(e.insertColumns)

for _, col := range e.insertColumns {
@@ -264,8 +264,11 @@ func (e *LoadDataWorker) loadRemote(ctx context.Context) (int64, error) {
return 0, err2
}

-dataReaderInfos := e.controller.GetLoadDataReaderInfos()
+if e.controller.ImportMode == importer.PhysicalImportMode {
+return e.controller.Import(ctx)
+}
+
+dataReaderInfos := e.controller.GetLoadDataReaderInfos()
return e.Load(ctx, dataReaderInfos)
}

@@ -749,7 +752,7 @@ func (e *LoadDataWorker) parserData2TableData(
}
}

-fieldMappings := e.controller.GetFieldMapping()
+fieldMappings := e.controller.FieldMappings
for i := 0; i < len(fieldMappings); i++ {
if i >= len(parserData) {
if fieldMappings[i].Column == nil {
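
Taken together, the loadRemote change above is the seam where the two modes split: logical mode keeps the existing row-by-row Load path, while physical mode hands the whole job to the still-stubbed Import. A sketch of the dispatch shape; the controller type and loadLogical helper here are hypothetical stand-ins for the real worker wiring:

```go
package main

import (
	"context"
	"fmt"
)

const (
	LogicalImportMode  = "logical"  // tidb backend: rows go through the insert path
	PhysicalImportMode = "physical" // local backend: KV pairs ingested directly
)

// controller is a stand-in for importer.LoadDataController.
type controller struct{ ImportMode string }

// Import mirrors the stubbed physical-mode entry point added in this commit.
func (c *controller) Import(ctx context.Context) (int64, error) {
	return 0, nil // todo in the PR: build and ingest KV pairs via lightning
}

// loadLogical stands in for LoadDataWorker.Load.
func loadLogical(ctx context.Context) (int64, error) {
	return 42, nil
}

// loadRemote dispatches on the parsed import_mode option, matching the
// shape of the change in executor/load_data.go.
func loadRemote(ctx context.Context, c *controller) (int64, error) {
	if c.ImportMode == PhysicalImportMode {
		return c.Import(ctx)
	}
	return loadLogical(ctx)
}

func main() {
	n, err := loadRemote(context.Background(), &controller{ImportMode: LogicalImportMode})
	fmt.Println(n, err) // 42 <nil>
}
```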