-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: load data statement, separate data preparing routine and commit routine #11533
Changes from 1 commit
e59d08d
a631f75
700d86d
e07a050
1057e4f
aa3a04b
d954d76
596d071
768bd60
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ import ( | |
"bytes" | ||
"context" | ||
"fmt" | ||
"runtime" | ||
"strings" | ||
|
||
"github.com/pingcap/errors" | ||
|
@@ -33,7 +34,8 @@ import ( | |
) | ||
|
||
var ( | ||
null = []byte("NULL") | ||
null = []byte("NULL") | ||
taskQueueSize = 64 // the maximum number of pending tasks to commit in queue | ||
) | ||
|
||
// LoadDataExec represents a load data executor. | ||
|
@@ -101,6 +103,12 @@ func (e *LoadDataExec) Open(ctx context.Context) error { | |
return nil | ||
} | ||
|
||
// CommitTask is used for fetching data from data preparing routine into committing routine. | ||
type CommitTask struct { | ||
cnt uint64 | ||
rows [][]types.Datum | ||
} | ||
|
||
// LoadDataInfo saves the information of loading data operation. | ||
type LoadDataInfo struct { | ||
*InsertValues | ||
|
@@ -113,17 +121,160 @@ type LoadDataInfo struct { | |
IgnoreLines uint64 | ||
Ctx sessionctx.Context | ||
rows [][]types.Datum | ||
|
||
// these fields are used for pipeline data prepare and commit | ||
commitTaskQueue chan CommitTask | ||
QuitCommit chan struct{} | ||
QuitProcess chan struct{} | ||
} | ||
|
||
// GetRows getter for rows | ||
func (e *LoadDataInfo) GetRows() [][]types.Datum { | ||
return e.rows | ||
cfzjywxk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// GetCurBatchCnt getter for curBatchCnt | ||
func (e *LoadDataInfo) GetCurBatchCnt() uint64 { | ||
return e.curBatchCnt | ||
} | ||
|
||
// CloseTaskQueue preparing routine to inform commit routine no more data | ||
func (e *LoadDataInfo) CloseTaskQueue() { | ||
close(e.commitTaskQueue) | ||
} | ||
|
||
// InitQueues initialize task queue and error report queue | ||
func (e *LoadDataInfo) InitQueues() { | ||
e.commitTaskQueue = make(chan CommitTask, taskQueueSize) | ||
e.QuitCommit = make(chan struct{}) | ||
e.QuitProcess = make(chan struct{}) | ||
} | ||
|
||
// ForceQuitCommit let commit quit directly | ||
func (e *LoadDataInfo) ForceQuitCommit() { | ||
close(e.QuitCommit) | ||
} | ||
|
||
// ForceQuitProcess let process quit directly | ||
func (e *LoadDataInfo) ForceQuitProcess() { | ||
close(e.QuitProcess) | ||
} | ||
|
||
// MakeCommitTask produce commit task with data in LoadDataInfo.rows LoadDataInfo.curBatchCnt | ||
func (e *LoadDataInfo) MakeCommitTask() CommitTask { | ||
return CommitTask{e.curBatchCnt, e.rows} | ||
} | ||
|
||
// EnqOneTask feed one batch commit task to commit work | ||
func (e *LoadDataInfo) EnqOneTask(ctx context.Context) error { | ||
var err error | ||
if e.curBatchCnt > 0 { | ||
sendOk := false | ||
for !sendOk { | ||
select { | ||
case e.commitTaskQueue <- e.MakeCommitTask(): | ||
sendOk = true | ||
case <-e.QuitProcess: | ||
err = errors.New("EnqOneTask forced to quit") | ||
logutil.Logger(ctx).Error("EnqOneTask forced to quit, possible commitWork error") | ||
return err | ||
} | ||
} | ||
logutil.Logger(ctx).Debug("one task enqueued ", zap.Int("current queue len", len(e.commitTaskQueue))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove debug message. |
||
// reset rows buffer, will reallocate buffer but NOT reuse | ||
e.SetMaxRowsInBatch(e.maxRowsInBatch) | ||
} | ||
return err | ||
lysu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// CommitOneTask insert Data from LoadDataInfo.rows, then make commit and refresh txn | ||
func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask, resetBuf bool) error { | ||
var err error | ||
defer func() { | ||
if err != nil { | ||
e.Ctx.StmtRollback() | ||
} | ||
}() | ||
err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt) | ||
if err != nil { | ||
logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err)) | ||
return err | ||
} | ||
if err = e.Ctx.StmtCommit(); err != nil { | ||
logutil.Logger(ctx).Error("commit error commit", zap.Error(err)) | ||
return err | ||
} | ||
// Make sure that there are no retries when committing. | ||
if err = e.Ctx.RefreshTxnCtx(ctx); err != nil { | ||
logutil.Logger(ctx).Error("commit error refresh", zap.Error(err)) | ||
return err | ||
} | ||
// this only set in sequential mode, e.rows buffer will be reused in sequential mode | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is sequential mode? |
||
if resetBuf { | ||
e.curBatchCnt = 0 | ||
} | ||
return err | ||
} | ||
|
||
// CommitWork commit batch sequentially | ||
func (e *LoadDataInfo) CommitWork(ctx context.Context) error { | ||
var err error | ||
defer func() { | ||
r := recover() | ||
if r != nil { | ||
buf := make([]byte, 4096) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Zap log library has a Stack function to get stack tracing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. roger, changed |
||
stackSize := runtime.Stack(buf, false) | ||
buf = buf[:stackSize] | ||
logutil.Logger(ctx).Error("CommitWork panicked", zap.String("stack", string(buf))) | ||
} | ||
if err != nil || r != nil { | ||
e.ForceQuitProcess() | ||
} | ||
if err != nil { | ||
e.ctx.StmtRollback() | ||
} | ||
}() | ||
var tasks uint64 = 0 | ||
var end = false | ||
for !end { | ||
select { | ||
case <-e.QuitCommit: | ||
err = errors.New("commit forced to quit") | ||
logutil.Logger(ctx).Error("commit forced to quit, possible preparation failed") | ||
cfzjywxk marked this conversation as resolved.
Show resolved
Hide resolved
|
||
break | ||
case commitTask, ok := <-e.commitTaskQueue: | ||
if ok { | ||
err = e.CommitOneTask(ctx, commitTask, false) | ||
if err != nil { | ||
break | ||
lysu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
tasks++ | ||
logutil.Logger(ctx).Debug("commit one task finished", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
zap.Uint64("finished tasks count", tasks), | ||
zap.Int("pending tasks count", len(e.commitTaskQueue))) | ||
} else { | ||
end = true | ||
logutil.Logger(ctx).Info("commit work all finished", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
zap.Uint64("total processed task count", tasks), | ||
zap.Uint64("batch size", e.maxRowsInBatch)) | ||
break | ||
} | ||
} | ||
if err != nil { | ||
break | ||
} | ||
} | ||
return err | ||
lysu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// SetMaxRowsInBatch sets the max number of rows to insert in a batch. | ||
func (e *LoadDataInfo) SetMaxRowsInBatch(limit uint64) { | ||
e.maxRowsInBatch = limit | ||
if uint64(cap(e.rows)) < limit { | ||
e.rows = make([][]types.Datum, 0, limit) | ||
for i := 0; uint64(i) < limit; i++ { | ||
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols()))) | ||
} | ||
e.rows = make([][]types.Datum, 0, limit) | ||
for i := 0; uint64(i) < limit; i++ { | ||
e.rows = append(e.rows, make([]types.Datum, len(e.Table.Cols()))) | ||
} | ||
e.curBatchCnt = 0 | ||
} | ||
|
||
// getValidData returns prevData and curData that starts from starting symbol. | ||
|
@@ -269,7 +420,7 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) | |
e.curBatchCnt++ | ||
if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 { | ||
reachLimit = true | ||
logutil.BgLogger().Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize), | ||
logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize), | ||
zap.Uint64("totalRows", e.rowCount)) | ||
break | ||
} | ||
|
@@ -278,17 +429,16 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte) | |
} | ||
|
||
// CheckAndInsertOneBatch is used to commit one transaction batch full filled data | ||
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context) error { | ||
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error { | ||
var err error | ||
if e.curBatchCnt == 0 { | ||
if cnt == 0 { | ||
return err | ||
} | ||
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(e.curBatchCnt) | ||
err = e.batchCheckAndInsert(ctx, e.rows[0:e.curBatchCnt], e.addRecordLD) | ||
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt) | ||
err = e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD) | ||
if err != nil { | ||
return err | ||
} | ||
e.curBatchCnt = 0 | ||
return err | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you just use a single quit channel ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use two to let process and commit both quit immediately when error occurs in one of them, commit will close
QuitProcess
and process will closeQuitCommit
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can do it by only using one signal channel.