Skip to content

Commit

Permalink
domain: support dump sql meta in plan replayer (#39863)
Browse files Browse the repository at this point in the history
close #39883
  • Loading branch information
Yisaer authored Dec 13, 2022
1 parent 222faa4 commit 4b3a442
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 16 deletions.
1 change: 1 addition & 0 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ type PlanReplayerDumpTask struct {
TblStats map[int64]interface{}

// variables used to dump the plan
StartTS uint64
SessionBindings []*bindinfo.BindRecord
EncodedPlan string
SessionVars *variable.SessionVars
Expand Down
42 changes: 37 additions & 5 deletions domain/plan_replayer_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
)

const (
// planReplayerSQLMeta indicates sql meta path for plan replayer
planReplayerSQLMeta = "sql_meta.toml"
// PlanReplayerConfigFile indicates config file path for plan replayer
PlanReplayerConfigFile = "config.toml"
// PlanReplayerMetaFile meta file path for plan replayer
Expand All @@ -55,6 +57,11 @@ const (
PlanReplayerGlobalBindingFile = "global_bindings.sql"
)

const (
// PlanReplayerSQLMetaStartTS indicates the startTS in plan replayer sql meta
PlanReplayerSQLMetaStartTS = "startTS"
)

type tableNamePair struct {
DBName string
TableName string
Expand Down Expand Up @@ -131,6 +138,7 @@ func (tne *tableNameExtractor) handleIsView(t *ast.TableName) (bool, error) {
// DumpPlanReplayerInfo will dump the information about sqls.
// The files will be organized into the following format:
/*
|-sql_meta.toml
|-meta.txt
|-schema
| |-db1.table1.schema.txt
Expand Down Expand Up @@ -164,7 +172,7 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
sessionVars := task.SessionVars
execStmts := task.ExecStmts
zw := zip.NewWriter(zf)
records := generateRecords(task)
var records []PlanReplayerStatusRecord
defer func() {
if err != nil {
logutil.BgLogger().Error("dump plan replayer failed", zap.Error(err))
Expand All @@ -183,6 +191,12 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
}
insertPlanReplayerStatus(ctx, sctx, records)
}()

// Dump SQLMeta
if err = dumpSQLMeta(zw, task); err != nil {
return err
}

// Dump config
if err = dumpConfig(zw); err != nil {
return err
Expand Down Expand Up @@ -244,10 +258,11 @@ func DumpPlanReplayerInfo(ctx context.Context, sctx sessionctx.Context,
}

if len(task.EncodedPlan) > 0 {
records = generateRecords(task)
return dumpEncodedPlan(sctx, zw, task.EncodedPlan)
}
// Dump explain
return dumpExplain(sctx, zw, execStmts, task.Analyze)
return dumpExplain(sctx, zw, task, &records)
}

func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
Expand All @@ -265,6 +280,19 @@ func generateRecords(task *PlanReplayerDumpTask) []PlanReplayerStatusRecord {
return records
}

func dumpSQLMeta(zw *zip.Writer, task *PlanReplayerDumpTask) error {
cf, err := zw.Create(planReplayerSQLMeta)
if err != nil {
return errors.AddStack(err)
}
varMap := make(map[string]string)
varMap[PlanReplayerSQLMetaStartTS] = strconv.FormatUint(task.StartTS, 10)
if err := toml.NewEncoder(cf).Encode(varMap); err != nil {
return errors.AddStack(err)
}
return nil
}

func dumpConfig(zw *zip.Writer) error {
cf, err := zw.Create(PlanReplayerConfigFile)
if err != nil {
Expand Down Expand Up @@ -488,12 +516,12 @@ func dumpEncodedPlan(ctx sessionctx.Context, zw *zip.Writer, encodedPlan string)
return nil
}

func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNode, isAnalyze bool) error {
for i, stmtExec := range execStmts {
func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, task *PlanReplayerDumpTask, records *[]PlanReplayerStatusRecord) error {
for i, stmtExec := range task.ExecStmts {
sql := stmtExec.Text()
var recordSets []sqlexec.RecordSet
var err error
if isAnalyze {
if task.Analyze {
// Explain analyze
recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.Background(), fmt.Sprintf("explain analyze %s", sql))
if err != nil {
Expand Down Expand Up @@ -522,6 +550,10 @@ func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, execStmts []ast.StmtNod
return err
}
}
*records = append(*records, PlanReplayerStatusRecord{
OriginSQL: sql,
Token: task.FileName,
})
}
return nil
}
Expand Down
13 changes: 9 additions & 4 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,17 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode)
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}

return stmt, nil
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode) {
func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
Expand All @@ -178,21 +182,22 @@ func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode)
sendPlanReplayerDumpTask(sqlDigest.String(), planDigest.String(), sctx, stmtNode, startTS)
return
}
}
}
}

func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode) {
func sendPlanReplayerDumpTask(sqlDigest, planDigest string, sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: domain.PlanReplayerTaskKey{
SQLDigest: sqlDigest,
PlanDigest: planDigest,
},
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
Expand Down
1 change: 1 addition & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func checkFileName(s string) bool {
"global_bindings.sql",
"sql/sql0.sql",
"explain/sql0.txt",
"sql_meta.toml",
}
for _, f := range files {
if strings.Compare(f, s) == 0 {
Expand Down
22 changes: 15 additions & 7 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -133,7 +134,12 @@ func (e *PlanReplayerExec) createFile() error {
func (e *PlanReplayerDumpInfo) dump(ctx context.Context) (err error) {
fileName := e.FileName
zf := e.File
startTS, err := sessiontxn.GetTxnManager(e.ctx).GetStmtReadTS()
if err != nil {
return err
}
task := &domain.PlanReplayerDumpTask{
StartTS: startTS,
FileName: fileName,
Zf: zf,
SessionVars: e.ctx.GetSessionVars(),
Expand Down Expand Up @@ -375,21 +381,23 @@ func createSchemaAndItems(ctx sessionctx.Context, f *zip.File) error {
if err != nil {
return errors.AddStack(err)
}
sqls := strings.Split(buf.String(), ";")
if len(sqls) != 3 {
return errors.New("plan replayer: create schema and tables failed")
}
originText := buf.String()
index1 := strings.Index(originText, ";")
createDatabaseSQL := originText[:index1+1]
index2 := strings.Index(originText[index1+1:], ";")
useDatabaseSQL := originText[index1+1:][:index2+1]
createTableSQL := originText[index1+1:][index2+1:]
c := context.Background()
// create database if not exists
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[0])
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, createDatabaseSQL)
logutil.BgLogger().Debug("plan replayer: skip error", zap.Error(err))
// use database
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[1])
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, useDatabaseSQL)
if err != nil {
return err
}
// create table or view
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, sqls[2])
_, err = ctx.(sqlexec.SQLExecutor).Execute(c, createTableSQL)
if err != nil {
return err
}
Expand Down

0 comments on commit 4b3a442

Please sign in to comment.