Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: load data statement, separate data preparing routine and commit routine #11533

Merged
merged 9 commits into from
Sep 16, 2019
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,8 +739,8 @@ func (b *executorBuilder) buildLoadData(v *plannercore.LoadData) Executor {
Ctx: b.ctx,
},
}

var defaultLoadDataBatchCnt uint64 = 20000 // TODO this will be changed to variable in another pr
loadDataExec.loadDataInfo.InitQueues()
loadDataExec.loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)

return loadDataExec
Expand Down
3 changes: 2 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,9 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
data, reachLimit, err1 := ld.InsertData(context.Background(), tt.data1, tt.data2)
c.Assert(err1, IsNil)
c.Assert(reachLimit, IsFalse)
err1 = ld.CheckAndInsertOneBatch(context.Background())
err1 = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err1, IsNil)
ld.SetMaxRowsInBatch(20000)
if tt.restData == nil {
c.Assert(data, HasLen, 0,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
Expand Down
164 changes: 152 additions & 12 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
)

var (
	// null is the byte representation of SQL NULL in load-data input.
	null = []byte("NULL")
	// taskQueueSize is the maximum number of pending tasks to commit in queue.
	taskQueueSize = 64
)

// LoadDataExec represents a load data executor.
Expand Down Expand Up @@ -101,6 +102,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
return nil
}

// CommitTask is used for fetching data from the data-preparing routine into
// the committing routine.
// NOTE: field order matters — MakeCommitTask constructs this with a
// positional composite literal.
type CommitTask struct {
	cnt  uint64          // number of valid rows in rows
	rows [][]types.Datum // batch of parsed rows to check, insert, and commit
}

// LoadDataInfo saves the information of loading data operation.
type LoadDataInfo struct {
*InsertValues
Expand All @@ -113,17 +120,151 @@ type LoadDataInfo struct {
IgnoreLines uint64
Ctx sessionctx.Context
rows [][]types.Datum

commitTaskQueue chan CommitTask
StopCh chan struct{}
QuitCh chan struct{}
}

// GetRows getter for rows
func (e *LoadDataInfo) GetRows() [][]types.Datum {
return e.rows
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
}

// GetCurBatchCnt getter for curBatchCnt, the number of rows currently
// buffered for the in-progress batch.
func (e *LoadDataInfo) GetCurBatchCnt() uint64 {
	return e.curBatchCnt
}

// CloseTaskQueue is called by the data-preparing routine to inform the commit
// routine that no more tasks will arrive; the commit loop then drains the
// queue and exits normally.
func (e *LoadDataInfo) CloseTaskQueue() {
	close(e.commitTaskQueue)
}

// InitQueues initializes the commit-task queue and the stop/quit channels
// used to coordinate the preparing routine and the committing routine.
// StopCh is buffered with capacity 2 so that either routine can signal a
// forced stop without blocking.
func (e *LoadDataInfo) InitQueues() {
	e.commitTaskQueue = make(chan CommitTask, taskQueueSize)
	e.StopCh = make(chan struct{}, 2)
	e.QuitCh = make(chan struct{})
}

// StartStopWatcher spawns a goroutine that waits for one signal on StopCh and
// then closes QuitCh, broadcasting the forced-quit to every routine that
// selects on QuitCh.
func (e *LoadDataInfo) StartStopWatcher() {
	go func() {
		<-e.StopCh
		close(e.QuitCh)
	}()
}

// ForceQuit signals the stop watcher, which closes QuitCh so that both the
// preparing and the committing routines quit directly.
func (e *LoadDataInfo) ForceQuit() {
	e.StopCh <- struct{}{}
}

// MakeCommitTask produces a commit task carrying the data currently buffered
// in LoadDataInfo.rows together with the row count LoadDataInfo.curBatchCnt.
func (e *LoadDataInfo) MakeCommitTask() CommitTask {
	return CommitTask{e.curBatchCnt, e.rows}
}

// EnqOneTask feed one batch commit task to commit work
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error {
var err error
if e.curBatchCnt > 0 {
sendOk := false
for !sendOk {
select {
case e.commitTaskQueue <- e.MakeCommitTask():
sendOk = true
case <-e.QuitCh:
err = errors.New("EnqOneTask forced to quit")
logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error")
return err
}
}
// reset rows buffer, will reallocate buffer but NOT reuse
e.SetMaxRowsInBatch(e.maxRowsInBatch)
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// CommitOneTask inserts the rows carried by task, then commits the statement
// and refreshes the transaction context so the next batch starts on a fresh
// txn. Any failure rolls back the in-flight statement via the deferred
// handler before the error is returned.
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error {
	var err error
	defer func() {
		// Undo the partially-applied statement if any step below failed.
		if err != nil {
			e.Ctx.StmtRollback()
		}
	}()
	err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
	if err != nil {
		logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
		return err
	}
	if err = e.Ctx.StmtCommit(); err != nil {
		logutil.Logger(ctx).Error("commit error commit", zap.Error(err))
		return err
	}
	// Make sure that there are no retries when committing.
	if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
		logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
		return err
	}
	return err
}

// CommitWork commit batch sequentially
func (e *LoadDataInfo) CommitWork(ctx context.Context) error {
var err error
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("CommitWork panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
e.ForceQuit()
}
if err != nil {
e.ctx.StmtRollback()
}
}()
var tasks uint64 = 0
var end = false
for !end {
select {
case <-e.QuitCh:
err = errors.New("commit forced to quit")
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed")
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
break
case commitTask, ok := <-e.commitTaskQueue:
if ok {
err = e.CommitOneTask(ctx, commitTask)
if err != nil {
break
lysu marked this conversation as resolved.
Show resolved Hide resolved
}
tasks++
} else {
end = true
break
}
}
if err != nil {
logutil.Logger(ctx).Error("load data commit work error", zap.Error(err))
break
}
}
return err
lysu marked this conversation as resolved.
Show resolved Hide resolved
}

// SetMaxRowsInBatch sets the max number of rows to insert in a batch.
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) {
e.maxRowsInBatch = limit
if uint64(cap(e.rows)) < limit {
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.rows = make([][]types.Datum, 0, limit)
for i := 0; uint64(i) < limit; i++ {
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols())))
}
e.curBatchCnt = 0
}

