Skip to content

Commit

Permalink
planner: fix incompatibility issues between plan-cache and expr-black…
Browse files Browse the repository at this point in the history
…list (#42606)

ref #36598
  • Loading branch information
qw4990 authored Mar 27, 2023
1 parent 36f82bd commit 7442a66
Show file tree
Hide file tree
Showing 10 changed files with 60 additions and 16 deletions.
2 changes: 1 addition & 1 deletion executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
if e.ctx.GetSessionVars().EnablePreparedPlanCache {
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.ctx, preparedObj)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
0, bindSQL)
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,14 @@ func TestPreparePlanCache4Blacklist(t *testing.T) {
tk.MustExec("ADMIN reload expr_pushdown_blacklist;")

tk.MustExec("execute stmt;")
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("1"))
tk.MustQuery("select @@last_plan_from_cache;").Check(testkit.Rows("0"))
tk.MustExec("execute stmt;")
tkProcess = tk.Session().ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps})
res = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID))
// The expressions can still be pushed down to tikv.
require.Equal(t, 3, len(res.Rows()))
require.Contains(t, res.Rows()[1][0], "Selection")
require.Equal(t, "gt(test.t.a, 2), lt(test.t.a, 2)", res.Rows()[1][4])
// The expressions can not be pushed down to tikv.
require.Equal(t, 4, len(res.Rows()))

res = tk.MustQuery("explain format = 'brief' SELECT * FROM t WHERE a < 2 and a > 2;")
require.Equal(t, 4, len(res.Rows()))
Expand Down
19 changes: 19 additions & 0 deletions executor/reload_expr_pushdown_blacklist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor
import (
"context"
"strings"
"time"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -67,10 +68,28 @@ func LoadExprPushdownBlacklist(sctx sessionctx.Context) (err error) {
}
newBlocklist[name] = value
}
if isSameExprPushDownBlackList(newBlocklist, expression.DefaultExprPushDownBlacklist.Load().(map[string]uint32)) {
return nil
}
expression.ExprPushDownBlackListReloadTimeStamp.Store(time.Now().UnixNano())
expression.DefaultExprPushDownBlacklist.Store(newBlocklist)
return nil
}

// isSameExprPushDownBlackList checks whether two exprPushDownBlacklist are the same.
func isSameExprPushDownBlackList(l1, l2 map[string]uint32) bool {
if len(l1) != len(l2) {
return false
}
for k, v1 := range l1 {
v2, ok := l2[k]
if !ok || v1 != v2 {
return false
}
}
return true
}

