Skip to content

Commit

Permalink
load data statement, separate data preparing routine and commit routine
Browse files Browse the repository at this point in the history
offer two modes for load data, seq mode as before, default new separeted mode
  • Loading branch information
cfzjywxk committed Aug 5, 2019
1 parent 050172c commit 3898f7e
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 52 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Ctx: b.ctx,
},
}

var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch()
err1 = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err1, IsNil)
ld.SetMaxRowsInBatch(20000)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
151 changes: 141 additions & 10 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"strings"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
return nil
}

// CommitTask for feeding from data preparing routine into commit routine
type CommitTask struct {
cnt uint64
rows [][]types.Datum
}

// LoadDataInfo saves the information of loading data operation.
type LoadDataInfo struct {
*InsertValues
Expand All @@ -113,17 +120,142 @@ type LoadDataInfo struct {
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum

// these fields are used for pipeline data prepare and commit
commitTaskQueue chan CommitTask
QuitCommit chan struct{}
QuitProcess chan struct{}
}

// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
}

// GetCurBatchCnt getter for curBatchCnt
func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
return e.curBatchCnt
}

// CloseTaskQueue preparing routine to inform commit routine no more data
func (e *LoadDataInfo) CloseTaskQueue() {
close(e.commitTaskQueue)
}

// InitQueues initialize task queue and error report queue
func (e *LoadDataInfo) InitQueues() {
e.commitTaskQueue = make(chan CommitTask, 1000)
e.QuitCommit = make(chan struct{})
e.QuitProcess = make(chan struct{})
}

// ForceQuitCommit let commit quit directly
func (e *LoadDataInfo) ForceQuitCommit() {
close(e.QuitCommit)
}

// ForceQuitProcess let process quit directly
func (e *LoadDataInfo) ForceQuitProcess() {
close(e.QuitProcess)
}

// MakeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt
func (e *LoadDataInfo) MakeCommitTask() CommitTask {
return CommitTask{e.curBatchCnt, e.rows}
}

// EnqOneTask feed one batch commit task to commit work
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
var err error
if e.curBatchCnt > 0 {
e.commitTaskQueue <- e.MakeCommitTask()
logutil.BgLogger().Debug("one task enqueued ", zap.Int("current queue len", len(e.commitTaskQueue)))
// reset rows buffer, will reallocate buffer but NOT reuse
e.SetMaxRowsInBatch(e.maxRowsInBatch)
}
return err
}

// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask, resetBuf bool) error {
var err error
err = e.CheckAndInsertOneBatch(task.rows, task.cnt)
if err != nil {
logutil.BgLogger().Error("commit error CheckAndInsert", zap.Error(err))
return err
}
if err = e.Ctx.StmtCommit(); err != nil {
logutil.BgLogger().Error("commit error commit", zap.Error(err))
return err
}
// Make sure that there are no retries when committing.
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
logutil.BgLogger().Error("commit error refresh", zap.Error(err))
return err
}
// this only set in sequential mode, e.rows buffer will be reused in sequential mode
if resetBuf {
e.curBatchCnt = 0
}
return err
}

// CommitWork commit batch sequentially
func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
var err error
defer func() {
r := recover()
if r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
logutil.Logger(ctx).Error("CommitWork panicked", zap.String("stack", string(buf)))
}
if err != nil || r != nil {
e.ForceQuitProcess()
}
}()
var tasks uint64 = 0
var end = false
for !end {
select {
case commitTask, ok := <-e.commitTaskQueue:
if ok {
err = e.CommitOneTask(ctx, commitTask, false)
if err != nil {
break
}
tasks++
logutil.BgLogger().Debug("commit one task finished",
zap.Uint64("finished tasks count", tasks),
zap.Int("pending tasks count", len(e.commitTaskQueue)))
} else {
end = true
logutil.BgLogger().Info("commit work all finished",
zap.Uint64("total processed", tasks),
zap.Uint64("batch size", e.maxRowsInBatch))
break
}
case <-e.QuitCommit:
err = errors.New("commit forced to quit")
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
break
}
if err != nil {
break
}
}
return err
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
if uint64(cap(e.rows)) < limit {
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.curBatchCnt = 0
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -278,17 +410,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch() error {
func (e *LoadDataInfo) CheckAndInsertOneBatch(rows [][]types.Datum, cnt uint64) error {
var err error
if e.curBatchCnt == 0 {
if cnt == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(e.rows[0:e.curBatchCnt], e.addRecordLD)
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
err = e.batchCheckAndInsert(rows[0:cnt], e.addRecordLD)
if err != nil {
return err
}
e.curBatchCnt = 0
return err
}

Expand Down
6 changes: 4 additions & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch()
err = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch()
err = ld.CheckAndInsertOneBatch(ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit 3898f7e

Please sign in to comment.