Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: load data statement, separate data preparing routine and commit routine #11533

Merged
merged 9 commits into from
Sep 16, 2019
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Ctx: b.ctx,
},
}

var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch(context.Background())
err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err1, IsNil)
ld.SetMaxRowsInBatch(20000)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
174 changes: 162 additions & 12 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"bytes"
"context"
"fmt"
"runtime"
"strings"

"github.com/pingcap/errors"
Expand All @@ -33,7 +34,8 @@ import (
)

var (
null = []byte("NULL")
null = []byte("NULL")
taskQueueSize = 64 // the maximum number of pending tasks to commit in queue
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -101,6 +103,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
return nil
}

// CommitTask is used for fetching data from data preparing routine into committing routine.
type CommitTask struct {
cnt uint64
rows [][]types.Datum
}

// LoadDataInfo saves the information of loading data operation.
type LoadDataInfo struct {
*InsertValues
Expand All @@ -113,17 +121,160 @@ type LoadDataInfo struct {
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum

// these fields are used for pipeline data prepare and commit
commitTaskQueue chan CommitTask
QuitCommit chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you just use a single quit channel ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use two to let process and commit both quit immediately when error occurs in one of them, commit will close QuitProcess and process will close QuitCommit .

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do it by only using one signal channel.

QuitProcess chan struct{}
}

// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}

// GetCurBatchCnt getter for curBatchCnt
func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
return e.curBatchCnt
}

// CloseTaskQueue preparing routine to inform commit routine no more data
func (e *LoadDataInfo) CloseTaskQueue() {
close(e.commitTaskQueue)
}

// InitQueues initialize task queue and error report queue
func (e *LoadDataInfo) InitQueues() {
e.commitTaskQueue = make(chan CommitTask, taskQueueSize)
e.QuitCommit = make(chan struct{})
e.QuitProcess = make(chan struct{})
}

// ForceQuitCommit let commit quit directly
func (e *LoadDataInfo) ForceQuitCommit() {
close(e.QuitCommit)
}

// ForceQuitProcess let process quit directly
func (e *LoadDataInfo) ForceQuitProcess() {
close(e.QuitProcess)
}

// MakeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt
func (e *LoadDataInfo) MakeCommitTask() CommitTask {
return CommitTask{e.curBatchCnt, e.rows}
}

// EnqOneTask feed one batch commit task to commit work
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
var err error
if e.curBatchCnt > 0 {
sendOk := false
for !sendOk {
select {
case e.commitTaskQueue <- e.MakeCommitTask():
sendOk = true
case <-e.QuitProcess:
err = errors.New("EnqOneTask forced to quit")
logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error")
return err
}
}
logutil.Logger(ctx).Debug("one task enqueued ", zap.Int("current queue len", len(e.commitTaskQueue)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove debug message.

// reset rows buffer, will reallocate buffer but NOT reuse
e.SetMaxRowsInBatch(e.maxRowsInBatch)
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask, resetBuf bool) error {
var err error
defer func() {
if err != nil {
e.Ctx.StmtRollback()
}
}()
err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
if err != nil {
logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
return err
}
if err = e.Ctx.StmtCommit(); err != nil {
logutil.Logger(ctx).Error("commit error commit", zap.Error(err))
return err
}
// Make sure that there are no retries when committing.
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
return err
}
// this only set in sequential mode, e.rows buffer will be reused in sequential mode
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is sequential mode?

if resetBuf {
e.curBatchCnt = 0
}
return err
}

// CommitWork commit batch sequentially
func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
var err error
defer func() {
r := recover()
if r != nil {
buf := make([]byte, 4096)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zap log library has a Stack function to get stack tracing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

roger, changed

stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
logutil.Logger(ctx).Error("CommitWork panicked", zap.String("stack", string(buf)))
}
if err != nil || r != nil {
e.ForceQuitProcess()
}
if err != nil {
e.ctx.StmtRollback()
}
}()
var tasks uint64 = 0
var end = false
for !end {
select {
case <-e.QuitCommit:
err = errors.New("commit forced to quit")
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
break
case commitTask, ok := <-e.commitTaskQueue:
if ok {
err = e.CommitOneTask(ctx, commitTask, false)
if err != nil {
break
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
tasks++
logutil.Logger(ctx).Debug("commit one task finished",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

zap.Uint64("finished tasks count", tasks),
zap.Int("pending tasks count", len(e.commitTaskQueue)))
} else {
end = true
logutil.Logger(ctx).Info("commit work all finished",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

zap.Uint64("total processed task count", tasks),
zap.Uint64("batch size", e.maxRowsInBatch))
break
}
}
if err != nil {
break
}
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
if uint64(cap(e.rows)) < limit {
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.curBatchCnt = 0
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -269,7 +420,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
e.curBatchCnt++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
zap.Uint64("totalRows", e.rowCount))
break
}
Expand All @@ -278,17 +429,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error {
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
var err error
if e.curBatchCnt == 0 {
if cnt == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD)
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD)
if err != nil {
return err
}
e.curBatchCnt = 0
return err
}

Expand Down
6 changes: 4 additions & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
Loading