Skip to content

Commit

Permalink
planner: support stable result mode (#25971) (#26003)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jul 8, 2021
1 parent 517e000 commit 385bea5
Show file tree
Hide file tree
Showing 12 changed files with 897 additions and 1 deletion.
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,16 @@ func (s *testSuite5) TestSetVar(c *C) {
tk.MustQuery(`select @@session.tidb_slow_log_masking;`).Check(testkit.Rows("0"))
tk.MustExec("set session tidb_slow_log_masking = 1")
tk.MustQuery(`select @@session.tidb_slow_log_masking;`).Check(testkit.Rows("1"))

// test for tidb_enable_stable_result_mode
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 0`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set tidb_enable_stable_result_mode=1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
11 changes: 11 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var AllowCartesianProduct = atomic.NewBool(true)
const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
flagStabilizeResults
flagBuildKeyInfo
flagDecorrelate
flagEliminateAgg
Expand All @@ -59,6 +60,7 @@ const (
var optRuleList = []logicalOptRule{
&gcSubstituter{},
&columnPruner{},
&resultsStabilizer{},
&buildKeySolver{},
&decorrelateSolver{},
&aggregationEliminator{},
Expand Down Expand Up @@ -119,12 +121,21 @@ func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visit
return nil
}

// checkStableResultMode reports whether the stable result mode applies to the
// statement currently bound to sctx: the session variable
// EnableStableResultMode must be set and the statement context must not be
// flagged as an INSERT, UPDATE, DELETE or LOAD DATA statement.
func checkStableResultMode(sctx sessionctx.Context) bool {
	vars := sctx.GetSessionVars()
	if !vars.EnableStableResultMode {
		return false
	}
	stmt := vars.StmtCtx
	isDML := stmt.InInsertStmt || stmt.InUpdateStmt || stmt.InDeleteStmt || stmt.InLoadDataStmt
	return !isDML
}

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
Expand Down
5 changes: 5 additions & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ func (p *BatchPointGetPlan) GetCost(cols []*expression.Column) float64 {

// TryFastPlan tries to use the PointGetPlan for the query.
func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
if checkStableResultMode(ctx) {
// the rule of stabilizing results has not taken effect yet, so cannot generate a plan here in this mode
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
switch x := node.(type) {
Expand Down
120 changes: 120 additions & 0 deletions planner/core/rule_stabilize_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/util"
)

/*
resultsStabilizer is the logical optimization rule that stabilizes query results.
NOTE: it's not a common rule for all queries; it's specially implemented for a few customers.
Results of some queries are not stable, for example:
  create table t (a int); insert into t values (1), (2); select a from t;
In the case above, the result can be `1 2` or `2 1`, which is not stable.
This rule stabilizes results by modifying or injecting a Sort operator:
  1. iterate the plan from the root, skipping every operator that keeps its input order (Selection/Projection/Limit);
  2. when meeting the first operator that does not keep its input order,
    2.1. if it's a Sort, append to its order-by list the columns that are not already in it,
    2.2. otherwise, inject a new Sort upon this operator.
In both cases the columns used are all output columns of that operator, unless the handle column
can be extracted from the plan below it, in which case only the handle column is used.
The type itself carries no state.
*/
type resultsStabilizer struct {
}

// optimize applies the rule to lp. It first tries to complete an existing Sort
// found below the order-keeping operators; if there is none, a new Sort is
// injected instead. The returned error is always nil.
func (rs *resultsStabilizer) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
	if rs.completeSort(lp) {
		// An existing Sort was extended in place, so the plan root is unchanged.
		return lp, nil
	}
	return rs.injectSort(lp), nil
}

// completeSort walks down through order-keeping operators (following their
// first child) and, if the first other operator is a LogicalSort, appends to
// its ByItems every target column that is not already an order-by expression.
// The target columns are the Sort's output columns, or just the handle column
// when one can be extracted from the Sort's child.
// It returns true when a Sort was found (and completed), false otherwise.
func (rs *resultsStabilizer) completeSort(lp LogicalPlan) bool {
	if rs.isInputOrderKeeper(lp) {
		return rs.completeSort(lp.Children()[0])
	}

	sortPlan, isSort := lp.(*LogicalSort)
	if !isSort {
		return false
	}

	// Sort by all output columns unless the handle column is available.
	orderCols := sortPlan.Schema().Columns
	if pk := rs.extractHandleCol(sortPlan.Children()[0]); pk != nil {
		orderCols = []*expression.Column{pk}
	}

nextCol:
	for _, c := range orderCols {
		for _, item := range sortPlan.ByItems {
			if c.Equal(nil, item.Expr) {
				// Already part of the order-by list.
				continue nextCol
			}
		}
		sortPlan.ByItems = append(sortPlan.ByItems, &util.ByItems{Expr: c})
	}
	return true
}

// injectSort places a new LogicalSort directly above the first operator that
// is not an order keeper. Order-keeping operators on the way down are kept and
// get the rewritten subtree as their only child. The new Sort orders by the
// handle column when it can be extracted from lp, otherwise by every output
// column of lp.
func (rs *resultsStabilizer) injectSort(lp LogicalPlan) LogicalPlan {
	if rs.isInputOrderKeeper(lp) {
		child := rs.injectSort(lp.Children()[0])
		lp.SetChildren(child)
		return lp
	}

	orderCols := lp.Schema().Columns
	items := make([]*util.ByItems, 0, len(orderCols))
	if pk := rs.extractHandleCol(lp); pk != nil {
		orderCols = []*expression.Column{pk}
	}
	for _, c := range orderCols {
		items = append(items, &util.ByItems{Expr: c})
	}

	injected := LogicalSort{ByItems: items}.Init(lp.SCtx(), lp.SelectBlockOffset())
	injected.SetChildren(lp)
	return injected
}

// isInputOrderKeeper reports whether lp is one of the operators this rule
// treats as keeping the order of its input: Selection, Projection or Limit.
func (rs *resultsStabilizer) isInputOrderKeeper(lp LogicalPlan) bool {
	_, isSelection := lp.(*LogicalSelection)
	_, isProjection := lp.(*LogicalProjection)
	_, isLimit := lp.(*LogicalLimit)
	return isSelection || isProjection || isLimit
}

// extractHandleCol does the best effort to get the handle column of lp.
//
// For a DataSource it returns whatever getPKIsHandleCol reports. For a
// Selection or Limit it looks into the first child and returns the child's
// handle column only if that column is still part of this operator's schema.
// For every other operator, or when no handle column can be found, it returns
// nil.
func (rs *resultsStabilizer) extractHandleCol(lp LogicalPlan) *expression.Column {
	switch x := lp.(type) {
	case *LogicalSelection, *LogicalLimit:
		handleCol := rs.extractHandleCol(lp.Children()[0])
		if handleCol == nil {
			// Nothing could be extracted from the child. Return early instead
			// of handing a nil column to Schema().Contains, which is not
			// written to accept one.
			return nil
		}
		if x.Schema().Contains(handleCol) {
			// some Projection Operator might be inlined, so check the column again here
			return handleCol
		}
	case *DataSource:
		handleCol := x.getPKIsHandleCol()
		if handleCol != nil {
			return handleCol
		}
	}
	return nil
}

// name returns the identifier of this optimization rule.
func (rs *resultsStabilizer) name() string {
	const ruleName = "stabilize_results"
	return ruleName
}
189 changes: 189 additions & 0 deletions planner/core/rule_stabilize_results_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core_test

import (
"math"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)

// Register both test suites with the check framework. The suite holding
// TestPlanCache and TestSQLBinding is registered through SerialSuites
// (presumably so its tests do not run concurrently with others; confirm
// against the check package).
var _ = Suite(&testRuleStabilizeResults{})
var _ = SerialSuites(&testRuleStabilizeResultsSerial{})

// testRuleStabilizeResultsSerial is the suite registered via SerialSuites.
// SetUpTest fills both fields from newStoreWithBootstrap before every test.
type testRuleStabilizeResultsSerial struct {
// store is the storage the test kits and sessions of this suite are built on.
store kv.Storage
// dom is the domain returned together with store by newStoreWithBootstrap.
dom *domain.Domain
}

// SetUpTest bootstraps a new store and domain for each test of the serial
// suite and asserts that bootstrapping succeeded.
func (s *testRuleStabilizeResultsSerial) SetUpTest(c *C) {
	store, dom, err := newStoreWithBootstrap()
	// Keep the results on the suite before asserting, so the fields are set
	// regardless of the outcome.
	s.store, s.dom = store, dom
	c.Assert(err, IsNil)
}

// TestPlanCache checks that the prepared plan cache is still hit when
// tidb_enable_stable_result_mode is on: the first execution of the prepared
// statement is not served from the cache, the second one is.
func (s *testRuleStabilizeResultsSerial) TestPlanCache(c *C) {
	tk := testkit.NewTestKit(c, s.store)

	// The argument of a deferred call is evaluated right here, so this
	// restores the setting that was active before the test enabled the cache.
	defer plannercore.SetPreparedPlanCache(plannercore.PreparedPlanCacheEnabled())
	plannercore.SetPreparedPlanCache(true)

	se, err := session.CreateSession4TestWithOpt(s.store, &session.Opt{
		PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
	})
	tk.Se = se
	c.Assert(err, IsNil)

	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t",
		"create table t (a int primary key, b int, c int, d int, key(b))",
		"prepare s1 from 'select * from t where a > ? limit 10'",
		"set @a = 10",
	} {
		tk.MustExec(stmt)
	}

	// First run: "0" (plan not from cache); second run: "1" (plan cache is still working).
	for _, fromCache := range []string{"0", "1"} {
		tk.MustQuery("execute s1 using @a").Check(testkit.Rows())
		tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows(fromCache))
	}
}

// TestSQLBinding checks that a session binding is still honoured in stable
// result mode: the same query is explained before and after creating a binding
// that forces index b, and both plans are compared with the expected output.
func (s *testRuleStabilizeResultsSerial) TestSQLBinding(c *C) {
	const explainSQL = "explain select * from t where a > 0 limit 1"

	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t",
		"create table t (a int primary key, b int, c int, d int, key(b))",
	} {
		tk.MustExec(stmt)
	}

	// Without a binding: an order-keeping table range scan under a Limit.
	planWithoutBinding := testkit.Rows(
		"Limit_11 1.00 root offset:0, count:1",
		"└─TableReader_21 1.00 root data:Limit_20",
		" └─Limit_20 1.00 cop[tikv] offset:0, count:1",
		" └─TableRangeScan_19 1.00 cop[tikv] table:t range:(0,+inf], keep order:true, stats:pseudo")
	tk.MustQuery(explainSQL).Check(planWithoutBinding)

	tk.MustExec("create session binding for select * from t where a>0 limit 1 using select * from t use index(b) where a>0 limit 1")

	// With the binding: index b is scanned and a TopN on test.t.a provides the order.
	planWithBinding := testkit.Rows(
		"TopN_9 1.00 root test.t.a:asc, offset:0, count:1",
		"└─IndexLookUp_18 1.00 root ",
		" ├─TopN_17(Build) 1.00 cop[tikv] test.t.a:asc, offset:0, count:1",
		" │ └─Selection_16 3333.33 cop[tikv] gt(test.t.a, 0)",
		" │ └─IndexFullScan_14 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo",
		" └─TableRowIDScan_15(Probe) 1.00 cop[tikv] table:t keep order:false, stats:pseudo")
	tk.MustQuery(explainSQL).Check(planWithBinding)
}

// testRuleStabilizeResults is the suite registered via Suite. Its fields are
// filled once in SetUpSuite and shared by all of its tests.
type testRuleStabilizeResults struct {
// store is the storage the test kits of this suite are built on.
store kv.Storage
// dom is the domain returned together with store by newStoreWithBootstrap.
dom *domain.Domain

// testData holds the cases loaded from the "stable_result_mode_suite" test data.
testData testutil.TestData
}

// SetUpSuite bootstraps the store and domain shared by the suite and loads the
// "stable_result_mode_suite" test data from the "testdata" directory,
// asserting that each step succeeded.
func (s *testRuleStabilizeResults) SetUpSuite(c *C) {
	store, dom, bootErr := newStoreWithBootstrap()
	s.store, s.dom = store, dom
	c.Assert(bootErr, IsNil)

	data, loadErr := testutil.LoadTestSuiteData("testdata", "stable_result_mode_suite")
	s.testData = data
	c.Assert(loadErr, IsNil)
}

// TearDownSuite asks the test data to generate its output if needed and
// asserts that this did not fail.
func (s *testRuleStabilizeResults) TearDownSuite(c *C) {
	err := s.testData.GenerateOutputIfNeeded()
	c.Assert(err, IsNil)
}

// runTestData loads the cases stored under name, runs "explain" on every input
// statement through tk and checks the plan against the stored output. The
// callback passed to OnRecord overwrites the stored plan with the current one.
func (s *testRuleStabilizeResults) runTestData(c *C, tk *testkit.TestKit, name string) {
	var (
		input  []string
		output []struct {
			Plan []string
		}
	)
	s.testData.GetTestCasesByName(name, c, &input, &output)
	c.Assert(len(input), Equals, len(output))

	for i, stmt := range input {
		explainSQL := "explain " + stmt
		s.testData.OnRecord(func() {
			rows := tk.MustQuery(explainSQL).Rows()
			output[i].Plan = s.testData.ConvertRowsToStrings(rows)
		})
		expected := testkit.Rows(output[i].Plan...)
		tk.MustQuery(explainSQL).Check(expected)
	}
}

// TestStableResultMode enables stable result mode, recreates table t and runs
// the explain cases stored under "TestStableResultMode".
func (s *testRuleStabilizeResults) TestStableResultMode(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t",
		"create table t (a int primary key, b int, c int, d int, key(b))",
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultMode")
}

// TestStableResultModeOnDML enables stable result mode, recreates table t and
// runs the explain cases stored under "TestStableResultModeOnDML".
func (s *testRuleStabilizeResults) TestStableResultModeOnDML(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t",
		"create table t (a int primary key, b int, c int, key(b))",
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultModeOnDML")
}

// TestStableResultModeOnSubQuery enables stable result mode, recreates tables
// t1 and t2 and runs the explain cases stored under
// "TestStableResultModeOnSubQuery".
func (s *testRuleStabilizeResults) TestStableResultModeOnSubQuery(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t1",
		"drop table if exists t2",
		"create table t1 (a int primary key, b int, c int, d int, key(b))",
		"create table t2 (a int primary key, b int, c int, d int, key(b))",
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultModeOnSubQuery")
}

// TestStableResultModeOnJoin enables stable result mode, recreates tables t1
// and t2 and runs the explain cases stored under "TestStableResultModeOnJoin".
func (s *testRuleStabilizeResults) TestStableResultModeOnJoin(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t1",
		"drop table if exists t2",
		"create table t1 (a int primary key, b int, c int, d int, key(b))",
		"create table t2 (a int primary key, b int, c int, d int, key(b))",
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultModeOnJoin")
}

// TestStableResultModeOnOtherOperators enables stable result mode, recreates
// tables t1 and t2 (each with a unique key on b) and runs the explain cases
// stored under "TestStableResultModeOnOtherOperators".
func (s *testRuleStabilizeResults) TestStableResultModeOnOtherOperators(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists t1",
		"drop table if exists t2",
		"create table t1 (a int primary key, b int, c int, d int, unique key(b))",
		"create table t2 (a int primary key, b int, c int, d int, unique key(b))",
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultModeOnOtherOperators")
}

// TestStableResultModeOnPartitionTable enables stable result mode, recreates a
// hash-partitioned table thash and a range-partitioned table trange, and runs
// the explain cases stored under "TestStableResultModeOnPartitionTable".
func (s *testRuleStabilizeResults) TestStableResultModeOnPartitionTable(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	// The raw string below is kept exactly as in the original statement,
	// including its line breaks.
	for _, stmt := range []string{
		"use test",
		"set tidb_enable_stable_result_mode=1",
		"drop table if exists thash",
		"drop table if exists trange",
		"create table thash (a int primary key, b int, c int, d int) partition by hash(a) partitions 4",
		`create table trange (a int primary key, b int, c int, d int) partition by range(a) (
partition p0 values less than (100),
partition p1 values less than (200),
partition p2 values less than (300),
partition p3 values less than (400))`,
	} {
		tk.MustExec(stmt)
	}
	s.runTestData(c, tk, "TestStableResultModeOnPartitionTable")
}
Loading

0 comments on commit 385bea5

Please sign in to comment.