Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: trace the execution of the insert operation #11667

Merged
merged 7 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Tab
if err != nil {
return errors.Trace(err)
}
data, err := txn.Get(key)
data, err := txn.Get(context.TODO(), key)
if !isExist {
if terror.ErrorEqual(err, kv.ErrNotExist) {
return nil
Expand Down
7 changes: 7 additions & 0 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand All @@ -33,6 +34,12 @@ const (
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

// For testing purpose.
if hook := ctx.Value("CheckSelectRequestHook"); hook != nil {
hook.(func(*kv.Request))(kvReq)
Expand Down
16 changes: 11 additions & 5 deletions executor/batch_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -89,7 +90,7 @@ func (b *batchChecker) encodeNewRow(ctx sessionctx.Context, t table.Table, row [

// getKeysNeedCheck gets keys converted from to-be-insert rows to record keys and unique index keys,
// which need to be checked whether they are duplicate keys.
func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
func (b *batchChecker) getKeysNeedCheck(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) ([]toBeCheckedRow, error) {
nUnique := 0
for _, v := range t.WritableIndices() {
if v.Meta().Unique {
Expand All @@ -111,7 +112,7 @@ func (b *batchChecker) getKeysNeedCheck(ctx sessionctx.Context, t table.Table, r

var err error
for _, row := range rows {
toBeCheckRows, err = b.getKeysNeedCheckOneRow(ctx, t, row, nUnique, handleCol, toBeCheckRows)
toBeCheckRows, err = b.getKeysNeedCheckOneRow(sctx, t, row, nUnique, handleCol, toBeCheckRows)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (b *batchChecker) getKeysNeedCheckOneRow(ctx sessionctx.Context, t table.Ta
// batchGetInsertKeys uses batch-get to fetch all key-value pairs to be checked for ignore or duplicate key update.
func (b *batchChecker) batchGetInsertKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) (err error) {
// Get keys need to be checked.
b.toBeCheckedRows, err = b.getKeysNeedCheck(sctx, t, newRows)
b.toBeCheckedRows, err = b.getKeysNeedCheck(ctx, sctx, t, newRows)
if err != nil {
return err
}
Expand Down Expand Up @@ -251,6 +252,11 @@ func (b *batchChecker) initDupOldRowFromUniqueKey(ctx context.Context, sctx sess

// initDupOldRowValue initializes dupOldRowValues which contain the to-be-updated rows from storage.
func (b *batchChecker) initDupOldRowValue(ctx context.Context, sctx sessionctx.Context, t table.Table, newRows [][]types.Datum) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("batchCheck.initDupOldRowValue", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
b.dupOldRowValues = make(map[string][]byte, len(newRows))
b.initDupOldRowFromHandleKey()
return b.initDupOldRowFromUniqueKey(ctx, sctx, newRows)
Expand All @@ -270,8 +276,8 @@ func (b *batchChecker) fillBackKeys(t table.Table, row toBeCheckedRow, handle in
}

// deleteDupKeys picks primary/unique key-value pairs from rows and remove them from the dupKVs
func (b *batchChecker) deleteDupKeys(ctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, t, rows)
func (b *batchChecker) deleteDupKeys(ctx context.Context, sctx sessionctx.Context, t table.Table, rows [][]types.Datum) error {
cleanupRows, err := b.getKeysNeedCheck(ctx, sctx, t, rows)
if err != nil {
return err
}
Expand Down
32 changes: 20 additions & 12 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package executor
import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -76,7 +78,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}
} else {
for _, row := range rows {
if _, err := e.addRecord(row); err != nil {
if _, err := e.addRecord(ctx, row); err != nil {
return err
}
}
Expand Down Expand Up @@ -104,7 +106,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -117,7 +119,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
if err != nil {
return err
}
err = e.updateDupRow(r, handle, e.OnDuplicate)
err = e.updateDupRow(ctx, r, handle, e.OnDuplicate)
if err != nil {
return err
}
Expand All @@ -130,7 +132,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
// and key-values should be filled back to dupOldRowValues for the further row check,
// due to there may be duplicate keys inside the insert statement.
if newRows[i] != nil {
newHandle, err := e.addRecord(newRows[i])
newHandle, err := e.addRecord(ctx, newRows[i])
if err != nil {
return err
}
Expand Down Expand Up @@ -169,26 +171,32 @@ func (e *InsertExec) Open(ctx context.Context) error {
}

// updateDupRow updates a duplicate row to a new row.
func (e *InsertExec) updateDupRow(row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
func (e *InsertExec) updateDupRow(ctx context.Context, row toBeCheckedRow, handle int64, onDuplicate []*expression.Assignment) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("InsertExec.updateDupRow", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

oldRow, err := e.getOldRow(e.ctx, row.t, handle, e.GenExprs)
if err != nil {
logutil.BgLogger().Error("get old row failed when insert on dup", zap.Int64("handle", handle), zap.String("toBeInsertedRow", types.DatumsToStrNoErr(row.row)))
return err
}
// Do update row.
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(handle, oldRow, row.row, onDuplicate)
updatedRow, handleChanged, newHandle, err := e.doDupRowUpdate(ctx, handle, oldRow, row.row, onDuplicate)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
}
if err != nil {
return err
}
return e.updateDupKeyValues(handle, newHandle, handleChanged, oldRow, updatedRow)
return e.updateDupKeyValues(ctx, handle, newHandle, handleChanged, oldRow, updatedRow)
}

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow []types.Datum,
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) ([]types.Datum, bool, int64, error) {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
Expand All @@ -212,23 +220,23 @@ func (e *InsertExec) doDupRowUpdate(handle int64, oldRow []types.Datum, newRow [
}

newData := row4Update[:len(oldRow)]
_, handleChanged, newHandle, err := updateRecord(e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
_, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true)
if err != nil {
return nil, false, 0, err
}
return newData, handleChanged, newHandle, nil
}

// updateDupKeyValues updates the dupKeyValues for further duplicate key check.
func (e *InsertExec) updateDupKeyValues(oldHandle int64, newHandle int64,
func (e *InsertExec) updateDupKeyValues(ctx context.Context, oldHandle int64, newHandle int64,
handleChanged bool, oldRow []types.Datum, updatedRow []types.Datum) error {
// There is only one row per update.
fillBackKeysInRows, err := e.getKeysNeedCheck(e.ctx, e.Table, [][]types.Datum{updatedRow})
fillBackKeysInRows, err := e.getKeysNeedCheck(ctx, e.ctx, e.Table, [][]types.Datum{updatedRow})
if err != nil {
return err
}
// Delete old keys and fill back new key-values of the updated row.
err = e.deleteDupKeys(e.ctx, e.Table, [][]types.Datum{oldRow})
err = e.deleteDupKeys(ctx, e.ctx, e.Table, [][]types.Datum{oldRow})
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (e *InsertValues) handleWarning(err error) {

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(row []types.Datum) (int64, error)) error {
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
// all the rows will be checked, so it is safe to set BatchCheck = true
e.ctx.GetSessionVars().StmtCtx.BatchCheck = true
err := e.batchGetInsertKeys(ctx, e.ctx, e.Table, rows)
Expand Down Expand Up @@ -611,7 +611,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
// There may be duplicate keys inside the insert statement.
if !skip {
e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
_, err = addRecord(rows[i])
_, err = addRecord(ctx, rows[i])
if err != nil {
return err
}
Expand All @@ -626,15 +626,15 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return nil
}

func (e *InsertValues) addRecord(row []types.Datum) (int64, error) {
func (e *InsertValues) addRecord(ctx context.Context, row []types.Datum) (int64, error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return 0, err
}
if !e.ctx.GetSessionVars().ConstraintCheckInPlace {
txn.SetOption(kv.PresumeKeyNotExists, nil)
}
h, err := e.Table.AddRecord(e.ctx, row)
h, err := e.Table.AddRecord(e.ctx, row, table.WithCtx(ctx))
txn.DelOption(kv.PresumeKeyNotExists)
if err != nil {
return 0, err
Expand Down
4 changes: 2 additions & 2 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,11 @@ func (e *LoadDataInfo) colsToRow(ctx context.Context, cols []field) []types.Datu
return row
}

func (e *LoadDataInfo) addRecordLD(row []types.Datum) (int64, error) {
func (e *LoadDataInfo) addRecordLD(ctx context.Context, row []types.Datum) (int64, error) {
if row == nil {
return 0, nil
}
h, err := e.addRecord(row)
h, err := e.addRecord(ctx, row)
if err != nil {
e.handleWarning(err)
}
Expand Down
10 changes: 5 additions & 5 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err1
}

handleVal, err1 := e.get(idxKey)
handleVal, err1 := e.get(ctx, idxKey)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}

key := tablecodec.EncodeRowKeyWithHandle(e.tblInfo.ID, e.handle)
val, err := e.get(key)
val, err := e.get(ctx, key)
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
Expand Down Expand Up @@ -175,15 +175,15 @@ func (e *PointGetExecutor) encodeIndexKey() (_ []byte, err error) {
return tablecodec.EncodeIndexSeekKey(e.tblInfo.ID, e.idxInfo.ID, encodedIdxVals), nil
}

func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) (val []byte, err error) {
txn, err := e.ctx.Txn(true)
if err != nil {
return nil, err
}
if txn != nil && txn.Valid() && !txn.IsReadOnly() {
// We cannot use txn.Get directly here because the snapshot in txn and the snapshot of e.snapshot may be
// different for pessimistic transaction.
val, err = txn.GetMemBuffer().Get(key)
val, err = txn.GetMemBuffer().Get(ctx, key)
if err == nil {
return val, err
}
Expand All @@ -192,7 +192,7 @@ func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
}
// fallthrough to snapshot get.
}
return e.snapshot.Get(key)
return e.snapshot.Get(ctx, key)
}

func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error {
Expand Down
18 changes: 9 additions & 9 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (e *ReplaceExec) Open(ctx context.Context) error {

// removeRow removes the duplicate row and cleanup its keys in the key-value map,
// but if the to-be-removed row equals to the to-be-added row, no remove or add things to do.
func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
func (e *ReplaceExec) removeRow(ctx context.Context, handle int64, r toBeCheckedRow) (bool, error) {
newRow := r.row
oldRow, err := e.batchChecker.getOldRow(e.ctx, r.t, handle, e.GenExprs)
if err != nil {
Expand All @@ -75,22 +75,22 @@ func (e *ReplaceExec) removeRow(handle int64, r toBeCheckedRow) (bool, error) {
e.ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)

// Cleanup keys map, because the record was removed.
err = e.deleteDupKeys(e.ctx, r.t, [][]types.Datum{oldRow})
err = e.deleteDupKeys(ctx, e.ctx, r.t, [][]types.Datum{oldRow})
if err != nil {
return false, err
}
return false, nil
}

// replaceRow removes all duplicate rows for one row, then inserts it.
func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
func (e *ReplaceExec) replaceRow(ctx context.Context, r toBeCheckedRow) error {
if r.handleKey != nil {
if _, found := e.dupKVs[string(r.handleKey.newKV.key)]; found {
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKV.key)
if err != nil {
return err
}
rowUnchanged, err := e.removeRow(handle, r)
rowUnchanged, err := e.removeRow(ctx, handle, r)
if err != nil {
return err
}
Expand All @@ -102,7 +102,7 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {

// Keep on removing duplicated rows.
for {
rowUnchanged, foundDupKey, err := e.removeIndexRow(r)
rowUnchanged, foundDupKey, err := e.removeIndexRow(ctx, r)
if err != nil {
return err
}
Expand All @@ -116,7 +116,7 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
}

// No duplicated rows now, insert the row.
newHandle, err := e.addRecord(r.row)
newHandle, err := e.addRecord(ctx, r.row)
if err != nil {
return err
}
Expand All @@ -130,14 +130,14 @@ func (e *ReplaceExec) replaceRow(r toBeCheckedRow) error {
// 2. bool: true when found the duplicated key. This only means that duplicated key was found,
// and the row was removed.
// 3. error: the error.
func (e *ReplaceExec) removeIndexRow(r toBeCheckedRow) (bool, bool, error) {
func (e *ReplaceExec) removeIndexRow(ctx context.Context, r toBeCheckedRow) (bool, bool, error) {
for _, uk := range r.uniqueKeys {
if val, found := e.dupKVs[string(uk.newKV.key)]; found {
handle, err := tables.DecodeHandle(val)
if err != nil {
return false, found, err
}
rowUnchanged, err := e.removeRow(handle, r)
rowUnchanged, err := e.removeRow(ctx, handle, r)
if err != nil {
return false, found, err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
e.ctx.GetSessionVars().StmtCtx.AddRecordRows(uint64(len(newRows)))
for _, r := range e.toBeCheckedRows {
err = e.replaceRow(r)
err = e.replaceRow(ctx, r)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -83,6 +84,12 @@ type TableReaderExecutor struct {

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("TableReaderExecutor.Open", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

Expand Down
Loading