execution: support explain analyze in mpp execution. #22053

Merged 25 commits on Jan 7, 2021
8 changes: 7 additions & 1 deletion distsql/distsql.go
@@ -29,7 +29,7 @@ import (
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType) (SelectResult, error) {
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, tasks)
if resp == nil {
err := errors.New("client returns nil response")
@@ -49,6 +49,9 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
ctx: sctx,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
encodeType: encodeType,
copPlanIDs: planIDs,
rootPlanID: rootID,
storeType: kv.TiFlash,
}, nil

}
@@ -111,6 +114,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
sqlType: label,
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
}, nil
}

@@ -146,6 +150,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
}
return result, nil
}
@@ -164,6 +169,7 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars *kv.
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
}
return result, nil
}
6 changes: 4 additions & 2 deletions distsql/select_result.go
@@ -92,6 +92,8 @@ type selectResult struct {
copPlanIDs []int
rootPlanID int

storeType kv.StoreType

fetchDuration time.Duration
durationReported bool
memTracker *memory.Tracker
@@ -283,15 +285,15 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *tikv
r.stats.mergeCopRuntimeStats(copStats, respTime)

if copStats.ScanDetail != nil && len(r.copPlanIDs) > 0 {
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], copStats.ScanDetail)
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordScanDetail(r.copPlanIDs[len(r.copPlanIDs)-1], r.storeType.Name(), copStats.ScanDetail)
}

for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
planID := r.copPlanIDs[i]
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.
RecordOneCopTask(planID, callee, detail)
RecordOneCopTask(planID, r.storeType.Name(), callee, detail)
}
}
}
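Note on the change above: execution summaries come back as a flat list, and the loop credits summary i to the operator at copPlanIDs[i]; the new storeType field lets the collector label those entries as tikv or tiflash tasks. Below is a minimal sketch of that by-index attribution, with invented numbers and a stand-in summary type (the real payload is tipb.ExecutorExecutionSummary):

package main

import "fmt"

// summary is a simplified stand-in for tipb.ExecutorExecutionSummary.
type summary struct {
	timeProcessedNs uint64
	numProducedRows uint64
	numIterations   uint64
}

func main() {
	// Plan IDs collected in plan-tree pre-order by collectPlanIDS
	// (see executor/mpp_gather.go below).
	copPlanIDs := []int{3, 2, 1}
	summaries := []summary{
		{timeProcessedNs: 1200, numProducedRows: 10, numIterations: 2},
		{timeProcessedNs: 800, numProducedRows: 10, numIterations: 2},
		{timeProcessedNs: 500, numProducedRows: 100, numIterations: 3},
	}
	storeType := "tiflash"
	// Summary i describes the operator whose ID sits at copPlanIDs[i].
	for i, s := range summaries {
		fmt.Printf("plan %d (%s): time=%dns rows=%d loops=%d\n",
			copPlanIDs[i], storeType, s.timeProcessedNs, s.numProducedRows, s.numIterations)
	}
}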
5 changes: 3 additions & 2 deletions distsql/select_result_test.go
@@ -17,6 +17,7 @@ import (
"context"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/execdetails"
@@ -27,7 +28,7 @@ import (
func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
sr := selectResult{ctx: ctx}
sr := selectResult{ctx: ctx, storeType: kv.TiKV}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
@@ -47,5 +48,5 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats(context.Background(), &tikv.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats(1234).String(), Equals, "tikv_task:{time:1ns, loops:1}, scan_detail: {total_process_keys: 0, total_keys: 0, rocksdb: {delete_skipped_count: 0, key_skipped_count: 0, block: {cache_hit_count: 0, read_count: 0, read_byte: 0 Bytes}}}")
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String(), Equals, "tikv_task:{time:1ns, loops:1}, scan_detail: {total_process_keys: 0, total_keys: 0, rocksdb: {delete_skipped_count: 0, key_skipped_count: 0, block: {cache_hit_count: 0, read_count: 0, read_byte: 0 Bytes}}}")
}
11 changes: 10 additions & 1 deletion executor/mpp_gather.go
@@ -90,11 +90,20 @@ func (e *MPPGather) appendMPPDispatchReq(pf *plannercore.Fragment, tasks []*kv.M
return nil
}

func collectPlanIDS(plan plannercore.PhysicalPlan, ids []int) []int {
ids = append(ids, plan.ID())
for _, child := range plan.Children() {
ids = collectPlanIDS(child, ids)
}
return ids
}

// Open decides the task counts and locations, and generates exchange operators for every plan fragment.
// It then dispatches the tasks to tiflash stores. If any task fails, it cancels the remaining tasks.
func (e *MPPGather) Open(ctx context.Context) (err error) {
// TODO: Move the task construction logic to the planner, so we can see it in the explain results.
sender := e.originalPlan.(*plannercore.PhysicalExchangeSender)
planIDs := collectPlanIDS(e.originalPlan, nil)
rootTasks, err := plannercore.GenerateRootMPPTasks(e.ctx, e.startTS, sender, e.allocTaskID)
if err != nil {
return errors.Trace(err)
@@ -103,7 +112,7 @@ func (e *MPPGather) Open(ctx context.Context) (err error) {
if err != nil {
return errors.Trace(err)
}
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes)
e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes, planIDs, e.id)
if err != nil {
return errors.Trace(err)
}
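Note on collectPlanIDS above: it walks the plan tree in pre-order (a node before its children), and updateCopRuntimeStats later indexes the resulting slice positionally against the returned execution summaries. A self-contained sketch of the same walk over a simplified plan type (the real code recurses over plannercore.PhysicalPlan):

package main

import "fmt"

// plan is a simplified stand-in for plannercore.PhysicalPlan.
type plan struct {
	id       int
	children []*plan
}

// collectPlanIDS mirrors the pre-order recursion above.
func collectPlanIDS(p *plan, ids []int) []int {
	ids = append(ids, p.id)
	for _, c := range p.children {
		ids = collectPlanIDS(c, ids)
	}
	return ids
}

func main() {
	scan := &plan{id: 1}                           // e.g. a TableScan
	agg := &plan{id: 2, children: []*plan{scan}}   // e.g. a HashAgg
	sender := &plan{id: 3, children: []*plan{agg}} // e.g. an ExchangeSender
	fmt.Println(collectPlanIDS(sender, nil))       // [3 2 1]
}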
2 changes: 1 addition & 1 deletion go.mod
@@ -48,7 +48,7 @@ require (
github.com/pingcap/parser v0.0.0-20201222091346-02c8ff27d0bc
github.com/pingcap/sysutil v0.0.0-20201130064824-f0c8aa6a6966
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tipb v0.0.0-20201209065231-aa39b1b86217
github.com/pingcap/tipb v0.0.0-20201217093034-daf46606de18
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
4 changes: 2 additions & 2 deletions go.sum
@@ -671,8 +671,8 @@ github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20201209065231-aa39b1b86217 h1:Ophn4Ud/QHp1BH0FJOzbAVBW9Mw8BlX0gtWkK7ubDy0=
github.com/pingcap/tipb v0.0.0-20201209065231-aa39b1b86217/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20201217093034-daf46606de18 h1:p1ubEwK6Q6vYhjEYb6mET5z1uRh/FW1ZQA6FLsvDHmE=
github.com/pingcap/tipb v0.0.0-20201217093034-daf46606de18/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tiup v1.2.3 h1:8OCQF7sHhT6VqE8pZU1JTSogPA90OFuWWM/B746x0YY=
github.com/pingcap/tiup v1.2.3/go.mod h1:q8WzflNHjE1U49k2qstTL0clx2pKh8pkOzUFV4RTvQo=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
10 changes: 10 additions & 0 deletions planner/core/explain.go
@@ -864,6 +864,16 @@ func (p *PhysicalExchangeSender) ExplainInfo() string {
fmt.Fprintf(buffer, "HashPartition")
fmt.Fprintf(buffer, ", Hash Cols: %s", expression.ExplainColumnList(p.HashCols))
}
if len(p.Tasks) > 0 {
fmt.Fprintf(buffer, ", tasks: [")
for idx, task := range p.Tasks {
if idx != 0 {
fmt.Fprintf(buffer, ", ")
}
fmt.Fprintf(buffer, "%v", task.ID)
}
fmt.Fprintf(buffer, "]")
}
return buffer.String()
}

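Note on the ExplainInfo change above: when tasks are attached to a PhysicalExchangeSender, the explain row now also lists their IDs. A standalone sketch of the string being assembled, with an invented operator label and task IDs:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	taskIDs := []int64{1, 2, 3} // would come from p.Tasks
	buffer := bytes.NewBufferString("ExchangeSender")
	if len(taskIDs) > 0 {
		fmt.Fprintf(buffer, ", tasks: [")
		for idx, id := range taskIDs {
			if idx != 0 {
				fmt.Fprintf(buffer, ", ")
			}
			fmt.Fprintf(buffer, "%v", id)
		}
		fmt.Fprintf(buffer, "]")
	}
	fmt.Println(buffer.String()) // ExchangeSender, tasks: [1, 2, 3]
}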
5 changes: 3 additions & 2 deletions planner/core/fragment.go
@@ -57,7 +57,7 @@ func (e *mppTaskGenerator) generateMPPTasks(s *PhysicalExchangeSender) ([]*kv.MP
if err != nil {
return nil, errors.Trace(err)
}
s.Tasks = []*kv.MPPTask{tidbTask}
s.TargetTasks = []*kv.MPPTask{tidbTask}
return rootTasks, nil
}

@@ -81,11 +81,12 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
}
for _, r := range f.ExchangeReceivers {
s := r.ChildPf.ExchangeSender
s.Tasks = tasks
s.TargetTasks = tasks
}
for _, task := range tasks {
logutil.BgLogger().Info("Dispatch mpp task", zap.Uint64("timestamp", task.StartTs), zap.Int64("ID", task.ID), zap.String("address", task.Meta.GetAddress()), zap.String("plan", ToString(f.ExchangeSender)))
}
f.ExchangeSender.Tasks = tasks
return tasks, nil
}

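Note on the rename above: TargetTasks are the tasks of the consuming fragment that this sender ships rows to, while the new Tasks assignment records the sender's own tasks so they can appear in EXPLAIN. A toy model of that relationship, with simplified types rather than the real planner structures:

package main

import "fmt"

type task struct{ id int64 }

// exchangeSender models the two task lists on PhysicalExchangeSender.
type exchangeSender struct {
	targetTasks []*task // tasks of the upstream fragment that consume our output
	tasks       []*task // the tasks this sender itself runs, shown in EXPLAIN
}

func main() {
	upper := []*task{{id: 1}}          // root fragment: one task on the TiDB side
	lower := []*task{{id: 2}, {id: 3}} // leaf fragment: two TiFlash tasks

	s := &exchangeSender{targetTasks: upper, tasks: lower}
	fmt.Printf("sends to %d task(s), runs as %d task(s)\n",
		len(s.targetTasks), len(s.tasks)) // sends to 1 task(s), runs as 2 task(s)
}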
4 changes: 3 additions & 1 deletion planner/core/physical_plans.go
@@ -856,9 +856,11 @@ type PhysicalExchangeReceiver struct {
type PhysicalExchangeSender struct {
basePhysicalPlan

Tasks []*kv.MPPTask
TargetTasks []*kv.MPPTask
ExchangeType tipb.ExchangeType
HashCols []*expression.Column
// Tasks is the set of MPP tasks belonging to the current PhysicalExchangeSender.
Tasks []*kv.MPPTask

Fragment *Fragment
}
4 changes: 2 additions & 2 deletions planner/core/plan_to_pb.go
@@ -205,9 +205,9 @@ func (e *PhysicalExchangeSender) ToPB(ctx sessionctx.Context, storeType kv.Store
return nil, errors.Trace(err)
}

encodedTask := make([][]byte, 0, len(e.Tasks))
encodedTask := make([][]byte, 0, len(e.TargetTasks))

for _, task := range e.Tasks {
for _, task := range e.TargetTasks {
encodedStr, err := task.ToPB().Marshal()
if err != nil {
return nil, errors.Trace(err)
2 changes: 1 addition & 1 deletion planner/core/stringer.go
@@ -291,7 +291,7 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) {
str = fmt.Sprintf(")")
case *PhysicalExchangeSender:
str = fmt.Sprintf("Send(")
for _, task := range x.Tasks {
for _, task := range x.TargetTasks {
str += fmt.Sprintf("%d, ", task.ID)
}
str = fmt.Sprintf(")")
36 changes: 30 additions & 6 deletions store/tikv/mpp.go
@@ -74,7 +74,10 @@ func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasks

// mppResponse wraps the mpp data packet.
type mppResponse struct {
pbResp *mpp.MPPDataPacket
detail *CopRuntimeStats
respTime time.Duration
respSize int64

err error
}
@@ -91,16 +94,26 @@ func (m *mppResponse) GetStartKey() kv.Key {

// GetCopRuntimeStats returns the runtime stats attached to this response.
func (m *mppResponse) GetCopRuntimeStats() *CopRuntimeStats {
return nil
return m.detail
}

// MemSize returns how many bytes of memory this response uses.
func (m *mppResponse) MemSize() int64 {
return int64(m.pbResp.Size())
if m.respSize != 0 {
return m.respSize
}

if m.detail != nil {
m.respSize += int64(sizeofExecDetails)
}
if m.pbResp != nil {
m.respSize += int64(m.pbResp.Size())
}
return m.respSize
}
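Note on MemSize above: the size is now computed once, from the packet size plus a fixed sizeofExecDetails estimate, and cached in respSize for later calls. The same caching pattern in isolation (the payload-length calculation is a stand-in for the real one):

package main

import "fmt"

type response struct {
	payload  []byte
	respSize int64 // zero means "not computed yet"
}

// MemSize computes the size lazily and caches it, like mppResponse.MemSize.
func (r *response) MemSize() int64 {
	if r.respSize != 0 {
		return r.respSize
	}
	r.respSize = int64(len(r.payload)) // stand-in for pbResp.Size() + sizeofExecDetails
	return r.respSize
}

func main() {
	r := &response{payload: make([]byte, 42)}
	fmt.Println(r.MemSize(), r.MemSize()) // 42 42 (computed only once)
}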

func (m *mppResponse) RespTime() time.Duration {
return 0
return m.respTime
}

type mppIterator struct {
@@ -248,7 +261,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques

// TODO: cancel the whole process when an error occurs
for {
err := m.handleMPPStreamResponse(resp, req)
err := m.handleMPPStreamResponse(bo, resp, req)
if err != nil {
m.sendError(err)
return
@@ -284,7 +297,7 @@ func (m *mppIterator) Close() error {
return nil
}

func (m *mppIterator) handleMPPStreamResponse(response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDataPacket, req *kv.MPPDispatchRequest) (err error) {
if response.Error != nil {
err = errors.Errorf("other error for mpp stream: %s", response.Error.Msg)
logutil.BgLogger().Warn("other error",
@@ -296,7 +309,18 @@ func (m *mppIterator) handleMPPStreamResponse(response *mpp.MPPDataPacket, req *

resp := &mppResponse{
pbResp: response,
detail: new(CopRuntimeStats),
}

resp.detail.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
resp.detail.BackoffSleep = make(map[string]time.Duration, len(bo.backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(bo.backoffTimes))
for backoff := range bo.backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = bo.backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.backoffSleepMS[backoff]) * time.Millisecond
Review thread on the time.Millisecond conversion above:

Contributor: why *?

Contributor (Author): Because time.Duration(bo.backoffSleepMS[backoff]) is nanosecond based.
}
resp.detail.CalleeAddress = req.Meta.GetAddress()

m.sendToRespCh(resp)
return
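Note on the backoff bookkeeping above: the Backoffer records sleep as integer milliseconds, while time.Duration counts nanoseconds, which is what the review thread's answer refers to; a bare time.Duration(ms) would be misread as nanoseconds. A minimal demonstration with invented map contents:

package main

import (
	"fmt"
	"time"
)

func main() {
	backoffSleepMS := map[string]int{"tikvRPC": 150} // sleep recorded in milliseconds
	for name, ms := range backoffSleepMS {
		wrong := time.Duration(ms)                    // 150ns: Duration's base unit is the nanosecond
		right := time.Duration(ms) * time.Millisecond // 150ms
		fmt.Printf("%s: wrong=%v right=%v\n", name, wrong, right)
	}
}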