// getValidData returns prevData and curData that starts from starting symbol.
Expand Down Expand Up @@ -271,7 +412,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
e.curBatchCnt++
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
reachLimit = true
logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
zap.Uint64("totalRows", e.rowCount))
break
}
Expand All @@ -280,17 +421,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)
}

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error {
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
var err error
if e.curBatchCnt == 0 {
if cnt == 0 {
return err
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt)
err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD)
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)
err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD)
if err != nil {
return err
}
e.curBatchCnt = 0
return err
}

Expand Down
6 changes: 4 additions & 2 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1845,8 +1845,9 @@ func (s *testSuite4) TestLoadData(c *C) {
_, reachLimit, err := ld.InsertData(context.Background(), nil, nil)
c.Assert(err, IsNil)
c.Assert(reachLimit, IsFalse)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
r := tk.MustQuery(selectSQL)
r.Check(nil)

Expand Down Expand Up @@ -2096,8 +2097,9 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) {

_, _, err := ld.InsertData(context.Background(), nil, []byte("1,2\n3,4\n5,6\n7,8\n9,10\n"))
c.Assert(err, IsNil)
err = ld.CheckAndInsertOneBatch(context.Background())
err = ld.CheckAndInsertOneBatch(context.Background(), ld.GetRows(), ld.GetCurBatchCnt())
c.Assert(err, IsNil)
ld.SetMaxRowsInBatch(20000)
ld.SetMessage()
err = ctx.StmtCommit()
c.Assert(err, IsNil)
Expand Down
103 changes: 63 additions & 40 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,9 +1044,8 @@ func (cc *clientConn) writeReq(filePath string) error {
return cc.flush()
}

var defaultLoadDataBatchCnt uint64 = 20000

func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
func insertDataWithCommit(ctx context.Context, prevData,
curData []byte, loadDataInfo *executor.LoadDataInfo) ([]byte, error) {
var err error
var reachLimit bool
for {
Expand All @@ -1057,47 +1056,35 @@ func insertDataWithCommit(ctx context.Context, prevData, curData []byte, loadDat
if !reachLimit {
break
}
err := loadDataInfo.CheckAndInsertOneBatch(ctx)
// push into commit task queue
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
return nil, err
}
if err = loadDataInfo.Ctx.StmtCommit(); err != nil {
return nil, err
}
// Make sure that there are no retries when committing.
if err = loadDataInfo.Ctx.RefreshTxnCtx(ctx); err != nil {
return nil, err
return prevData, err
}
curData = prevData
prevData = nil
}
return prevData, nil
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

// processStream process input stream from network
func processStream(ctx context.Context, cc *clientConn, loadDataInfo *executor.LoadDataInfo) {
var err error
var shouldBreak bool
var prevData, curData []byte
// TODO: Make the loadDataRowCnt settable.
loadDataInfo.SetMaxRowsInBatch(defaultLoadDataBatchCnt)
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
defer func() {
r := recover()
if r != nil {
logutil.Logger(ctx).Error("process routine panicked",
zap.Reflect("r", r),
zap.Stack("stack"))
}
if err != nil || r != nil {
loadDataInfo.ForceQuit()
} else {
loadDataInfo.CloseTaskQueue()
}
}()
for {
curData, err = cc.readPacket()
if err != nil {
Expand All @@ -1112,6 +1099,15 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
select {
case <-loadDataInfo.QuitCh:
err = errors.New("processStream forced to quit")
default:
}
if err != nil {
break
}
// prepare batch and enqueue task
prevData, err = insertDataWithCommit(ctx, prevData, curData, loadDataInfo)
if err != nil {
break
Expand All @@ -1120,16 +1116,43 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
break
}
}
loadDataInfo.SetMessage()

if err != nil {
loadDataInfo.Ctx.StmtRollback()
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
} else {
err = loadDataInfo.CheckAndInsertOneBatch(ctx)
if err == nil {
err = loadDataInfo.Ctx.StmtCommit()
err = loadDataInfo.EnqOneTask(ctx)
if err != nil {
logutil.Logger(ctx).Error("load data process stream error", zap.Error(err))
}
}
}

// handleLoadData does the additional work after processing the 'load data' query.
// It sends client a file path, then reads the file content from client, inserts data into database.
func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor.LoadDataInfo) error {
cfzjywxk marked this conversation as resolved.
Show resolved Hide resolved
// If the server handles the load data request, the client has to set the ClientLocalFiles capability.
if cc.capability&mysql.ClientLocalFiles == 0 {
return errNotAllowedCommand
}
if loadDataInfo == nil {
return errors.New("load data info is empty")
}

err := cc.writeReq(loadDataInfo.Path)
if err != nil {
return err
}

loadDataInfo.InitQueues()
loadDataInfo.SetMaxRowsInBatch(uint64(loadDataInfo.Ctx.GetSessionVars().DMLBatchSize))
loadDataInfo.StartStopWatcher()
err = loadDataInfo.Ctx.NewTxn(ctx)
if err != nil {
return err
}
// processStream process input data, enqueue commit task
go processStream(ctx, cc, loadDataInfo)
err = loadDataInfo.CommitWork(ctx)
loadDataInfo.SetMessage()

var txn kv.Transaction
var err1 error
Expand Down
Loading