Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: make explain support explain anaylze (#7827)(#7888) #7925

Merged
merged 3 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func (n *TraceStmt) Accept(v Visitor) (Node, bool) {
type ExplainStmt struct {
stmtNode

Stmt StmtNode
Format string
Stmt StmtNode
Format string
Analyze bool
}

// Accept implements Node Accept interface.
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
}
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
execDetail := sessVars.StmtCtx.GetExecDetails()
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime, execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
Expand Down
10 changes: 9 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"sync"
"time"

"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -501,6 +502,10 @@ func (w *HashAggFinalWorker) run(ctx sessionctx.Context, waitGroup *sync.WaitGro

// Next implements the Executor Next interface.
func (e *HashAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.isUnparallelExec {
return errors.Trace(e.unparallelExec(ctx, chk))
Expand Down Expand Up @@ -756,8 +761,11 @@ func (e *StreamAggExec) Close() error {

// Next implements the Executor Next interface.
func (e *StreamAggExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()

for !e.executed && chk.NumRows() < e.maxChunkSize {
err := e.consumeOneGroup(ctx, chk)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -448,13 +449,12 @@ func (b *executorBuilder) buildLimit(v *plannercore.PhysicalLimit) Executor {
func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
base.initCap = chunk.ZeroCapacity
e := &PrepareExec{
return &PrepareExec{
baseExecutor: base,
is: b.is,
name: v.Name,
sqlText: v.SQLText,
}
return e
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
Expand Down Expand Up @@ -659,14 +659,15 @@ func (b *executorBuilder) buildTrace(v *plannercore.Trace) Executor {

// buildExplain builds a explain executor. `e.rows` collects final result to `ExplainExec`.
func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor {
e := &ExplainExec{
explainExec := &ExplainExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
explain: v,
}
e.rows = make([][]string, 0, len(v.Rows))
for _, row := range v.Rows {
e.rows = append(e.rows, row)
if v.Analyze {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
explainExec.analyzeExec = b.build(v.ExecPlan)
}
return e
return explainExec
}

func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor {
Expand Down
17 changes: 15 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/distsql"
Expand Down Expand Up @@ -243,6 +244,10 @@ func (e *IndexReaderExecutor) Close() error {

// Next implements the Executor Next interface.
func (e *IndexReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
err := e.result.Next(ctx, chk)
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -474,7 +479,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []int64) (Executor, error) {
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, &TableReaderExecutor{
tableReaderExec := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, e.schema, e.id+"_tableReader"),
table: e.table,
physicalTableID: e.physicalTableID,
Expand All @@ -483,7 +488,11 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
}, handles)
}
// We assign `nil` to `runtimeStats` to forbidden `TableWorker` driven `IndexLookupExecutor`'s runtime stats collecting,
// because TableWorker information isn't showing in explain result now.
tableReaderExec.runtimeStats = nil
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
if err != nil {
log.Error(err)
return nil, errors.Trace(err)
Expand Down Expand Up @@ -512,6 +521,10 @@ func (e *IndexLookUpExecutor) Close() error {

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
resultTask, err := e.getResultTask()
Expand Down
35 changes: 35 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cznic/mathutil"
"github.com/pingcap/tidb/ast"
Expand All @@ -35,12 +36,14 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

var (
_ Executor = &baseExecutor{}
_ Executor = &CheckTableExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
Expand Down Expand Up @@ -71,6 +74,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStats *execdetails.RuntimeStats
}

// Open initializes children recursively and "childrenResults" according to children's schemas.
Expand Down Expand Up @@ -127,6 +131,9 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id strin
initCap: ctx.GetSessionVars().MaxChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.Get(e.id)
}
if schema != nil {
cols := schema.Columns
e.retFieldTypes = make([]*types.FieldType, len(cols))
Expand Down Expand Up @@ -168,6 +175,10 @@ type CancelDDLJobsExec struct {

// Next implements the Executor Next interface.
func (e *CancelDDLJobsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.jobIDs) {
return nil
Expand Down Expand Up @@ -610,6 +621,10 @@ type LimitExec struct {

// Next implements the Executor Next interface.
func (e *LimitExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.cursor >= e.end {
return nil
Expand Down Expand Up @@ -729,6 +744,10 @@ func (e *TableDualExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *TableDualExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.numReturned >= e.numDualRows {
return nil
Expand Down Expand Up @@ -780,6 +799,10 @@ func (e *SelectionExec) Close() error {

// Next implements the Executor Next interface.
func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)

if !e.batched {
Expand Down Expand Up @@ -855,6 +878,10 @@ type TableScanExec struct {

// Next implements the Executor Next interface.
func (e *TableScanExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if e.isVirtualTable {
return errors.Trace(e.nextChunk4InfoSchema(ctx, chk))
Expand Down Expand Up @@ -955,6 +982,10 @@ func (e *MaxOneRowExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (e *MaxOneRowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.evaluated {
return nil
Expand Down Expand Up @@ -1097,6 +1128,10 @@ func (e *UnionExec) resultPuller(ctx context.Context, childID int) {

// Next implements the Executor Next interface.
func (e *UnionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.GrowAndReset(e.maxChunkSize)
if !e.initialized {
e.initialize(ctx)
Expand Down
46 changes: 44 additions & 2 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)
Expand All @@ -23,18 +24,39 @@ import (
type ExplainExec struct {
baseExecutor

rows [][]string
cursor int
explain *core.Explain
analyzeExec Executor
rows [][]string
cursor int
}

// Open implements the Executor Open interface.
func (e *ExplainExec) Open(ctx context.Context) error {
if e.analyzeExec != nil {
return e.analyzeExec.Open(ctx)
}
return nil
}

// Close implements the Executor Close interface.
func (e *ExplainExec) Close() error {
if e.analyzeExec != nil {
e.analyzeExec.Close()
}
e.rows = nil
return nil
}

// Next implements the Executor Next interface.
func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.rows == nil {
var err error
e.rows, err = e.generateExplainInfo(ctx)
if err != nil {
return err
}
}

chk.GrowAndReset(e.maxChunkSize)
if e.cursor >= len(e.rows) {
return nil
Expand All @@ -49,3 +71,23 @@ func (e *ExplainExec) Next(ctx context.Context, chk *chunk.Chunk) error {
e.cursor += numCurRows
return nil
}

func (e *ExplainExec) generateExplainInfo(ctx context.Context) ([][]string, error) {
if e.analyzeExec != nil {
chk := e.analyzeExec.newFirstChunk()
for {
err := e.analyzeExec.Next(ctx, chk)
if err != nil {
return nil, err
}
if chk.NumRows() == 0 {
break
}
}
}
e.explain.RenderResult()
if e.analyzeExec != nil {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = nil
}
return e.explain.Rows, nil
}
5 changes: 5 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"runtime"
"sort"
"sync"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -189,6 +190,10 @@ func (e *IndexLookUpJoin) newInnerWorker(taskCh chan *lookUpJoinTask) *innerWork

// Next implements the Executor interface.
func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
e.joinResult.Reset()
for {
Expand Down
9 changes: 9 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -483,6 +484,10 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
// step 1. fetch data from inner child and build a hash table;
// step 2. fetch data from outer child in a background goroutine and probe the hash table in multiple join workers.
func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
if !e.prepared {
e.innerFinished = make(chan error, 1)
go util.WithRecovery(func() { e.fetchInnerAndBuildHashTable(ctx) }, e.finishFetchInnerAndBuildHashTable)
Expand Down Expand Up @@ -696,6 +701,10 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

// Next implements the Executor interface.
func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
Expand Down
Loading