From c8faac98cb8c5933b5911882ec304db0927106f2 Mon Sep 17 00:00:00 2001
From: Yuanjia Zhang
Date: Thu, 28 Oct 2021 14:20:48 +0800
Subject: [PATCH 1/4] planner: add more test cases about plan-cache+collation (#29201)

---
 planner/core/prepare_test.go | 47 ++++++++++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go
index c74b985fe0432..d47239dae2434 100644
--- a/planner/core/prepare_test.go
+++ b/planner/core/prepare_test.go
@@ -321,6 +321,53 @@ func (s *testPrepareSerialSuite) TestPrepareCacheChangingParamType(c *C) {
     }
 }
 
+func (s *testPrepareSerialSuite) TestPrepareCacheChangeCharsetCollation(c *C) {
+    defer testleak.AfterTest(c)()
+    store, dom, err := newStoreWithBootstrap()
+    c.Assert(err, IsNil)
+    tk := testkit.NewTestKit(c, store)
+    orgEnable := core.PreparedPlanCacheEnabled()
+    defer func() {
+        dom.Close()
+        err = store.Close()
+        c.Assert(err, IsNil)
+        core.SetPreparedPlanCache(orgEnable)
+    }()
+    core.SetPreparedPlanCache(true)
+    tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
+        PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
+    })
+    c.Assert(err, IsNil)
+
+    tk.MustExec(`use test`)
+    tk.MustExec(`drop table if exists t`)
+    tk.MustExec(`create table t (a varchar(64))`)
+    tk.MustExec(`set character_set_connection=utf8`)
+
+    tk.MustExec(`prepare s from 'select * from t where a=?'`)
+    tk.MustExec(`set @x='a'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustExec(`set @x='b'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
+
+    tk.MustExec(`set character_set_connection=latin1`)
+    tk.MustExec(`set @x='c'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // cannot reuse the previous plan since the charset is changed
+    tk.MustExec(`set @x='d'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
+
+    tk.MustExec(`set collation_connection=binary`)
+    tk.MustExec(`set @x='e'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0")) // cannot reuse the previous plan since the collation is changed
+    tk.MustExec(`set @x='f'`)
+    tk.MustExec(`execute s using @x`)
+    tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
+}
+
 func (s *testPlanSerialSuite) TestPrepareCacheDeferredFunction(c *C) {
     defer testleak.AfterTest(c)()
     store, dom, err := newStoreWithBootstrap()
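The test above exercises one property of the prepared-plan cache: the connection charset and collation participate in identifying a cached plan, so `SET character_set_connection` or `SET collation_connection` forces the next EXECUTE to miss the cache (the `0` rows) while later executions under the same settings hit it again (the following `1`). A minimal, self-contained sketch of that keying idea — illustrative only, not TiDB's actual cache-key construction; the struct and values below are invented:

    package main

    import "fmt"

    // cacheKey stands in for the statement text plus the session properties
    // that identify a cached plan; charset and collation are the two the
    // test above varies.
    type cacheKey struct {
        stmtText  string
        charset   string
        collation string
    }

    func main() {
        cache := map[cacheKey]string{}

        // First EXECUTE under utf8 populates the cache.
        k1 := cacheKey{"select * from t where a=?", "utf8", "utf8_general_ci"}
        cache[k1] = "plan A"

        // After SET character_set_connection=latin1 the key differs, so the
        // lookup misses and a fresh plan must be built.
        k2 := cacheKey{"select * from t where a=?", "latin1", "latin1_bin"}
        _, hit := cache[k2]
        fmt.Println(hit) // false: cannot reuse the utf8 plan
    }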
From aef456d0d4ae9c3d98d4cb876bdfc104da991583 Mon Sep 17 00:00:00 2001
From: Zak Zhao <57036248+joccau@users.noreply.github.com>
Date: Thu, 28 Oct 2021 14:40:48 +0800
Subject: [PATCH 2/4] br: Simplify long function:BackupSchemas() (#29153)

---
 br/pkg/backup/schema.go | 105 +++++++++++++++++++++++-----------------
 1 file changed, 61 insertions(+), 44 deletions(-)

diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go
index 34ab023d5245d..b1b0e8e16f0f6 100644
--- a/br/pkg/backup/schema.go
+++ b/br/pkg/backup/schema.go
@@ -21,7 +21,6 @@ import (
     "github.com/pingcap/tidb/kv"
     "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/statistics/handle"
-    "github.com/pingcap/tipb/go-tipb"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
 )
@@ -32,7 +31,7 @@ const (
     DefaultSchemaConcurrency = 64
 )
 
-type scheamInfo struct {
+type schemaInfo struct {
     tableInfo  *model.TableInfo
     dbInfo     *model.DBInfo
     crc64xor   uint64
     totalKvs   uint64
     totalBytes uint64
     stats      *handle.JSONTable
 }
 
 // Schemas is a task for backing up schemas.
 type Schemas struct {
     // name -> schema
-    schemas map[string]*scheamInfo
+    schemas map[string]*schemaInfo
 }
 
 func newBackupSchemas() *Schemas {
     return &Schemas{
-        schemas: make(map[string]*scheamInfo),
+        schemas: make(map[string]*schemaInfo),
     }
 }
 
@@ -58,7 +57,7 @@ func (ss *Schemas) addSchema(
 ) {
     name := fmt.Sprintf("%s.%s",
         utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L))
-    ss.schemas[name] = &scheamInfo{
+    ss.schemas[name] = &schemaInfo{
         tableInfo: tableInfo,
         dbInfo:    dbInfo,
     }
@@ -101,53 +100,27 @@ func (ss *Schemas) BackupSchemas(
             if !skipChecksum {
                 logger.Info("table checksum start")
                 start := time.Now()
-                checksumResp, err := calculateChecksum(
-                    ectx, schema.tableInfo, store.GetClient(), backupTS, copConcurrency)
+                err := schema.calculateChecksum(ectx, store.GetClient(), backupTS, copConcurrency)
                 if err != nil {
                     return errors.Trace(err)
                 }
-                schema.crc64xor = checksumResp.Checksum
-                schema.totalKvs = checksumResp.TotalKvs
-                schema.totalBytes = checksumResp.TotalBytes
                 logger.Info("table checksum finished",
-                    zap.Uint64("Crc64Xor", checksumResp.Checksum),
-                    zap.Uint64("TotalKvs", checksumResp.TotalKvs),
-                    zap.Uint64("TotalBytes", checksumResp.TotalBytes),
+                    zap.Uint64("Crc64Xor", schema.crc64xor),
+                    zap.Uint64("TotalKvs", schema.totalKvs),
+                    zap.Uint64("TotalBytes", schema.totalBytes),
                     zap.Duration("take", time.Since(start)))
             }
             if statsHandle != nil {
-                jsonTable, err := statsHandle.DumpStatsToJSON(
-                    schema.dbInfo.Name.String(), schema.tableInfo, nil)
-                if err != nil {
+                if err := schema.dumpStatsToJSON(statsHandle); err != nil {
                     logger.Error("dump table stats failed", logutil.ShortError(err))
                 }
-                schema.stats = jsonTable
             }
+
             // Send schema to metawriter
-            dbBytes, err := json.Marshal(schema.dbInfo)
-            if err != nil {
-                return errors.Trace(err)
-            }
-            tableBytes, err := json.Marshal(schema.tableInfo)
+            s, err := schema.encodeToSchema()
             if err != nil {
                 return errors.Trace(err)
             }
-            var statsBytes []byte
-            if schema.stats != nil {
-                statsBytes, err = json.Marshal(schema.stats)
-                if err != nil {
-                    return errors.Trace(err)
-                }
-            }
-            s := &backuppb.Schema{
-                Db:         dbBytes,
-                Table:      tableBytes,
-                Crc64Xor:   schema.crc64xor,
-                TotalKvs:   schema.totalKvs,
-                TotalBytes: schema.totalBytes,
-                Stats:      statsBytes,
-            }
-
             if err := metaWriter.Send(s, op); err != nil {
                 return errors.Trace(err)
             }
@@ -168,24 +141,68 @@ func (ss *Schemas) Len() int {
     return len(ss.schemas)
 }
 
-func calculateChecksum(
+func (s *schemaInfo) calculateChecksum(
     ctx context.Context,
-    table *model.TableInfo,
     client kv.Client,
     backupTS uint64,
     concurrency uint,
-) (*tipb.ChecksumResponse, error) {
-    exe, err := checksum.NewExecutorBuilder(table, backupTS).
+) error {
+    exe, err := checksum.NewExecutorBuilder(s.tableInfo, backupTS).
         SetConcurrency(concurrency).
         Build()
     if err != nil {
-        return nil, errors.Trace(err)
+        return errors.Trace(err)
    }
+
     checksumResp, err := exe.Execute(ctx, client, func() {
         // TODO: update progress here.
     })
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    s.crc64xor = checksumResp.Checksum
+    s.totalKvs = checksumResp.TotalKvs
+    s.totalBytes = checksumResp.TotalBytes
+    return nil
+}
+
+func (s *schemaInfo) dumpStatsToJSON(statsHandle *handle.Handle) error {
+    jsonTable, err := statsHandle.DumpStatsToJSON(
+        s.dbInfo.Name.String(), s.tableInfo, nil)
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    s.stats = jsonTable
+    return nil
+}
+
+func (s *schemaInfo) encodeToSchema() (*backuppb.Schema, error) {
+    dbBytes, err := json.Marshal(s.dbInfo)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+
+    tableBytes, err := json.Marshal(s.tableInfo)
     if err != nil {
         return nil, errors.Trace(err)
     }
-    return checksumResp, nil
+
+    var statsBytes []byte
+    if s.stats != nil {
+        statsBytes, err = json.Marshal(s.stats)
+        if err != nil {
+            return nil, errors.Trace(err)
+        }
+    }
+
+    return &backuppb.Schema{
+        Db:         dbBytes,
+        Table:      tableBytes,
+        Crc64Xor:   s.crc64xor,
+        TotalKvs:   s.totalKvs,
+        TotalBytes: s.totalBytes,
+        Stats:      statsBytes,
+    }, nil
 }
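The refactor in this patch is a plain method extraction: calculateChecksum, dumpStatsToJSON, and encodeToSchema now hang off *schemaInfo and write their results into the receiver's fields, so the per-table loop in BackupSchemas() shrinks to three short calls. A toy sketch of the same shape — illustrative only; the names mirror the patch but the body and values are stand-ins:

    package main

    import "fmt"

    // schemaInfo mirrors the struct in the patch; the values are fake.
    type schemaInfo struct {
        crc64xor   uint64
        totalKvs   uint64
        totalBytes uint64
    }

    // calculateChecksum stands in for the extracted method: instead of a free
    // function returning a response that the caller copies field by field,
    // the method fills in its own receiver.
    func (s *schemaInfo) calculateChecksum() error {
        s.crc64xor, s.totalKvs, s.totalBytes = 0xdeadbeef, 42, 4096 // pretend result
        return nil
    }

    func main() {
        schema := &schemaInfo{}
        // The long function's loop body reduces to one-line calls like this.
        if err := schema.calculateChecksum(); err != nil {
            panic(err)
        }
        fmt.Println(schema.crc64xor, schema.totalKvs, schema.totalBytes)
    }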
From 4b4984e6ccfd33ba929468a5d455b524eb35f863 Mon Sep 17 00:00:00 2001
From: Ma Yingchun
Date: Thu, 28 Oct 2021 15:18:49 +0800
Subject: [PATCH 3/4] planner: remove duplicate predicates in the Selection operator (#28317)

---
 cmd/explaintest/r/explain_easy.result     |  2 +-
 expression/constant_propagation.go        |  1 +
 planner/core/physical_plan_test.go        | 29 +++++++++++++++++++++++
 planner/core/testdata/plan_suite_in.json  |  6 +++++
 planner/core/testdata/plan_suite_out.json | 27 ++++++++++++++-------
 5 files changed, 56 insertions(+), 9 deletions(-)

diff --git a/cmd/explaintest/r/explain_easy.result b/cmd/explaintest/r/explain_easy.result
index 766edb8f9ed58..2eec1954d46df 100644
--- a/cmd/explaintest/r/explain_easy.result
+++ b/cmd/explaintest/r/explain_easy.result
@@ -509,7 +509,7 @@ StreamAgg 1.00 root funcs:count(1)->Column#22
 ├─TableDual 8000.00 root rows:0
 └─Projection 0.01 root test.test01.stat_date, test.test01.show_date, test.test01.region_id
   └─TableReader 0.01 root data:Selection
-    └─Selection 0.01 cop[tikv] eq(test.test01.period, 1), ge(test.test01.stat_date, 20191202), ge(test.test01.stat_date, 20191202), gt(cast(test.test01.registration_num, bigint(20) BINARY), 0), le(test.test01.stat_date, 20191202), le(test.test01.stat_date, 20191202)
+    └─Selection 0.01 cop[tikv] eq(test.test01.period, 1), ge(test.test01.stat_date, 20191202), gt(cast(test.test01.registration_num, bigint(20) BINARY), 0), le(test.test01.stat_date, 20191202)
       └─TableFullScan 10000.00 cop[tikv] table:test01 keep order:false, stats:pseudo
 drop table if exists t;
 create table t(a int, nb int not null, nc int not null);
diff --git a/expression/constant_propagation.go b/expression/constant_propagation.go
index e9a63b05ad10a..b007d4b79a2eb 100644
--- a/expression/constant_propagation.go
+++ b/expression/constant_propagation.go
@@ -349,6 +349,7 @@ func (s *propConstSolver) solve(conditions []Expression) []Expression {
     s.propagateConstantEQ()
     s.propagateColumnEQ()
     s.conditions = propagateConstantDNF(s.ctx, s.conditions)
+    s.conditions = RemoveDupExprs(s.ctx, s.conditions)
     return s.conditions
 }
 
diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go
index 84f3631c7e473..1b3d13c02cc03 100644
--- a/planner/core/physical_plan_test.go
+++ b/planner/core/physical_plan_test.go
@@ -1987,3 +1987,32 @@ func (s *testPlanSuite) TestSelectionPartialPushDown(c *C) {
         tk.MustQuery("explain format='brief' " + ts).Check(testkit.Rows(output[i].Plan...))
     }
 }
+
+func (s *testPlanSuite) TestIssue28316(c *C) {
+    var (
+        input  []string
+        output []struct {
+            SQL  string
+            Plan []string
+        }
+    )
+    s.testData.GetTestCases(c, &input, &output)
+    store, dom, err := newStoreWithBootstrap()
+    c.Assert(err, IsNil)
+    defer func() {
+        dom.Close()
+        store.Close()
+    }()
+    tk := testkit.NewTestKit(c, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(a int)")
+
+    for i, ts := range input {
+        s.testData.OnRecord(func() {
+            output[i].SQL = ts
+            output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain format='brief' " + ts).Rows())
+        })
+        tk.MustQuery("explain format='brief' " + ts).Check(testkit.Rows(output[i].Plan...))
+    }
+}
diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json
index 526c7af78b2b1..09233f05dd121 100644
--- a/planner/core/testdata/plan_suite_in.json
+++ b/planner/core/testdata/plan_suite_in.json
@@ -740,5 +740,11 @@
       // Make sure row_count(tikv_selection) == row_count(index_lookup) and row_count(index_lookup) > row_count(tidb_selection)
       "select * from t2 use index(idx_a) where a > 1 and b > 1 and c > 1"
     ]
+  },
+  {
+    "name": "TestIssue28316",
+    "cases": [
+      "select * from t where t.a < 3 and t.a < 3"
+    ]
   }
 ]
diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json
index 4d2c468590362..482075cb0349e 100644
--- a/planner/core/testdata/plan_suite_out.json
+++ b/planner/core/testdata/plan_suite_out.json
@@ -1476,14 +1476,12 @@
         ],
         "Plan": [
           "Sort 12500.00 root test.t1.c_int, test.t2.c_str",
-          "└─MergeJoin 12500.00 root inner join, left key:test.t1.c_int, test.t1.c_int, right key:test.t2.c_int, test.t2.c_int",
-          "  ├─Sort(Build) 10000.00 root test.t2.c_int, test.t2.c_int",
-          "  │ └─UnionScan 10000.00 root ",
-          "  │   └─TableReader 10000.00 root data:TableFullScan",
-          "  │     └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo",
-          "  └─Sort(Probe) 10000.00 root test.t1.c_int, test.t1.c_int",
-          "    └─TableReader 10000.00 root data:TableFullScan",
-          "      └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo"
+          "└─MergeJoin 12500.00 root inner join, left key:test.t1.c_int, right key:test.t2.c_int",
+          "  ├─UnionScan(Build) 10000.00 root ",
+          "  │ └─TableReader 10000.00 root data:TableFullScan",
+          "  │   └─TableFullScan 10000.00 cop[tikv] table:t2 keep order:true, stats:pseudo",
+          "  └─TableReader(Probe) 10000.00 root data:TableFullScan",
+          "    └─TableFullScan 10000.00 cop[tikv] table:t1 keep order:true, stats:pseudo"
         ]
       }
     ]
@@ -2666,5 +2664,18 @@
         ]
       }
     ]
+  },
+  {
+    "Name": "TestIssue28316",
+    "Cases": [
+      {
+        "SQL": "select * from t where t.a < 3 and t.a < 3",
+        "Plan": [
+          "TableReader 3323.33 root data:Selection",
+          "└─Selection 3323.33 cop[tikv] lt(test.t.a, 3)",
+          "  └─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo"
+        ]
+      }
+    ]
   }
 ]
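Mechanically, the one-line change in constant_propagation.go runs a dedup pass over the condition list after propagation, which is what collapses `t.a < 3 and t.a < 3` into a single `lt(test.t.a, 3)` in the expected plans above. A self-contained sketch of such a pass — illustrative only; TiDB's RemoveDupExprs works on Expression objects keyed by their canonical hash codes, whereas this sketch uses plain strings:

    package main

    import "fmt"

    // removeDupExprs keeps the first occurrence of each predicate and drops
    // later repeats, preserving the original order.
    func removeDupExprs(exprs []string) []string {
        seen := make(map[string]struct{}, len(exprs))
        deduped := make([]string, 0, len(exprs))
        for _, e := range exprs {
            if _, ok := seen[e]; ok {
                continue // duplicate predicate, drop it
            }
            seen[e] = struct{}{}
            deduped = append(deduped, e)
        }
        return deduped
    }

    func main() {
        conds := []string{"lt(test.t.a, 3)", "lt(test.t.a, 3)"}
        fmt.Println(removeDupExprs(conds)) // [lt(test.t.a, 3)]
    }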
From b562dc90e7c4381b0e76c6e788b584090eb4d657 Mon Sep 17 00:00:00 2001
From: rebelice
Date: Thu, 28 Oct 2021 15:42:49 +0800
Subject: [PATCH 4/4] executor: refactor plan replayer (#28956)

---
 executor/builder.go         |   7 +-
 executor/executor_test.go   |  37 +++
 executor/plan_replayer.go   | 490 +++++++++++++++++++++++++++++-------
 planner/core/planbuilder.go |   4 +
 server/conn.go              |  22 --
 session/session.go          |   1 -
 6 files changed, 440 insertions(+), 121 deletions(-)

diff --git a/executor/builder.go b/executor/builder.go
index 6e8615e424563..df2b87dd5f620 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -908,8 +908,11 @@ func (b *executorBuilder) buildIndexAdvise(v *plannercore.IndexAdvise) Executor
 func (b *executorBuilder) buildPlanReplayerSingle(v *plannercore.PlanReplayerSingle) Executor {
     e := &PlanReplayerSingleExec{
-        baseExecutor: newBaseExecutor(b.ctx, nil, v.ID()),
-        info:         &PlanReplayerSingleInfo{v.ExecStmt, v.Analyze, v.Load, v.File, b.ctx},
+        baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
+        ExecStmt:     v.ExecStmt,
+        Analyze:      v.Analyze,
+        Load:         v.Load,
+        File:         v.File,
     }
     return e
 }
diff --git a/executor/executor_test.go b/executor/executor_test.go
index cb1357458697b..7d7e8a806305a 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -15,6 +15,7 @@
 package executor_test
 
 import (
+    "archive/zip"
     "context"
     "flag"
     "fmt"
@@ -9156,6 +9157,42 @@ func (s *testSuite) TestGetResultRowsCount(c *C) {
     }
 }
 
+func checkFileName(s string) bool {
+    files := []string{
+        "config.toml",
+        "meta.txt",
+        "stats/test.t_dump_single.json",
+        "schema/test.t_dump_single.schema.txt",
+        "variables.toml",
+        "sqls.sql",
+        "session_bindings.sql",
+        "global_bindings.sql",
+        "explain.txt",
+    }
+    for _, f := range files {
+        if strings.Compare(f, s) == 0 {
+            return true
+        }
+    }
+    return false
+}
+
+func (s *testSuiteWithData) TestPlanReplayerDumpSingle(c *C) {
+    tk := testkit.NewTestKit(c, s.store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t_dump_single")
+    tk.MustExec("create table t_dump_single(a int)")
+    res := tk.MustQuery("plan replayer dump explain select * from t_dump_single")
+    path := s.testData.ConvertRowsToStrings(res.Rows())
+
+    reader, err := zip.OpenReader(path[0])
+    c.Assert(err, IsNil)
+    defer reader.Close()
+    for _, file := range reader.File {
+        c.Assert(checkFileName(file.Name), IsTrue)
+    }
+}
+
 func (s *testSuiteP1) TestIssue28935(c *C) {
     tk := testkit.NewTestKit(c, s.store)
     tk.MustExec("set @@tidb_enable_vectorized_expression=true")
diff --git a/executor/plan_replayer.go b/executor/plan_replayer.go
index 17af9558f38d4..2bef0101e6d5b 100644
--- a/executor/plan_replayer.go
+++ b/executor/plan_replayer.go
@@ -17,162 +17,460 @@ package executor
 import (
     "archive/zip"
     "context"
-    "crypto/md5" // #nosec G501
-    "encoding/hex"
+    "crypto/rand"
+    "encoding/base64"
+    "encoding/json"
     "fmt"
-    "math/rand"
     "os"
+    "path/filepath"
+    "strings"
     "time"
 
+    "github.com/BurntSushi/toml"
     "github.com/pingcap/errors"
+    "github.com/pingcap/tidb/config"
+    "github.com/pingcap/tidb/domain"
     "github.com/pingcap/tidb/parser/ast"
+    "github.com/pingcap/tidb/parser/model"
     "github.com/pingcap/tidb/sessionctx"
+    "github.com/pingcap/tidb/statistics/handle"
     "github.com/pingcap/tidb/util/chunk"
     "github.com/pingcap/tidb/util/logutil"
+    "github.com/pingcap/tidb/util/printer"
+    "github.com/pingcap/tidb/util/sqlexec"
+    "go.uber.org/zap"
 )
 
-const replayerPath string = "/tmp/replayer"
-
-// TTL of plan replayer files
-const remainedInterval float64 = 3
-
-// PlanReplayerInfo saves the information of plan replayer operation.
-type PlanReplayerInfo interface {
-    // Process dose the export/import work for reproducing sql queries.
-    Process() (string, error)
-}
-
 // PlanReplayerSingleExec represents a plan replayer executor.
 type PlanReplayerSingleExec struct {
     baseExecutor
-    info *PlanReplayerSingleInfo
-}
-
-// PlanReplayerSingleInfo saves the information of plan replayer operation.
-type PlanReplayerSingleInfo struct {
     ExecStmt ast.StmtNode
     Analyze  bool
     Load     bool
     File     string
-    Ctx      sessionctx.Context
-}
-type fileInfo struct {
-    StartTime time.Time
-    Token     [16]byte
+
+    endFlag bool
 }
 
-type fileList struct {
-    FileInfo map[string]fileInfo
-    TokenMap map[[16]byte]string
+type tableNamePair struct {
+    DBName    string
+    TableName string
 }
 
-// planReplayerVarKeyType is a dummy type to avoid naming collision in context.
-type planReplayerVarKeyType int
-
-// String defines a Stringer function for debugging and pretty printing.
-func (k planReplayerVarKeyType) String() string {
-    return "plan_replayer_var"
+type tableNameExtractor struct {
+    curDB string
+    names map[tableNamePair]struct{}
 }
 
-// planReplayerFileListType is a dummy type to avoid naming collision in context.
-type planReplayerFileListType int
-
-// String defines a Stringer function for debugging and pretty printing.
-func (k planReplayerFileListType) String() string {
-    return "plan_replayer_file_list"
+func (tne *tableNameExtractor) Enter(in ast.Node) (ast.Node, bool) {
+    if _, ok := in.(*ast.TableName); ok {
+        return in, true
+    }
+    return in, false
 }
 
-// PlanReplayerVarKey is a variable key for plan replayer.
-const PlanReplayerVarKey planReplayerVarKeyType = 0
-
-// PlanReplayerFileList is a variable key for plan replayer's file list.
-const PlanReplayerFileList planReplayerFileListType = 0
+func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) {
+    if t, ok := in.(*ast.TableName); ok {
+        tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L}
+        if tp.DBName == "" {
+            tp.DBName = tne.curDB
+        }
+        if _, ok := tne.names[tp]; !ok {
+            tne.names[tp] = struct{}{}
+        }
+    }
+    return in, true
+}
 
 // Next implements the Executor Next interface.
 func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) error {
     req.GrowAndReset(e.maxChunkSize)
-    if e.info.ExecStmt == nil {
+    if e.endFlag {
+        return nil
+    }
+    if e.ExecStmt == nil {
         return errors.New("plan replayer: sql is empty")
     }
-    val := e.ctx.Value(PlanReplayerVarKey)
-    if val != nil {
-        e.ctx.SetValue(PlanReplayerVarKey, nil)
-        return errors.New("plan replayer: previous plan replayer option isn't closed normally")
+    res, err := e.dumpSingle(filepath.Join(domain.GetPlanReplayerDirName(), fmt.Sprintf("%v", os.Getpid())))
+    if err != nil {
+        return err
     }
-    e.ctx.SetValue(PlanReplayerVarKey, e.info)
+    req.AppendString(0, res)
+    e.endFlag = true
     return nil
 }
 
-// Close implements the Executor Close interface.
-func (e *PlanReplayerSingleExec) Close() error {
+// dumpSingle dumps all the information needed to reproduce a single SQL
+// statement. The zip archive is organized as follows:
+// |-meta.txt
+// |-config.toml
+// |-variables.toml
+// |-schema
+// |   |-<db>.<table>.schema.txt
+// |-stats
+// |   |-<db>.<table>.json
+// |   |-....
+// |-sqls.sql
+// |-session_bindings.sql
+// |-global_bindings.sql
+// |-explain.txt
+//
+func (e *PlanReplayerSingleExec) dumpSingle(path string) (string, error) {
+    // Create path
+    err := os.MkdirAll(path, os.ModePerm)
+    if err != nil {
+        return "", errors.AddStack(err)
+    }
+
+    // Generate key and create zip file
+    ts := time.Now().UnixNano()
+    b := make([]byte, 16)
+    _, err = rand.Read(b)
+    if err != nil {
+        return "", err
+    }
+    key := base64.URLEncoding.EncodeToString(b)
+    fileName := fmt.Sprintf("replayer_single_%v_%v.zip", key, ts)
+    zf, err := os.Create(filepath.Join(path, fileName))
+    if err != nil {
+        return "", errors.AddStack(err)
+    }
+
+    // Create zip writer
+    zw := zip.NewWriter(zf)
+    defer func() {
+        err := zw.Close()
+        if err != nil {
+            logutil.BgLogger().Warn("Closing zip writer failed", zap.Error(err))
+        }
+        err = zf.Close()
+        if err != nil {
+            logutil.BgLogger().Warn("Closing zip file failed", zap.Error(err))
+        }
+    }()
+
+    // Dump config
+    if err = dumpConfig(zw); err != nil {
+        return "", err
+    }
+
+    // Dump meta
+    if err = dumpMeta(zw); err != nil {
+        return "", err
+    }
+
+    // Retrieve current DB
+    sessionVars := e.ctx.GetSessionVars()
+    dbName := model.NewCIStr(sessionVars.CurrentDB)
+    do := domain.GetDomain(e.ctx)
+
+    // Retrieve all tables
+    pairs, err := extractTableNames(e.ExecStmt, dbName.L)
+    if err != nil {
+        return "", errors.AddStack(errors.Errorf("plan replayer: invalid SQL text, err: %v", err))
+    }
+
+    // Dump schema
+    if err = dumpSchemas(e.ctx, zw, pairs); err != nil {
+        return "", err
+    }
+
+    // Dump stats
+    if err = dumpStats(zw, pairs, do); err != nil {
+        return "", err
+    }
+
+    // Dump variables
+    if err = dumpVariables(e.ctx, zw); err != nil {
+        return "", err
+    }
+
+    // Dump sql
+    sql, err := zw.Create("sqls.sql")
+    if err != nil {
+        return "", errors.AddStack(err)
+    }
+    _, err = sql.Write([]byte(e.ExecStmt.Text()))
+    if err != nil {
+        return "", err
+    }
+
+    // Dump session bindings
+    if err = dumpSessionBindings(e.ctx, zw); err != nil {
+        return "", err
+    }
+
+    // Dump global bindings
+    if err = dumpGlobalBindings(e.ctx, zw); err != nil {
+        return "", err
+    }
+
+    // Dump explain
+    if err = dumpExplain(e.ctx, zw, e.ExecStmt.Text(), e.Analyze); err != nil {
+        return "", err
+    }
+
+    return filepath.Join(path, fileName), nil
+}
+
+func dumpConfig(zw *zip.Writer) error {
+    cf, err := zw.Create("config.toml")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    if err := toml.NewEncoder(cf).Encode(config.GetGlobalConfig()); err != nil {
+        return errors.AddStack(err)
+    }
     return nil
 }
 
-// Open implements the Executor Open interface.
-func (e *PlanReplayerSingleExec) Open(ctx context.Context) error {
+func dumpMeta(zw *zip.Writer) error {
+    mt, err := zw.Create("meta.txt")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    _, err = mt.Write([]byte(printer.GetTiDBInfo()))
+    if err != nil {
+        return errors.AddStack(err)
+    }
     return nil
 }
 
-// Process dose the export/import work for reproducing sql queries.
-func (e *PlanReplayerSingleInfo) Process() (string, error) {
-    // TODO: plan replayer load will be developed later
-    if e.Load {
-        return "", nil
+func dumpSchemas(ctx sessionctx.Context, zw *zip.Writer, pairs map[tableNamePair]struct{}) error {
+    for pair := range pairs {
+        err := getShowCreateTable(pair, zw, ctx)
+        if err != nil {
+            return err
+        }
     }
-    return e.dumpSingle()
+    return nil
 }
 
-func (e *PlanReplayerSingleInfo) dumpSingle() (string, error) {
-    // Create path
-    err := os.MkdirAll(replayerPath, os.ModePerm)
+func dumpStats(zw *zip.Writer, pairs map[tableNamePair]struct{}, do *domain.Domain) error {
+    for pair := range pairs {
+        jsonTbl, err := getStatsForTable(do, pair)
+        if err != nil {
+            return err
+        }
+        statsFw, err := zw.Create(fmt.Sprintf("stats/%v.%v.json", pair.DBName, pair.TableName))
+        if err != nil {
+            return errors.AddStack(err)
+        }
+        data, err := json.Marshal(jsonTbl)
+        if err != nil {
+            return errors.AddStack(err)
+        }
+        _, err = statsFw.Write(data)
+        if err != nil {
+            return errors.AddStack(err)
+        }
+    }
+    return nil
+}
+
+func dumpVariables(ctx sessionctx.Context, zw *zip.Writer) error {
+    varMap := make(map[string]string)
+    recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show variables")
+    if err != nil {
+        return err
+    }
+    sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
     if err != nil {
-        return "", errors.New("plan replayer: cannot create plan replayer path")
+        return err
+    }
+    vf, err := zw.Create("variables.toml")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    for _, row := range sRows {
+        varMap[row[0]] = row[1]
+    }
+    if err := toml.NewEncoder(vf).Encode(varMap); err != nil {
+        return errors.AddStack(err)
+    }
+    if len(recordSets) > 0 {
+        if err := recordSets[0].Close(); err != nil {
+            return err
+        }
     }
+    return nil
+}
 
-    // Create zip file
-    startTime := time.Now()
-    fileName := fmt.Sprintf("replayer_single_%v.zip", startTime.UnixNano())
-    zf, err := os.Create(replayerPath + "/" + fileName)
+func dumpSessionBindings(ctx sessionctx.Context, zw *zip.Writer) error {
+    recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show bindings")
     if err != nil {
-        return "", errors.New("plan replayer: cannot create zip file")
+        return err
     }
-    val := e.Ctx.Value(PlanReplayerFileList)
-    if val == nil {
-        e.Ctx.SetValue(PlanReplayerFileList, fileList{FileInfo: make(map[string]fileInfo), TokenMap: make(map[[16]byte]string)})
+    sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
+    if err != nil {
+        return err
+    }
+    bf, err := zw.Create("session_bindings.sql")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    for _, row := range sRows {
+        fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
+    }
+    if len(recordSets) > 0 {
+        if err := recordSets[0].Close(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func dumpGlobalBindings(ctx sessionctx.Context, zw *zip.Writer) error {
+    recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), "show global bindings")
+    if err != nil {
+        return err
+    }
+    sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
+    if err != nil {
+        return err
+    }
+    bf, err := zw.Create("global_bindings.sql")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    for _, row := range sRows {
+        fmt.Fprintf(bf, "%s\n", strings.Join(row, "\t"))
+    }
+    if len(recordSets) > 0 {
+        if err := recordSets[0].Close(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func dumpExplain(ctx sessionctx.Context, zw *zip.Writer, sql string, isAnalyze bool) error {
+    var recordSets []sqlexec.RecordSet
+    var err error
+    if isAnalyze {
+        // Explain analyze
+        recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain analyze %s", sql))
+        if err != nil {
+            return err
+        }
     } else {
-        // Clean outdated files
-        Flist := val.(fileList).FileInfo
-        TList := val.(fileList).TokenMap
-        for k, v := range Flist {
-            if time.Since(v.StartTime).Minutes() > remainedInterval {
-                err := os.Remove(replayerPath + "/" + k)
+        // Explain
+        recordSets, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("explain %s", sql))
+        if err != nil {
+            return err
+        }
+    }
+    sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
+    if err != nil {
+        return err
+    }
+    fw, err := zw.Create("explain.txt")
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    for _, row := range sRows {
+        fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
+    }
+    if len(recordSets) > 0 {
+        if err := recordSets[0].Close(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func extractTableNames(ExecStmt ast.StmtNode, curDB string) (map[tableNamePair]struct{}, error) {
+    extractor := &tableNameExtractor{
+        curDB: curDB,
+        names: make(map[tableNamePair]struct{}),
+    }
+    ExecStmt.Accept(extractor)
+    return extractor.names, nil
+}
+
+func getStatsForTable(do *domain.Domain, pair tableNamePair) (*handle.JSONTable, error) {
+    is := do.InfoSchema()
+    h := do.StatsHandle()
+    tbl, err := is.TableByName(model.NewCIStr(pair.DBName), model.NewCIStr(pair.TableName))
+    if err != nil {
+        return nil, err
+    }
+    js, err := h.DumpStatsToJSON(pair.DBName, tbl.Meta(), nil)
+    return js, err
+}
+
+func getShowCreateTable(pair tableNamePair, zw *zip.Writer, ctx sessionctx.Context) error {
+    recordSets, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), fmt.Sprintf("show create table `%v`.`%v`", pair.DBName, pair.TableName))
+    if err != nil {
+        return err
+    }
+    sRows, err := resultSetToStringSlice(context.Background(), recordSets[0])
+    if err != nil {
+        return err
+    }
+    fw, err := zw.Create(fmt.Sprintf("schema/%v.%v.schema.txt", pair.DBName, pair.TableName))
+    if err != nil {
+        return errors.AddStack(err)
+    }
+    for _, row := range sRows {
+        fmt.Fprintf(fw, "%s\n", strings.Join(row, "\t"))
+    }
+    if len(recordSets) > 0 {
+        if err := recordSets[0].Close(); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func resultSetToStringSlice(ctx context.Context, rs sqlexec.RecordSet) ([][]string, error) {
+    rows, err := getRows(ctx, rs)
+    if err != nil {
+        return nil, err
+    }
+    err = rs.Close()
+    if err != nil {
+        return nil, err
+    }
+    sRows := make([][]string, len(rows))
+    for i, row := range rows {
+        iRow := make([]string, row.Len())
+        for j := 0; j < row.Len(); j++ {
+            if row.IsNull(j) {
+                iRow[j] = ""
+            } else {
+                d := row.GetDatum(j, &rs.Fields()[j].Column.FieldType)
+                iRow[j], err = d.ToString()
                 if err != nil {
-                    logutil.BgLogger().Warn(fmt.Sprintf("Cleaning outdated file %s failed.", k))
+                    return nil, err
                 }
-                delete(Flist, k)
-                delete(TList, v.Token)
             }
         }
+        sRows[i] = iRow
     }
-    // Generate Token
-    token := md5.Sum([]byte(fmt.Sprintf("%s%d", fileName, rand.Int63()))) // #nosec G401 G404
-    e.Ctx.Value(PlanReplayerFileList).(fileList).FileInfo[fileName] = fileInfo{StartTime: startTime, Token: token}
-    e.Ctx.Value(PlanReplayerFileList).(fileList).TokenMap[token] = fileName
+    return sRows, nil
+}
 
-    // Create zip writer
-    zw := zip.NewWriter(zf)
-    defer func() {
-        err := zw.Close()
+func getRows(ctx context.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
+    if rs == nil {
+        return nil, nil
+    }
+    var rows []chunk.Row
+    req := rs.NewChunk()
+    // Must reuse `req` for imitating server.(*clientConn).writeChunks
+    for {
+        err := rs.Next(ctx, req)
         if err != nil {
-            logutil.BgLogger().Warn("Closing zip writer failed.")
+            return nil, err
         }
-        err = zf.Close()
-        if err != nil {
-            logutil.BgLogger().Warn("Closing zip file failed.")
+        if req.NumRows() == 0 {
+            break
         }
-    }()
-    // TODO: DUMP PLAN REPLAYER FILES IN ZIP WRITER
-    return hex.EncodeToString(token[:]), nil
+        iter := chunk.NewIterator4Chunk(req.CopyConstruct())
+        for row := iter.Begin(); row != iter.End(); row = iter.Next() {
+            rows = append(rows, row)
+        }
+    }
+    return rows, nil
 }
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index 391ff3f0930ed..8c93a15a5a447 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -4194,6 +4194,10 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp
 func (b *PlanBuilder) buildPlanReplayer(pc *ast.PlanReplayerStmt) Plan {
     p := &PlanReplayerSingle{ExecStmt: pc.Stmt, Analyze: pc.Analyze, Load: pc.Load, File: pc.File}
+    schema := newColumnsWithNames(1)
+    schema.Append(buildColumnWithName("", "Dump_link", mysql.TypeVarchar, 128))
+    p.SetSchema(schema.col2Schema())
+    p.names = schema.names
     return p
 }
 
diff --git a/server/conn.go b/server/conn.go
index d7551c40a2ea9..ba4ca6457e30c 100644
--- a/server/conn.go
+++ b/server/conn.go
@@ -1681,15 +1681,6 @@ func (cc *clientConn) handleIndexAdvise(ctx context.Context, indexAdviseInfo *ex
     return nil
 }
 
-// handlePlanReplayer dose the export/import work for reproducing sql queries.
-func (cc *clientConn) handlePlanReplayer(ctx context.Context, info executor.PlanReplayerInfo) (string, error) {
-    switch info.(type) {
-    case *executor.PlanReplayerSingleInfo:
-        return info.Process()
-    }
-    return "", errors.New("plan replayer: not supporting info type")
-}
-
 func (cc *clientConn) audit(eventType plugin.GeneralEvent) {
     err := plugin.ForeachPlugin(plugin.Audit, func(p *plugin.Plugin) error {
         audit := plugin.DeclareAuditManifest(p.Manifest)
@@ -1969,19 +1960,6 @@ func (cc *clientConn) handleQuerySpecial(ctx context.Context, status uint16) (bo
         }
     }
 
-    planReplayer := cc.ctx.Value(executor.PlanReplayerVarKey)
-    if planReplayer != nil {
-        handled = true
-        defer cc.ctx.SetValue(executor.PlanReplayerVarKey, nil)
-        token, err := cc.handlePlanReplayer(ctx, planReplayer.(executor.PlanReplayerInfo))
-        if err != nil {
-            return handled, err
-        }
-        if token != "" {
-            return handled, cc.writeOkWith(ctx, token, cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
-        }
-    }
-
     return handled, cc.writeOkWith(ctx, cc.ctx.LastMessage(), cc.ctx.AffectedRows(), cc.ctx.LastInsertID(), status, cc.ctx.WarningCount())
 }
 
diff --git a/session/session.go b/session/session.go
index 42c9255c1f573..27782745fcc94 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1634,7 +1634,6 @@ var querySpecialKeys = []fmt.Stringer{
     executor.LoadDataVarKey,
     executor.LoadStatsVarKey,
     executor.IndexAdviseVarKey,
-    executor.PlanReplayerVarKey,
 }
 
 func (s *session) hasQuerySpecial() bool {
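For completeness, a standalone sketch of consuming the dump produced by the refactored executor: `PLAN REPLAYER DUMP EXPLAIN <sql>` now returns the zip path in its single Dump_link column (see buildPlanReplayer above), and the archive can be read with the standard library, exactly as TestPlanReplayerDumpSingle does. The path below is hypothetical:

    package main

    import (
        "archive/zip"
        "fmt"
        "io"
        "log"
        "os"
    )

    func main() {
        // Hypothetical path, as returned in the Dump_link column.
        reader, err := zip.OpenReader("/tmp/replayer/replayer_single_xxx.zip")
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()

        for _, file := range reader.File {
            fmt.Println(file.Name) // meta.txt, config.toml, explain.txt, ...
            if file.Name != "explain.txt" {
                continue
            }
            rc, err := file.Open()
            if err != nil {
                log.Fatal(err)
            }
            // Print the dumped plan.
            if _, err := io.Copy(os.Stdout, rc); err != nil {
                log.Fatal(err)
            }
            rc.Close()
        }
    }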