// funcName2Alias indicates map of the origin function name to the name used in TiDB.
var funcName2Alias = map[string]string{
"and": ast.LogicAnd,
Expand Down
5 changes: 5 additions & 0 deletions expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,9 +1346,14 @@ func IsPushDownEnabled(name string, storeType kv.StoreType) bool {
// DefaultExprPushDownBlacklist indicates the expressions which can not be pushed down to TiKV.
var DefaultExprPushDownBlacklist *atomic.Value

// ExprPushDownBlackListReloadTimeStamp is used to record the last time when the push-down black list is reloaded.
// This is for plan cache, when the push-down black list is updated, we invalid all cached plans to avoid error.
var ExprPushDownBlackListReloadTimeStamp *atomic.Int64

func init() {
DefaultExprPushDownBlacklist = new(atomic.Value)
DefaultExprPushDownBlacklist.Store(make(map[string]uint32))
ExprPushDownBlackListReloadTimeStamp = new(atomic.Int64)
}

func canScalarFuncPushDown(scalarFunc *ScalarFunction, pc PbConverter, storeType kv.StoreType) bool {
Expand Down
4 changes: 2 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL); err != nil {
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL); err != nil {
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down
19 changes: 19 additions & 0 deletions planner/core/plan_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,25 @@ func TestIssue38710(t *testing.T) {
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // can not use the cache because the types for @a and @b are not equal to the cached plan
}

func TestPlanCacheExprBlacklistCompatibility(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int)")

tk.MustExec("prepare st from 'select * from t where mod(a, 2)=1'")
tk.MustExec("execute st")
tk.MustExec("execute st")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("insert into mysql.expr_pushdown_blacklist(name) values('mod')")
tk.MustExec(`admin reload expr_pushdown_blacklist`)
tk.MustExec("execute st") // no `mod can not be pushed-down` error
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) // expr blacklist is updated
tk.MustExec("execute st")
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func TestPlanCacheDiagInfo(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
5 changes: 4 additions & 1 deletion planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ type planCacheKey struct {
inRestrictedSQL bool
restrictedReadOnly bool
TiDBSuperReadOnly bool
ExprBlacklistTS int64 // expr-pushdown-blacklist can affect query optimization, so we need to consider it in plan cache.

memoryUsage int64 // Do not include in hash
hash []byte
Expand Down Expand Up @@ -233,6 +234,7 @@ func (key *planCacheKey) Hash() []byte {
key.hash = append(key.hash, hack.Slice(strconv.FormatBool(key.inRestrictedSQL))...)
key.hash = append(key.hash, hack.Slice(strconv.FormatBool(key.restrictedReadOnly))...)
key.hash = append(key.hash, hack.Slice(strconv.FormatBool(key.TiDBSuperReadOnly))...)
key.hash = codec.EncodeInt(key.hash, key.ExprBlacklistTS)
}
return key.hash
}
Expand Down Expand Up @@ -274,7 +276,7 @@ func SetPstmtIDSchemaVersion(key kvcache.Key, stmtText string, schemaVersion int
// Note: lastUpdatedSchemaVersion will only be set in the case of rc or for update read in order to
// differentiate the cache key. In other cases, it will be 0.
func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string, schemaVersion int64,
lastUpdatedSchemaVersion int64, bindSQL string) (kvcache.Key, error) {
lastUpdatedSchemaVersion int64, bindSQL string, exprBlacklistTS int64) (kvcache.Key, error) {
if stmtText == "" {
return nil, errors.New("no statement text")
}
Expand Down Expand Up @@ -302,6 +304,7 @@ func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string,
inRestrictedSQL: sessionVars.InRestrictedSQL,
restrictedReadOnly: variable.RestrictedReadOnly.Load(),
TiDBSuperReadOnly: variable.VarTiDBSuperReadOnly.Load(),
ExprBlacklistTS: exprBlacklistTS,
}
for k, v := range sessionVars.IsolationReadEngines {
key.isolationReadEngines[k] = v
Expand Down
10 changes: 5 additions & 5 deletions planner/core/plan_cache_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,22 @@ func TestCacheKey(t *testing.T) {
ctx.GetSessionVars().InRestrictedSQL = false
variable.RestrictedReadOnly.Store(false)
variable.VarTiDBSuperReadOnly.Store(false)
key, err := NewPlanCacheKey(ctx.GetSessionVars(), "", "test", 1, 1, "")
key, err := NewPlanCacheKey(ctx.GetSessionVars(), "", "test", 1, 1, "", 0)
if err.Error() != "no statement text" {
t.Fail() // no statement text
}
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "", 1, 1, "")
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "", 1, 1, "", 0)
if err != nil {
t.Fail() // schema can be nil
}
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "test", 1, 1,
"select /*+ ignore_plan_cache() */ * from t")
"select /*+ ignore_plan_cache() */ * from t", 0)
if err != nil {
t.Fail()
}
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "test", 1, 1, "")
key, err = NewPlanCacheKey(ctx.GetSessionVars(), "select 1", "test", 1, 1, "", 0)
if err != nil {
t.Fail()
}
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x20, 0x31, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x66, 0x61, 0x6c, 0x73, 0x65, 0x66, 0x61, 0x6c, 0x73, 0x65, 0x66, 0x61, 0x6c, 0x73, 0x65}, key.Hash())
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x20, 0x31, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x66, 0x61, 0x6c, 0x73, 0x65, 0x66, 0x61, 0x6c, 0x73, 0x65, 0x66, 0x61, 0x6c, 0x73, 0x65, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, key.Hash())
}
2 changes: 1 addition & 1 deletion server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (ts *TiDBStatement) Close() error {
}
bindSQL, _ := core.GetBindSQL4PlanCache(ts.ctx, preparedObj)
cacheKey, err := core.NewPlanCacheKey(ts.ctx.GetSessionVars(), preparedObj.StmtText, preparedObj.StmtDB,
preparedObj.PreparedAst.SchemaVersion, 0, bindSQL)
preparedObj.PreparedAst.SchemaVersion, 0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (s *session) cleanRetryInfo() {
stmtText, stmtDB = preparedObj.StmtText, preparedObj.StmtDB
bindSQL, _ := plannercore.GetBindSQL4PlanCache(s, preparedObj)
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedAst.SchemaVersion,
0, bindSQL)
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err))
return
Expand Down

0 comments on commit 7442a66

Please sign in to comment.