Skip to content

Commit

Permalink
cherry pick pingcap#20713 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
crazycs520 authored and ti-srebot committed Nov 4, 2020
1 parent 3a40109 commit 68d3cb3
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 15 deletions.
4 changes: 3 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,9 @@ func (e *InsertRuntimeStat) String() string {
if e.Prefetch > 0 {
buf.WriteString(fmt.Sprintf("check_insert:{total_time:%v, mem_insert_time:%v, prefetch:%v", e.CheckInsertTime, e.CheckInsertTime-e.Prefetch, e.Prefetch))
if e.SnapshotRuntimeStats != nil {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", e.SnapshotRuntimeStats.String()))
if rpc := e.SnapshotRuntimeStats.String(); len(rpc) > 0 {
buf.WriteString(fmt.Sprintf(", rpc:{%s}", rpc))
}
}
buf.WriteString("}")
} else {
Expand Down
21 changes: 10 additions & 11 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,6 @@ type LoadDataExec struct {
loadDataInfo *LoadDataInfo
}

// NewLoadDataInfo returns a LoadDataInfo structure, and it's only used for tests now.
func NewLoadDataInfo(ctx sessionctx.Context, row []types.Datum, tbl table.Table, cols []*table.Column) *LoadDataInfo {
insertVal := &InsertValues{baseExecutor: newBaseExecutor(ctx, nil, 0), Table: tbl}
return &LoadDataInfo{
row: row,
InsertValues: insertVal,
Table: tbl,
Ctx: ctx,
}
}

// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.GrowAndReset(e.maxChunkSize)
Expand Down Expand Up @@ -99,6 +88,8 @@ func (e *LoadDataExec) Open(ctx context.Context) error {
if e.loadDataInfo.insertColumns != nil {
e.loadDataInfo.initEvalBuffer()
}
// Init for runtime stats.
e.loadDataInfo.collectRuntimeStatsEnabled()
return nil
}

Expand Down Expand Up @@ -433,6 +424,14 @@ func (e *LoadDataInfo) InsertData(ctx context.Context, prevData, curData []byte)

// CheckAndInsertOneBatch is used to commit one transaction batch full filled data
func (e *LoadDataInfo) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
if e.stats != nil && e.stats.BasicRuntimeStats != nil {
// Since this method will not call by executor Next,
// so we need record the basic executor runtime stats by ourself.
start := time.Now()
defer func() {
e.stats.BasicRuntimeStats.Record(time.Since(start), 0)
}()
}
var err error
if cnt == 0 {
return err
Expand Down
6 changes: 6 additions & 0 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ func (p Insert) Init(ctx sessionctx.Context) *Insert {
return &p
}

// Init initializes LoadData.
func (p LoadData) Init(ctx sessionctx.Context) *LoadData {
p.basePlan = newBasePlan(ctx, plancodec.TypeLoadData, 0)
return &p
}

// Init initializes LogicalShow.
func (p LogicalShow) Init(ctx sessionctx.Context) *LogicalShow {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, plancodec.TypeShow, &p, 0)
Expand Down
15 changes: 15 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,6 +2457,7 @@ func (b *PlanBuilder) buildSelectPlanOfInsert(ctx context.Context, insert *ast.I
}

func (b *PlanBuilder) buildLoadData(ctx context.Context, ld *ast.LoadDataStmt) (Plan, error) {
<<<<<<< HEAD
p := &LoadData{
IsLocal: ld.IsLocal,
OnDuplicate: ld.OnDuplicate,
Expand All @@ -2467,6 +2468,20 @@ func (b *PlanBuilder) buildLoadData(ctx context.Context, ld *ast.LoadDataStmt) (
LinesInfo: ld.LinesInfo,
IgnoreLines: ld.IgnoreLines,
}
=======
p := LoadData{
IsLocal: ld.IsLocal,
OnDuplicate: ld.OnDuplicate,
Path: ld.Path,
Table: ld.Table,
Columns: ld.Columns,
FieldsInfo: ld.FieldsInfo,
LinesInfo: ld.LinesInfo,
IgnoreLines: ld.IgnoreLines,
ColumnAssignments: ld.ColumnAssignments,
ColumnsAndUserVars: ld.ColumnsAndUserVars,
}.Init(b.ctx)
>>>>>>> ae5dc3f69... executor: fix issue of load data statement doesn't record into slow query and statements_summary (#20713)
user := b.ctx.GetSessionVars().User
var insertErr error
if user != nil {
Expand Down
49 changes: 47 additions & 2 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/arena"
Expand Down Expand Up @@ -1370,34 +1371,78 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
err = cc.writeMultiResultset(ctx, rss, false)
}
} else {
<<<<<<< HEAD
loadDataInfo := cc.ctx.Value(executor.LoadDataVarKey)
if loadDataInfo != nil {
defer cc.ctx.SetValue(executor.LoadDataVarKey, nil)
if err = cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataInfo)); err != nil {
return err
}
=======
handled, err := cc.handleQuerySpecial(ctx, status)
if handled {
execStmt := cc.ctx.Value(session.ExecStmtVarKey)
if execStmt != nil {
execStmt.(*executor.ExecStmt).FinishExecuteStmt(0, err == nil, false)
}
}
if err != nil {
return err
>>>>>>> ae5dc3f69... executor: fix issue of load data statement doesn't record into slow query and statements_summary (#20713)
}

<<<<<<< HEAD
loadStats := cc.ctx.Value(executor.LoadStatsVarKey)
if loadStats != nil {
defer cc.ctx.SetValue(executor.LoadStatsVarKey, nil)
if err = cc.handleLoadStats(ctx, loadStats.(*executor.LoadStatsInfo)); err != nil {
return err
}
}

=======
func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bool, error) {
handled := false
loadDataInfo := cc.ctx.Value(executor.LoadDataVarKey)
if loadDataInfo != nil {
handled = true
defer cc.ctx.SetValue(executor.LoadDataVarKey, nil)
if err := cc.handleLoadData(ctx, loadDataInfo.(*executor.LoadDataInfo)); err != nil {
return handled, err
>>>>>>> ae5dc3f69... executor: fix issue of load data statement doesn't record into slow query and statements_summary (#20713)
}

<<<<<<< HEAD
indexAdvise := cc.ctx.Value(executor.IndexAdviseVarKey)
if indexAdvise != nil {
defer cc.ctx.SetValue(executor.IndexAdviseVarKey, nil)
err = cc.handleIndexAdvise(ctx, indexAdvise.(*executor.IndexAdviseInfo))
if err != nil {
return err
}
=======
loadStats := cc.ctx.Value(executor.LoadStatsVarKey)
if loadStats != nil {
handled = true
defer cc.ctx.SetValue(executor.LoadStatsVarKey, nil)
if err := cc.handleLoadStats(ctx, loadStats.(*executor.LoadStatsInfo)); err != nil {
return handled, err
>>>>>>> ae5dc3f69... executor: fix issue of load data statement doesn't record into slow query and statements_summary (#20713)
}

<<<<<<< HEAD
err = cc.writeOK(ctx)
}
return err
=======
indexAdvise := cc.ctx.Value(executor.IndexAdviseVarKey)
if indexAdvise != nil {
handled = true
defer cc.ctx.SetValue(executor.IndexAdviseVarKey, nil)
if err := cc.handleIndexAdvise(ctx, indexAdvise.(*executor.IndexAdviseInfo)); err != nil {
return handled, err
}
}
return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
>>>>>>> ae5dc3f69... executor: fix issue of load data statement doesn't record into slow query and statements_summary (#20713)
}

// handleFieldList returns the field list for a table.
Expand Down
58 changes: 58 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"path/filepath"
"regexp"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -442,6 +443,63 @@ func (cli *testServerClient) runTestLoadDataWithSelectIntoOutfile(c *C, server *
}
})
}
func (cli *testServerClient) runTestLoadDataForSlowLog(c *C, server *Server) {
path := "/tmp/load_data_test.csv"
fp, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
c.Assert(err, IsNil)
c.Assert(fp, NotNil)
defer func() {
err = fp.Close()
c.Assert(err, IsNil)
err = os.Remove(path)
c.Assert(err, IsNil)
}()
_, err = fp.WriteString(
"1 1\n" +
"2 2\n" +
"3 3\n" +
"4 4\n" +
"5 5\n")
c.Assert(err, IsNil)

cli.runTestsOnNewDB(c, func(config *mysql.Config) {
config.AllowAllFiles = true
config.Params = map[string]string{"sql_mode": "''"}
}, "load_data_slow_query", func(dbt *DBTest) {
dbt.mustExec("create table t_slow (a int key, b int)")
defer func() {
dbt.mustExec("set tidb_slow_log_threshold=300;")
dbt.mustExec("set @@global.tidb_enable_stmt_summary=0")
}()
dbt.mustExec("set tidb_slow_log_threshold=0;")
dbt.mustExec("set @@global.tidb_enable_stmt_summary=1")
query := fmt.Sprintf("load data local infile %q into table t_slow", path)
dbt.mustExec(query)
dbt.mustExec("insert ignore into t_slow values (1,1);")

checkPlan := func(rows *sql.Rows, expectPlan string) {
dbt.Check(rows.Next(), IsTrue, Commentf("unexpected data"))
var plan sql.NullString
err = rows.Scan(&plan)
dbt.Check(err, IsNil)
planStr := strings.ReplaceAll(plan.String, "\t", " ")
planStr = strings.ReplaceAll(planStr, "\n", " ")
c.Assert(planStr, Matches, expectPlan)
}

// Test for record slow log for load data statement.
rows := dbt.mustQuery(fmt.Sprintf("select plan from information_schema.slow_query where query like 'load data local infile %% into table t_slow;' order by time desc limit 1"))
expectedPlan := ".*LoadData.* time.* loops.* prepare.* check_insert.* mem_insert_time:.* prefetch.* rpc.* commit_txn.*"
checkPlan(rows, expectedPlan)
// Test for record statements_summary for load data statement.
rows = dbt.mustQuery(fmt.Sprintf("select plan from information_schema.STATEMENTS_SUMMARY where QUERY_SAMPLE_TEXT like 'load data local infile %%' limit 1"))
checkPlan(rows, expectedPlan)
// Test log normal statement after executing load date.
rows = dbt.mustQuery(fmt.Sprintf("select plan from information_schema.slow_query where query = 'insert ignore into t_slow values (1,1);' order by time desc limit 1"))
expectedPlan = ".*Insert.* time.* loops.* prepare.* check_insert.* mem_insert_time:.* prefetch.* rpc.*"
checkPlan(rows, expectedPlan)
})
}

func (cli *testServerClient) runTestLoadData(c *C, server *Server) {
// create a file and write data.
Expand Down
4 changes: 4 additions & 0 deletions server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testkit"
)

Expand Down Expand Up @@ -87,6 +88,8 @@ func (ts *tidbTestSuiteBase) SetUpSuite(c *C) {
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = ts.statusPort
cfg.Performance.TCPKeepAlive = true
err = logutil.InitLogger(cfg.Log.ToLogConfig())
c.Assert(err, IsNil)

server, err := NewServer(cfg, ts.tidbdrv)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -140,6 +143,7 @@ func (ts *tidbTestSuite) TestPreparedTimestamp(c *C) {
func (ts *tidbTestSerialSuite) TestLoadData(c *C) {
ts.runTestLoadData(c, ts.server)
ts.runTestLoadDataWithSelectIntoOutfile(c, ts.server)
ts.runTestLoadDataForSlowLog(c, ts.server)
}

func (ts *tidbTestSerialSuite) TestStmtCount(c *C) {
Expand Down
Loading

0 comments on commit 68d3cb3

Please sign in to comment.