Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: support stable result mode (#25971) #26084

Merged
merged 3 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,16 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustQuery(`show warnings`).Check(testkit.Rows())
tk.MustExec("set @@tidb_enable_clustered_index = 'int_only'")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1287 'INT_ONLY' is deprecated and will be removed in a future release. Please use 'ON' or 'OFF' instead"))

// test for tidb_enable_stable_result_mode
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 0`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set tidb_enable_stable_result_mode=1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
11 changes: 11 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool
const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
flagStabilizeResults
flagBuildKeyInfo
flagDecorrelate
flagEliminateAgg
Expand All @@ -64,6 +65,7 @@ const (
var optRuleList = []logicalOptRule{
&gcSubstituter{},
&columnPruner{},
&resultsStabilizer{},
&buildKeySolver{},
&decorrelateSolver{},
&aggregationEliminator{},
Expand Down Expand Up @@ -124,12 +126,21 @@ func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visit
return nil
}

func checkStableResultMode(sctx sessionctx.Context) bool {
s := sctx.GetSessionVars()
st := s.StmtCtx
return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt)
}

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
Expand Down
5 changes: 5 additions & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ type PointPlanVal struct {

// TryFastPlan tries to use the PointGetPlan for the query.
func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
if checkStableResultMode(ctx) {
// the rule of stabilizing results has not taken effect yet, so cannot generate a plan here in this mode
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
switch x := node.(type) {
Expand Down
120 changes: 120 additions & 0 deletions planner/core/rule_stabilize_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/util"
)

/*
resultsStabilizer stabilizes query results.
NOTE: it's not a common rule for all queries, it's specially implemented for a few customers.
Results of some queries are not stable, for example:
create table t (a int); insert into t values (1), (2); select a from t;
In the case above, the result can be `1 2` or `2 1`, which is not stable.
This rule stabilizes results by modifying or injecting a Sort operator:
1. iterate the plan from the root, and ignore all input-order operators (Sel/Proj/Limit);
2. when meeting the first non-input-order operator,
2.1. if it's a Sort, update it by appending all output columns into its order-by list,
2.2. otherwise, inject a new Sort upon this operator.
*/
type resultsStabilizer struct {
}

func (rs *resultsStabilizer) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
stable := rs.completeSort(lp)
if !stable {
lp = rs.injectSort(lp)
}
return lp, nil
}

func (rs *resultsStabilizer) completeSort(lp LogicalPlan) bool {
if rs.isInputOrderKeeper(lp) {
return rs.completeSort(lp.Children()[0])
} else if sort, ok := lp.(*LogicalSort); ok {
cols := sort.Schema().Columns // sort results by all output columns
if handleCol := rs.extractHandleCol(sort.Children()[0]); handleCol != nil {
cols = []*expression.Column{handleCol} // sort results by the handle column if we can get it
}
for _, col := range cols {
exist := false
for _, byItem := range sort.ByItems {
if col.Equal(nil, byItem.Expr) {
exist = true
break
}
}
if !exist {
sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: col})
}
}
return true
}
return false
}

func (rs *resultsStabilizer) injectSort(lp LogicalPlan) LogicalPlan {
if rs.isInputOrderKeeper(lp) {
lp.SetChildren(rs.injectSort(lp.Children()[0]))
return lp
}

byItems := make([]*util.ByItems, 0, len(lp.Schema().Columns))
cols := lp.Schema().Columns
if handleCol := rs.extractHandleCol(lp); handleCol != nil {
cols = []*expression.Column{handleCol}
}
for _, col := range cols {
byItems = append(byItems, &util.ByItems{Expr: col})
}
sort := LogicalSort{
ByItems: byItems,
}.Init(lp.SCtx(), lp.SelectBlockOffset())
sort.SetChildren(lp)
return sort
}

func (rs *resultsStabilizer) isInputOrderKeeper(lp LogicalPlan) bool {
switch lp.(type) {
case *LogicalSelection, *LogicalProjection, *LogicalLimit:
return true
}
return false
}

// extractHandleCols does the best effort to get the handle column.
func (rs *resultsStabilizer) extractHandleCol(lp LogicalPlan) *expression.Column {
switch x := lp.(type) {
case *LogicalSelection, *LogicalLimit:
handleCol := rs.extractHandleCol(lp.Children()[0])
if x.Schema().Contains(handleCol) {
// some Projection Operator might be inlined, so check the column again here
return handleCol
}
case *DataSource:
handleCol := x.getPKIsHandleCol()
if handleCol != nil {
return handleCol
}
}
return nil
}

func (rs *resultsStabilizer) name() string {
return "stabilize_results"
}
189 changes: 189 additions & 0 deletions planner/core/rule_stabilize_results_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core_test

import (
"math"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)

var _ = Suite(&testRuleStabilizeResults{})
var _ = SerialSuites(&testRuleStabilizeResultsSerial{})

type testRuleStabilizeResultsSerial struct {
store kv.Storage
dom *domain.Domain
}

func (s *testRuleStabilizeResultsSerial) SetUpTest(c *C) {
var err error
s.store, s.dom, err = newStoreWithBootstrap()
c.Assert(err, IsNil)
}

func (s *testRuleStabilizeResultsSerial) TestPlanCache(c *C) {
tk := testkit.NewTestKit(c, s.store)
orgEnable := plannercore.PreparedPlanCacheEnabled()
defer func() {
plannercore.SetPreparedPlanCache(orgEnable)
}()
plannercore.SetPreparedPlanCache(true)
var err error
tk.Se, err = session.CreateSession4TestWithOpt(s.store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))")
tk.MustExec("prepare s1 from 'select * from t where a > ? limit 10'")
tk.MustExec("set @a = 10")
tk.MustQuery("execute s1 using @a").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute s1 using @a").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // plan cache is still working
}

func (s *testRuleStabilizeResultsSerial) TestSQLBinding(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))")
tk.MustQuery("explain select * from t where a > 0 limit 1").Check(testkit.Rows(
"Limit_12 1.00 root offset:0, count:1",
"└─TableReader_22 1.00 root data:Limit_21",
" └─Limit_21 1.00 cop[tikv] offset:0, count:1",
" └─TableRangeScan_20 1.00 cop[tikv] table:t range:(0,+inf], keep order:true, stats:pseudo"))

tk.MustExec("create session binding for select * from t where a>0 limit 1 using select * from t use index(b) where a>0 limit 1")
tk.MustQuery("explain select * from t where a > 0 limit 1").Check(testkit.Rows(
"TopN_9 1.00 root test.t.a, offset:0, count:1",
"└─IndexLookUp_19 1.00 root ",
" ├─TopN_18(Build) 1.00 cop[tikv] test.t.a, offset:0, count:1",
" │ └─Selection_17 3333.33 cop[tikv] gt(test.t.a, 0)",
" │ └─IndexFullScan_15 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo",
" └─TableRowIDScan_16(Probe) 1.00 cop[tikv] table:t keep order:false, stats:pseudo"))
}

type testRuleStabilizeResults struct {
store kv.Storage
dom *domain.Domain

testData testutil.TestData
}

func (s *testRuleStabilizeResults) SetUpSuite(c *C) {
var err error
s.store, s.dom, err = newStoreWithBootstrap()
c.Assert(err, IsNil)

s.testData, err = testutil.LoadTestSuiteData("testdata", "stable_result_mode_suite")
c.Assert(err, IsNil)
}

func (s *testRuleStabilizeResults) TearDownSuite(c *C) {
c.Assert(s.testData.GenerateOutputIfNeeded(), IsNil)
}

func (s *testRuleStabilizeResults) runTestData(c *C, tk *testkit.TestKit, name string) {
var input []string
var output []struct {
Plan []string
}
s.testData.GetTestCasesByName(name, c, &input, &output)
c.Assert(len(input), Equals, len(output))
for i := range input {
s.testData.OnRecord(func() {
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + input[i]).Rows())
})
tk.MustQuery("explain " + input[i]).Check(testkit.Rows(output[i].Plan...))
}
}

func (s *testRuleStabilizeResults) TestStableResultMode(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))")
s.runTestData(c, tk, "TestStableResultMode")
}

func (s *testRuleStabilizeResults) TestStableResultModeOnDML(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int primary key, b int, c int, key(b))")
s.runTestData(c, tk, "TestStableResultModeOnDML")
}

func (s *testRuleStabilizeResults) TestStableResultModeOnSubQuery(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1 (a int primary key, b int, c int, d int, key(b))")
tk.MustExec("create table t2 (a int primary key, b int, c int, d int, key(b))")
s.runTestData(c, tk, "TestStableResultModeOnSubQuery")
}

func (s *testRuleStabilizeResults) TestStableResultModeOnJoin(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1 (a int primary key, b int, c int, d int, key(b))")
tk.MustExec("create table t2 (a int primary key, b int, c int, d int, key(b))")
s.runTestData(c, tk, "TestStableResultModeOnJoin")
}

func (s *testRuleStabilizeResults) TestStableResultModeOnOtherOperators(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("create table t1 (a int primary key, b int, c int, d int, unique key(b))")
tk.MustExec("create table t2 (a int primary key, b int, c int, d int, unique key(b))")
s.runTestData(c, tk, "TestStableResultModeOnOtherOperators")
}

func (s *testRuleStabilizeResults) TestStableResultModeOnPartitionTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("set tidb_enable_stable_result_mode=1")
tk.MustExec("drop table if exists thash")
tk.MustExec("drop table if exists trange")
tk.MustExec("create table thash (a int primary key, b int, c int, d int) partition by hash(a) partitions 4")
tk.MustExec(`create table trange (a int primary key, b int, c int, d int) partition by range(a) (
partition p0 values less than (100),
partition p1 values less than (200),
partition p2 values less than (300),
partition p3 values less than (400))`)
s.runTestData(c, tk, "TestStableResultModeOnPartitionTable")
}
Loading