diff --git a/executor/builder.go b/executor/builder.go
index ffa4a8318e4de..915833478e198 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -733,8 +733,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
 			Ctx:          b.ctx,
 		},
 	}
-	var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
+	loadDataExec.loadDataInfo.InitQueues()
 	loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
 
 	return loadDataExec
 }
diff --git a/executor/executor_test.go b/executor/executor_test.go
index fba261cb887b3..793bd6d99e113 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -500,8 +500,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
 		data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
 		c.Assert(err1, IsNil)
 		c.Assert(reachLimit, IsFalse)
-		err1 = ld.CheckAndInsertOneBatch()
+		err1 = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
 		c.Assert(err1, IsNil)
+		ld.SetMaxRowsInBatch(20000)
 		if tt.restData == nil {
 			c.Assert(data, HasLen, 0,
 				Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
diff --git a/executor/load_data.go b/executor/load_data.go
index 05cd1fe8d11d1..a04db4a0ce89d 100644
--- a/executor/load_data.go
+++ b/executor/load_data.go
@@ -17,6 +17,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"runtime"
 	"strings"
 
 	"github.com/pingcap/errors"
@@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
 	return nil
 }
 
+// CommitTask is used to feed batches from the data preparing routine into the commit routine
+type CommitTask struct {
+	cnt  uint64
+	rows [][]types.Datum
+}
+
 // LoadDataInfo saves the information of loading data operation.
 type LoadDataInfo struct {
 	*InsertValues
@@ -113,17 +120,142 @@ type LoadDataInfo struct {
 	IgnoreLines uint64
 	Ctx         sessionctx.Context
 	rows        [][]types.Datum
+
+	// these fields are used for pipelined data preparation and commit
+	commitTaskQueue chan CommitTask
+	QuitCommit      chan struct{}
+	QuitProcess     chan struct{}
+}
+
+// GetRows is a getter for rows
+func (e *LoadDataInfo) GetRows() [][]types.Datum {
+	return e.rows
+}
+
+// GetCurBatchCnt is a getter for curBatchCnt
+func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
+	return e.curBatchCnt
+}
+
+// CloseTaskQueue lets the preparing routine inform the commit routine that there is no more data
+func (e *LoadDataInfo) CloseTaskQueue() {
+	close(e.commitTaskQueue)
+}
+
+// InitQueues initializes the commit task queue and the quit channels
+func (e *LoadDataInfo) InitQueues() {
+	e.commitTaskQueue = make(chan CommitTask, 1000)
+	e.QuitCommit = make(chan struct{})
+	e.QuitProcess = make(chan struct{})
+}
+
+// ForceQuitCommit lets the commit routine quit directly
+func (e *LoadDataInfo) ForceQuitCommit() {
+	close(e.QuitCommit)
+}
+
+// ForceQuitProcess lets the process routine quit directly
+func (e *LoadDataInfo) ForceQuitProcess() {
+	close(e.QuitProcess)
+}
+
+// MakeCommitTask builds a commit task from LoadDataInfo.rows and LoadDataInfo.curBatchCnt
+func (e *LoadDataInfo) MakeCommitTask() CommitTask {
+	return CommitTask{e.curBatchCnt, e.rows}
+}
+
+// EnqOneTask enqueues one batch as a commit task for the commit routine
+func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
+	var err error
+	if e.curBatchCnt > 0 {
+		e.commitTaskQueue <- e.MakeCommitTask()
+		logutil.BgLogger().Debug("one task enqueued ", zap.Int("current queue len", len(e.commitTaskQueue)))
+		// reset the rows buffer; this reallocates it rather than reusing it
+		e.SetMaxRowsInBatch(e.maxRowsInBatch)
+	}
+	return err
+}
+
+// CommitOneTask inserts data from the task, then commits and refreshes the transaction
+func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask, resetBuf bool) error {
+	var err error
+	err = e.CheckAndInsertOneBatch(task.rows, task.cnt)
+	if err != nil {
+		logutil.BgLogger().Error("commit error CheckAndInsert", zap.Error(err))
+		return err
+	}
+	if err = e.Ctx.StmtCommit(); err != nil {
+		logutil.BgLogger().Error("commit error commit", zap.Error(err))
+		return err
+	}
+	// Make sure that there are no retries when committing.
+	if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
+		logutil.BgLogger().Error("commit error refresh", zap.Error(err))
+		return err
+	}
+	// resetBuf is set only in sequential mode, where the e.rows buffer is reused
+	if resetBuf {
+		e.curBatchCnt = 0
+	}
+	return err
+}
+
+// CommitWork commits batches sequentially from the task queue
+func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
+	var err error
+	defer func() {
+		r := recover()
+		if r != nil {
+			buf := make([]byte, 4096)
+			stackSize := runtime.Stack(buf, false)
+			buf = buf[:stackSize]
+			logutil.Logger(ctx).Error("CommitWork panicked", zap.String("stack", string(buf)))
+		}
+		if err != nil || r != nil {
+			e.ForceQuitProcess()
+		}
+	}()
+	var tasks uint64 = 0
+	var end = false
+	for !end {
+		select {
+		case commitTask, ok := <-e.commitTaskQueue:
+			if ok {
+				err = e.CommitOneTask(ctx, commitTask, false)
+				if err != nil {
+					break
+				}
+				tasks++
+				logutil.BgLogger().Debug("commit one task finished",
+					zap.Uint64("finished tasks count", tasks),
+					zap.Int("pending tasks count", len(e.commitTaskQueue)))
+			} else {
+				end = true
+				logutil.BgLogger().Info("commit work all finished",
+					zap.Uint64("total processed", tasks),
+					zap.Uint64("batch size", e.maxRowsInBatch))
+				break
+			}
+		case <-e.QuitCommit:
+			err = errors.New("commit forced to quit")
+			logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
+			break
+		}
+		if err != nil {
+			break
+		}
+	}
+	return err
+}
 
 // SetMaxRowsInBatch sets the max number of rows to insert in a batch.
 func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
 	e.maxRowsInBatch = limit
-	if uint64(cap(e.rows)) < limit {
-		e.rows = make([][]types.Datum, 0, limit)
-		for i := 0; uint64(i) < limit; i++ {
-			e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
-		}
+	e.rows = make([][]types.Datum, 0, limit)
+	for i := 0; uint64(i) < limit; i++ {
+		e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
 	}
+	e.curBatchCnt = 0
 }
 
 // getValidData returns prevData and curData that starts from starting symbol.
@@ -278,17 +410,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
 }
 
 // CheckAndInsertOneBatch is used to commit one transaction batch full filled data
-func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
+func (e *LoadDataInfo) CheckAndInsertOneBatch(rows [][]types.Datum, cnt uint64) error {
 	var err error
-	if e.curBatchCnt == 0 {
+	if cnt == 0 {
 		return err
 	}
-	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
-	err = e.batchCheckAndInsert(e.rows[0:e.curBatchCnt], e.addRecordLD)
+	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
+	err = e.batchCheckAndInsert(rows[0:cnt], e.addRecordLD)
 	if err != nil {
 		return err
 	}
-	e.curBatchCnt = 0
 	return err
 }
 
diff --git a/executor/write_test.go b/executor/write_test.go
index 14e86b09a3c05..e341f40ddf0c6 100644
--- a/executor/write_test.go
+++ b/executor/write_test.go
@@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
 	_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
 	c.Assert(err, IsNil)
 	c.Assert(reachLimit, IsFalse)
-	err = ld.CheckAndInsertOneBatch()
+	err = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
 	c.Assert(err, IsNil)
+	ld.SetMaxRowsInBatch(20000)
 
 	r := tk.MustQuery(selectSQL)
 	r.Check(nil)
@@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {
 
 	_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
 	c.Assert(err, IsNil)
-	err = ld.CheckAndInsertOneBatch()
+	err = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
 	c.Assert(err, IsNil)
+	ld.SetMaxRowsInBatch(20000)
 	ld.SetMessage()
 	err = ctx.StmtCommit()
 	c.Assert(err, IsNil)
diff --git a/server/conn.go b/server/conn.go
index 6b3767d3156a9..253ca406894fc 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -1042,9 +1042,8 @@ func (cc *clientConn) writeReq(filePath string) error {
 	return cc.flush()
 }
 
-var defaultLoadDataBatchCnt uint64 = 20000
-
-func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
+func insertDataWithCommit(ctx context.Context, prevData,
+	curData []byte, loadDataInfo *executor.LoadDataInfo, enqTask bool) ([]byte, error) {
 	var err error
 	var reachLimit bool
 	for {
@@ -1055,16 +1054,15 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
 		if !reachLimit {
 			break
 		}
-		err := loadDataInfo.CheckAndInsertOneBatch()
-		if err != nil {
-			return nil, err
-		}
-		if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
-			return nil, err
+
+		if enqTask {
+			// push into commit task queue
+			err = loadDataInfo.EnqOneTask(ctx)
+		} else {
+			err = loadDataInfo.CommitOneTask(ctx, loadDataInfo.MakeCommitTask(), true)
 		}
-		// Make sure that there are no retries when committing.
-		if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil {
-			return nil, err
+		if err != nil {
+			return prevData, err
 		}
 		curData = prevData
 		prevData = nil
@@ -1072,30 +1070,59 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
 	return prevData, nil
 }
 
-// handleLoadData does the additional work after processing the 'load data' query.
-// It sends client a file path, then reads the file content from client, inserts data into database.
-func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
-	// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
-	if cc.capability&mysql.ClientLocalFiles == 0 {
-		return errNotAllowedCommand
-	}
-	if loadDataInfo == nil {
-		return errors.New("load data info is empty")
+// seqLoadData processes data and commits batches in the same routine
+func seqLoadData(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) error {
+	var err error
+	var shouldBreak bool
+	var prevData, curData []byte
+	for {
+		curData, err = cc.readPacket()
+		if err != nil {
+			if terror.ErrorNotEqual(err, io.EOF) {
+				logutil.Logger(ctx).Error("read packet failed", zap.Error(err))
+				break
+			}
+		}
+		if len(curData) == 0 {
+			shouldBreak = true
+			if len(prevData) == 0 {
+				break
+			}
+		}
+		prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo, false)
+		if err != nil {
+			break
+		}
+		if shouldBreak {
+			break
+		}
 	}
-
-	err := cc.writeReq(loadDataInfo.Path)
 	if err != nil {
-		return err
+		loadDataInfo.Ctx.StmtRollback()
+	} else {
+		err = loadDataInfo.CommitOneTask(ctx, loadDataInfo.MakeCommitTask(), true)
 	}
+	return err
+}
 
+// processStream processes the input stream from the network connection
+func processStream(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) {
+	var err error
 	var shouldBreak bool
 	var prevData, curData []byte
-	// TODO: Make the loadDataRowCnt settable.
-	loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
-	err = loadDataInfo.Ctx.NewTxn(ctx)
-	if err != nil {
-		return err
-	}
+	defer func() {
+		r := recover()
+		if r != nil {
+			buf := make([]byte, 4096)
+			stackSize := runtime.Stack(buf, false)
+			buf = buf[:stackSize]
+			logutil.Logger(ctx).Error("process routine panicked", zap.String("stack", string(buf)))
+		}
+		if err != nil || r != nil {
+			loadDataInfo.ForceQuitCommit()
+		}
+		loadDataInfo.CloseTaskQueue()
+	}()
 	for {
 		curData, err = cc.readPacket()
 		if err != nil {
@@ -1110,7 +1137,13 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
 				break
 			}
 		}
-		prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
+		// prepare batch and enqueue task
+		prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo, true)
+		select {
+		case <-loadDataInfo.QuitProcess:
+			return
+		default:
+		}
 		if err != nil {
 			break
 		}
@@ -1118,16 +1151,50 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
 			break
 		}
 	}
-	loadDataInfo.SetMessage()
 	if err != nil {
-		loadDataInfo.Ctx.StmtRollback()
+		logutil.Logger(ctx).Error("load data process error", zap.Error(err))
 	} else {
-		err = loadDataInfo.CheckAndInsertOneBatch()
-		if err == nil {
-			err = loadDataInfo.Ctx.StmtCommit()
+		err = loadDataInfo.EnqOneTask(ctx)
+		if err != nil {
+			logutil.Logger(ctx).Error("load data process error", zap.Error(err))
 		}
 	}
+}
+
+// TODO: Make the loadDataRowCnt settable.
+var defaultLoadDataBatchCnt uint64 = 20000
+
+// handleLoadData does the additional work after processing the 'load data' query.
+// It sends the client a file path, then reads the file content from the client and inserts the data into the database.
+func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
+	// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
+	if cc.capability&mysql.ClientLocalFiles == 0 {
+		return errNotAllowedCommand
+	}
+	if loadDataInfo == nil {
+		return errors.New("load data info is empty")
+	}
+
+	err := cc.writeReq(loadDataInfo.Path)
+	if err != nil {
+		return err
+	}
+
+	loadDataInfo.InitQueues()
+	loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
+	err = loadDataInfo.Ctx.NewTxn(ctx)
+	if err != nil {
+		return err
+	}
+	seqProcess := loadDataInfo.Ctx.GetSessionVars().LoadDataSeqProcess
+	if !seqProcess {
+		// processStream processes input data and enqueues commit tasks
+		go processStream(ctx, cc, loadDataInfo)
+		err = loadDataInfo.CommitWork(ctx)
+	} else {
+		err = seqLoadData(ctx, cc, loadDataInfo)
+	}
+	loadDataInfo.SetMessage()
 
 	var txn kv.Transaction
 	var err1 error
diff --git a/session/session.go b/session/session.go
index 84db86e0e6b72..010b4f9d9d7a2 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1745,6 +1745,7 @@ var builtinGlobalVariable = []string{
 	variable.TiDBExpensiveQueryTimeThreshold,
 	variable.TiDBEnableNoopFuncs,
 	variable.TiDBEnableIndexMerge,
+	variable.TiDBLoadDataSeqProcess,
 }
 
 var (
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 715cede8bc46c..653850d084177 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -399,6 +399,9 @@ type SessionVars struct {
 
 	// use noop funcs or not
 	EnableNoopFuncs bool
+
+	// whether to process and commit load data sequentially
+	LoadDataSeqProcess bool
 }
 
 // ConnectionInfo present connection used by audit.
@@ -453,6 +456,7 @@ func NewSessionVars() *SessionVars {
 		WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
 		EnableIndexMerge:       false,
 		EnableNoopFuncs:        DefTiDBEnableNoopFuncs,
+		LoadDataSeqProcess:     DefTiDBLoadDataSeqProcess,
 	}
 	vars.Concurrency = Concurrency{
 		IndexLookupConcurrency: DefIndexLookupConcurrency,
@@ -831,6 +835,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 		s.EnableIndexMerge = TiDBOptOn(val)
 	case TiDBEnableNoopFuncs:
 		s.EnableNoopFuncs = TiDBOptOn(val)
+	case TiDBLoadDataSeqProcess:
+		s.LoadDataSeqProcess = TiDBOptOn(val)
 	}
 	s.systems[name] = val
 	return nil
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index f42c04bf3c99e..b941da17e2760 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -704,6 +704,7 @@ var defaultSysVars = []*SysVar{
 	{ScopeSession, TiDBLowResolutionTSO, "0"},
 	{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
 	{ScopeGlobal | ScopeSession, TiDBEnableNoopFuncs, BoolToIntStr(DefTiDBEnableNoopFuncs)},
+	{ScopeGlobal | ScopeSession, TiDBLoadDataSeqProcess, BoolToIntStr(DefTiDBLoadDataSeqProcess)},
 }
 
 // SynonymsSysVariables is synonyms of system variables.
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index f2b7998ab0341..ae862a02ba2e9 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -286,6 +286,9 @@ const (
 
 	// TiDBEnableNoopFuncs set true will enable using fake funcs(like get_lock release_lock)
 	TiDBEnableNoopFuncs = "tidb_enable_noop_functions"
+
+	// TiDBLoadDataSeqProcess set true will make load data process and commit sequentially in one routine
+	TiDBLoadDataSeqProcess = "tidb_load_data_seq_process"
 )
 
 // Default TiDB system variable values.
@@ -350,6 +353,7 @@ const (
 	DefTiDBWaitSplitRegionFinish       = true
 	DefWaitSplitRegionTimeout          = 300 // 300s
 	DefTiDBEnableNoopFuncs             = false
+	DefTiDBLoadDataSeqProcess          = false
 )
 
 // Process global variables.
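
The variable plumbing above wires the new switch end to end: tidb_vars.go names tidb_load_data_seq_process and its default, sysvar.go registers it with global and session scope, session.go loads it as a builtin global variable, and sessionctx/variable/session.go carries it into SessionVars, where handleLoadData reads it to pick the pipelined or the sequential path; the varsutil.go hunk that follows whitelists it as a boolean variable for validation. A hedged usage sketch in Go, not part of the patch: the DSN, table t, and /tmp/data.csv are placeholder assumptions, and mysql.RegisterLocalFile is the standard go-sql-driver mechanism for whitelisting a local-infile path.

package main

import (
	"database/sql"
	"log"

	"github.com/go-sql-driver/mysql" // also registers the "mysql" driver in init()
)

func main() {
	// Whitelist the local file so the driver will answer the server's
	// LOCAL INFILE request for it.
	mysql.RegisterLocalFile("/tmp/data.csv")

	// Placeholder DSN: assumes a TiDB server built with this patch on the default port.
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Opt back into the old single-routine behaviour for this session;
	// leaving the variable at its default of 0 takes the new pipelined path.
	if _, err = db.Exec("SET @@tidb_load_data_seq_process = 1"); err != nil {
		log.Fatal(err)
	}
	if _, err = db.Exec("LOAD DATA LOCAL INFILE '/tmp/data.csv' INTO TABLE t FIELDS TERMINATED BY ','"); err != nil {
		log.Fatal(err)
	}
}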
diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go
index 47a60f1a7b7ed..82288f9128753 100644
--- a/sessionctx/variable/varsutil.go
+++ b/sessionctx/variable/varsutil.go
@@ -379,7 +379,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
 		TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBBatchDelete, TiDBBatchCommit,
 		TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, TiDBCheckMb4ValueInUTF8,
 		TiDBLowResolutionTSO, TiDBEnableIndexMerge, TiDBEnableNoopFuncs,
-		TiDBScatterRegion, TiDBGeneralLog, TiDBConstraintCheckInPlace:
+		TiDBScatterRegion, TiDBGeneralLog, TiDBConstraintCheckInPlace, TiDBLoadDataSeqProcess:
 		fallthrough
 	case GeneralLog, AvoidTemporalUpgrade, BigTables, CheckProxyUsers, LogBin,
 		CoreFile, EndMakersInJSON, SQLLogBin, OfflineMode, PseudoSlaveMode, LowPriorityUpdates,
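
Stepping back, the concurrency shape this patch introduces reduces to a bounded producer/consumer queue with a close-once kill switch in each direction. The sketch below is illustrative only: pipeline, task, and the commit callback are hypothetical names that mirror commitTaskQueue, QuitCommit/QuitProcess, CloseTaskQueue, ForceQuitProcess, and CommitWork; it is not TiDB code.

package main

import (
	"errors"
	"fmt"
)

// task mirrors CommitTask: a counted batch of prepared rows.
type task struct {
	cnt  int
	rows []string
}

// pipeline mirrors the queue and quit channels added to LoadDataInfo.
type pipeline struct {
	taskQueue   chan task
	quitCommit  chan struct{} // closed to force the commit side to stop
	quitProcess chan struct{} // closed to force the producing side to stop
}

func newPipeline() *pipeline {
	return &pipeline{
		taskQueue:   make(chan task, 1000), // bounded, like commitTaskQueue
		quitCommit:  make(chan struct{}),
		quitProcess: make(chan struct{}),
	}
}

// commitWork drains the queue until it is closed (normal end) or
// quitCommit is closed (forced end), mirroring CommitWork.
func (p *pipeline) commitWork(commit func(task) error) (err error) {
	defer func() {
		if err != nil {
			close(p.quitProcess) // tell the producer to stop, like ForceQuitProcess
		}
	}()
	for {
		select {
		case t, ok := <-p.taskQueue:
			if !ok {
				return nil // queue closed: every batch has been committed
			}
			if err = commit(t); err != nil {
				return err
			}
		case <-p.quitCommit:
			return errors.New("commit forced to quit")
		}
	}
}

// produce enqueues batches and then closes the queue, mirroring
// processStream + EnqOneTask + CloseTaskQueue.
func (p *pipeline) produce(batches []task) {
	defer close(p.taskQueue)
	for _, b := range batches {
		select {
		case p.taskQueue <- b:
		case <-p.quitProcess:
			return // the commit side failed; stop producing
		}
	}
}

func main() {
	p := newPipeline()
	go p.produce([]task{{cnt: 2, rows: []string{"a", "b"}}, {cnt: 1, rows: []string{"c"}}})
	err := p.commitWork(func(t task) error {
		fmt.Printf("committed %d rows\n", t.cnt)
		return nil
	})
	fmt.Println("load finished, err =", err)
}

Closing a channel, rather than sending on it, is what makes the quit signal a safe broadcast: every goroutine blocked in a select observes the closed channel at once, and the signal cannot be consumed by a single receiver. That is why the patch quits via close(QuitCommit) and close(QuitProcess), and signals end-of-input via close(commitTaskQueue).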