Skip to content

Commit

Permalink
planner: support stable result mode (#25971)
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jul 6, 2021
1 parent 2ce464b commit c24a90f
Show file tree
Hide file tree
Showing 10 changed files with 948 additions and 0 deletions.
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,16 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustQuery(`show warnings`).Check(testkit.Rows())
tk.MustExec("set @@tidb_enable_clustered_index = 'int_only'")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1287 'INT_ONLY' is deprecated and will be removed in a future release. Please use 'ON' or 'OFF' instead"))

// test for tidb_enable_stable_result_mode
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 0`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set tidb_enable_stable_result_mode=1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
11 changes: 11 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool
const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
flagStabilizeResults
flagBuildKeyInfo
flagDecorrelate
flagEliminateAgg
Expand All @@ -65,6 +66,7 @@ const (
var optRuleList = []logicalOptRule{
&gcSubstituter{},
&columnPruner{},
&resultsStabilizer{},
&buildKeySolver{},
&decorrelateSolver{},
&aggregationEliminator{},
Expand Down Expand Up @@ -132,12 +134,21 @@ func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visit
return nil
}

func checkStableResultMode(sctx sessionctx.Context) bool {
s := sctx.GetSessionVars()
st := s.StmtCtx
return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt)
}

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
Expand Down
5 changes: 5 additions & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,11 @@ type PointPlanVal struct {

// TryFastPlan tries to use the PointGetPlan for the query.
func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
if checkStableResultMode(ctx) {
// the rule of stabilizing results has not taken effect yet, so cannot generate a plan here in this mode
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
switch x := node.(type) {
Expand Down
125 changes: 125 additions & 0 deletions planner/core/rule_stabilize_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/util"
)

/*
resultsStabilizer stabilizes query results.
NOTE: it's not a common rule for all queries, it's specially implemented for a few customers.
Results of some queries are not stable, for example:
create table t (a int); insert into t values (1), (2); select a from t;
In the case above, the result can be `1 2` or `2 1`, which is not stable.
This rule stabilizes results by modifying or injecting a Sort operator:
1. iterate the plan from the root, and ignore all input-order operators (Sel/Proj/Limit);
2. when meeting the first non-input-order operator,
2.1. if it's a Sort, update it by appending all output columns into its order-by list,
2.2. otherwise, inject a new Sort upon this operator.
*/
type resultsStabilizer struct {
}

func (rs *resultsStabilizer) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
stable := rs.completeSort(lp)
if !stable {
lp = rs.injectSort(lp)
}
return lp, nil
}

func (rs *resultsStabilizer) completeSort(lp LogicalPlan) bool {
if rs.isInputOrderKeeper(lp) {
return rs.completeSort(lp.Children()[0])
} else if sort, ok := lp.(*LogicalSort); ok {
cols := sort.Schema().Columns // sort results by all output columns
if handleCol := rs.extractHandleCol(sort.Children()[0]); handleCol != nil {
cols = []*expression.Column{handleCol} // sort results by the handle column if we can get it
}
for _, col := range cols {
exist := false
for _, byItem := range sort.ByItems {
if col.Equal(nil, byItem.Expr) {
exist = true
break
}
}
if !exist {
sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: col})
}
}
return true
}
return false
}

func (rs *resultsStabilizer) injectSort(lp LogicalPlan) LogicalPlan {
if rs.isInputOrderKeeper(lp) {
lp.SetChildren(rs.injectSort(lp.Children()[0]))
return lp
}

byItems := make([]*util.ByItems, 0, len(lp.Schema().Columns))
cols := lp.Schema().Columns
if handleCol := rs.extractHandleCol(lp); handleCol != nil {
cols = []*expression.Column{handleCol}
}
for _, col := range cols {
byItems = append(byItems, &util.ByItems{Expr: col})
}
sort := LogicalSort{
ByItems: byItems,
}.Init(lp.SCtx(), lp.SelectBlockOffset())
sort.SetChildren(lp)
return sort
}

func (rs *resultsStabilizer) isInputOrderKeeper(lp LogicalPlan) bool {
switch lp.(type) {
case *LogicalSelection, *LogicalProjection, *LogicalLimit:
return true
}
return false
}

// extractHandleCols does the best effort to get the handle column.
func (rs *resultsStabilizer) extractHandleCol(lp LogicalPlan) *expression.Column {
switch x := lp.(type) {
case *LogicalSelection, *LogicalLimit:
handleCol := rs.extractHandleCol(lp.Children()[0])
if x.Schema().Contains(handleCol) {
// some Projection Operator might be inlined, so check the column again here
return handleCol
}
case *DataSource:
if x.tableInfo.IsCommonHandle {
// Currently we deliberately don't support common handle case for simplicity.
return nil
}
handleCol := x.getPKIsHandleCol()
if handleCol != nil {
return handleCol
}
}
return nil
}

func (rs *resultsStabilizer) name() string {
return "stabilize_results"
}
Loading

0 comments on commit c24a90f

Please sign in to comment.