Skip to content

Commit

Permalink
planner: refactor the IsolationReadEngines variable for plan cache (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Reminiscent authored Aug 3, 2022
1 parent 1a0c92b commit 3847d9e
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 51 deletions.
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1921,7 +1921,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.OptimizerCETrace = nil

sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow

sc.IsReadonlyStmt = plannercore.IsReadOnly(s, vars)
if _, ok := s.(*ast.AnalyzeTableStmt); ok {
sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
sc.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
Expand Down
8 changes: 4 additions & 4 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,13 @@ func NewPlanCacheKey(sessionVars *variable.SessionVars, stmtText, stmtDB string,
lastUpdatedSchemaVersion: lastUpdatedSchemaVersion,
sqlMode: sessionVars.SQLMode,
timezoneOffset: timezoneOffset,
isolationReadEngines: make(map[kv.StoreType]struct{}),
isolationReadEngines: sessionVars.GetAvailableIsolationReadEngines4Plan(),
selectLimit: sessionVars.SelectLimit,
bindSQL: bindSQL,
inRestrictedSQL: sessionVars.InRestrictedSQL,
restrictedReadOnly: variable.RestrictedReadOnly.Load(),
TiDBSuperReadOnly: variable.VarTiDBSuperReadOnly.Load(),
}
for k, v := range sessionVars.IsolationReadEngines {
key.isolationReadEngines[k] = v
}
return key, nil
}

Expand Down Expand Up @@ -202,6 +199,9 @@ func (s FieldSlice) CheckTypesCompatibility4PC(tps []*types.FieldType) bool {
}

// PlanCacheValue stores the cached Statement and StmtNode.
// Note: The variables' type shouldn't be put into the planCacheKey. Because for the decimal type, the different values for the decimal type may have different precise.
// If we put it to the planCacheKey, it's hard to hit the plan cache. And if there exists lots of decimal values' statements, it will cost a lot of memory.
// You can see the `CheckTypesCompatibility4PC` for more details about the type match.
type PlanCacheValue struct {
Plan Plan
OutPutNames []*types.FieldName
Expand Down
16 changes: 3 additions & 13 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context, i
}
}

return generateNewPlan(ctx, sctx, is, preparedStmt, ignorePlanCache, cacheKey,
latestSchemaVersion, isBinProtocol, varsNum, binVarTypes, txtVarTypes, bindSQL)
return generateNewPlan(ctx, sctx, is, preparedStmt, ignorePlanCache, cacheKey, isBinProtocol, varsNum, binVarTypes, txtVarTypes)
}

// parseParamTypes get parameters' types in PREPARE statement
Expand Down Expand Up @@ -198,8 +197,8 @@ func getGeneralPlan(ctx context.Context, sctx sessionctx.Context, cacheKey kvcac
// generateNewPlan call the optimizer to generate a new plan for current statement
// and try to add it to cache
func generateNewPlan(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, preparedStmt *CachedPrepareStmt,
ignorePlanCache bool, cacheKey kvcache.Key, latestSchemaVersion int64, isBinProtocol bool, varsNum int, binVarTypes []byte,
txtVarTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
ignorePlanCache bool, cacheKey kvcache.Key, isBinProtocol bool, varsNum int, binVarTypes []byte,
txtVarTypes []*types.FieldType) (Plan, []*types.FieldName, error) {
prepared := preparedStmt.PreparedAst
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
Expand All @@ -220,15 +219,6 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, is infoschema
stmtCtx.SkipPlanCache = true
}
if prepared.UseCache && !stmtCtx.SkipPlanCache && !ignorePlanCache {
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, preparedStmt.StmtText, preparedStmt.StmtDB,
prepared.SchemaVersion, latestSchemaVersion, bindSQL); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, isBinProtocol, binVarTypes, txtVarTypes)
preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p)
stmtCtx.SetPlan(p)
Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
}
}

_, isolationReadEnginesHasTiKV := ctx.GetSessionVars().GetIsolationReadEngines()[kv.TiKV]
isolationReadEnginesHasTiKV := ctx.GetSessionVars().ContainSpecialIsolationRead(kv.TiKV)
for i, hint := range indexHints {
if hint.HintScope != ast.HintForScan {
continue
Expand Down Expand Up @@ -1256,7 +1256,7 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
if dbName.L == mysql.SystemDB {
return paths, nil
}
isolationReadEngines := ctx.GetSessionVars().GetIsolationReadEngines()
isolationReadEngines := ctx.GetSessionVars().GetAvailableIsolationReadEngines4Plan()
availableEngine := map[kv.StoreType]struct{}{}
var availableEngineStr string
for i := len(paths) - 1; i >= 0; i-- {
Expand Down
11 changes: 1 addition & 10 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,6 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
}

// Because for write stmt, TiFlash has a different results when lock the data in point get plan. We ban the TiFlash
// engine in not read only stmt.
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(node, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
defer func() {
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}()
}

tableHints := hint.ExtractTableHintsFromStmtNode(node, sctx)
originStmtHints, originStmtHintsOffs, warns := handleStmtHints(tableHints)
sessVars.StmtCtx.StmtHints = originStmtHints
Expand All @@ -112,7 +103,7 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}

txnManger := sessiontxn.GetTxnManager(sctx)
if _, isolationReadContainTiKV := sessVars.IsolationReadEngines[kv.TiKV]; isolationReadContainTiKV {
if sessVars.ContainSpecialIsolationRead(kv.TiKV) {
var fp core.Plan
if fpv, ok := sctx.Value(core.PointPlanKey).(core.PointPlanVal); ok {
// point plan is already tried in a multi-statement query.
Expand Down
4 changes: 2 additions & 2 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,9 +1935,9 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
warns := append(parserWarns, stmtctx.SQLWarn{Level: stmtctx.WarnLevelError, Err: err})
delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
cc.ctx.GetSessionVars().StmtCtx.InTiFlashFallBack2TiKV = true
_, err = cc.handleStmt(ctx, stmt, warns, i == len(stmts)-1)
cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
cc.ctx.GetSessionVars().StmtCtx.InTiFlashFallBack2TiKV = false
if err != nil {
break
}
Expand Down
6 changes: 2 additions & 4 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,9 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
// server and fallback to TiKV.
prevErr := err
delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
defer func() {
cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
}()
cc.ctx.GetSessionVars().StmtCtx.InTiFlashFallBack2TiKV = true
_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt, args, useCursor)
cc.ctx.GetSessionVars().StmtCtx.InTiFlashFallBack2TiKV = false
// We append warning after the retry because `ResetContextOfStmt` may be called during the retry, which clears warnings.
cc.ctx.GetSessionVars().StmtCtx.AppendError(prevErr)
}
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (s *session) cleanRetryInfo() {
for i, stmtID := range retryInfo.DroppedPreparedStmtIDs {
if planCacheEnabled {
if i > 0 && preparedAst != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedAst.SchemaVersion, s.sessionVars.GetAvailableIsolationReadEngines4Plan())
}
if !s.sessionVars.IgnorePreparedCacheCloseStmt { // keep the plan in cache
s.PreparedPlanCache().Delete(cacheKey)
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ type StatementContext struct {
SkipASCIICheck bool
SkipUTF8MB4Check bool
MultiSchemaInfo *model.MultiSchemaInfo
IsReadonlyStmt bool
InTiFlashFallBack2TiKV bool
// If the select statement was like 'select * from t as of timestamp ...' or in a stale read transaction
// or is affected by the tidb_read_staleness session variable, then the statement will be makred as isStaleness
// in stmtCtx
Expand Down
38 changes: 31 additions & 7 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,8 @@ type SessionVars struct {
// this variable only take effect when `tidb_follower_read` = 'closest-adaptive'
ReplicaClosestReadThreshold int64

// IsolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
IsolationReadEngines map[kv.StoreType]struct{}
// isolationReadEngines is used to isolation read, tidb only read from the stores whose engine type is in the engines.
isolationReadEngines map[kv.StoreType]struct{}

PlannerSelectBlockAsName []ast.HintTable

Expand Down Expand Up @@ -1410,7 +1410,7 @@ func NewSessionVars() *SessionVars {
UsePlanBaselines: DefTiDBUsePlanBaselines,
EvolvePlanBaselines: DefTiDBEvolvePlanBaselines,
EnableExtendedStats: false,
IsolationReadEngines: make(map[kv.StoreType]struct{}),
isolationReadEngines: make(map[kv.StoreType]struct{}),
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
MetricSchemaStep: DefTiDBMetricSchemaStep,
MetricSchemaRangeDuration: DefTiDBMetricSchemaRangeDuration,
Expand Down Expand Up @@ -1485,11 +1485,11 @@ func NewSessionVars() *SessionVars {
for _, engine := range config.GetGlobalConfig().IsolationRead.Engines {
switch engine {
case kv.TiFlash.Name():
vars.IsolationReadEngines[kv.TiFlash] = struct{}{}
vars.isolationReadEngines[kv.TiFlash] = struct{}{}
case kv.TiKV.Name():
vars.IsolationReadEngines[kv.TiKV] = struct{}{}
vars.isolationReadEngines[kv.TiKV] = struct{}{}
case kv.TiDB.Name():
vars.IsolationReadEngines[kv.TiDB] = struct{}{}
vars.isolationReadEngines[kv.TiDB] = struct{}{}
}
}
if !EnableLocalTxn.Load() {
Expand Down Expand Up @@ -1578,9 +1578,33 @@ func (s *SessionVars) GetSplitRegionTimeout() time.Duration {
return time.Duration(s.WaitSplitRegionTimeout) * time.Second
}

// ContainSpecialIsolationRead checks whether we can access the special read engines.
func (s *SessionVars) ContainSpecialIsolationRead(engineType kv.StoreType) bool {
_, ok := s.isolationReadEngines[engineType]
return ok
}

// GetIsolationReadEngines gets isolation read engines.
// We copy the s.isolationReadEngines and return the copied one.
// So the change in the outside will not affect the origin value.
func (s *SessionVars) GetIsolationReadEngines() map[kv.StoreType]struct{} {
return s.IsolationReadEngines
readEngines := make(map[kv.StoreType]struct{})
for isolationReadEngine := range s.isolationReadEngines {
readEngines[isolationReadEngine] = struct{}{}
}
return readEngines
}

// GetAvailableIsolationReadEngines4Plan gets the available read engines for the current statement.
// We should use this function to get the isolation read engines in the optimize phase.
// For example, for write stmt, TiFlash has a different results when lock the data in point get plan.
// So we ban the TiFlash engine in not read only stmt.
func (s *SessionVars) GetAvailableIsolationReadEngines4Plan() map[kv.StoreType]struct{} {
availableReadEngines := s.GetIsolationReadEngines()
if _, isolationReadContainTiFlash := availableReadEngines[kv.TiFlash]; isolationReadContainTiFlash && (s.StmtCtx.IsReadonlyStmt || s.StmtCtx.InTiFlashFallBack2TiKV) {
delete(availableReadEngines, kv.TiFlash)
}
return availableReadEngines
}

// CleanBuffers cleans the temporary bufs
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,11 @@ func TestIsolationRead(t *testing.T) {
conf.IsolationRead.Engines = []string{"tiflash", "tidb"}
})
sessVars := variable.NewSessionVars()
_, ok := sessVars.IsolationReadEngines[kv.TiDB]
ok := sessVars.ContainSpecialIsolationRead(kv.TiDB)
require.True(t, ok)
_, ok = sessVars.IsolationReadEngines[kv.TiKV]
ok = sessVars.ContainSpecialIsolationRead(kv.TiKV)
require.False(t, ok)
_, ok = sessVars.IsolationReadEngines[kv.TiFlash]
ok = sessVars.ContainSpecialIsolationRead(kv.TiFlash)
require.True(t, ok)
}

Expand Down
8 changes: 4 additions & 4 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,15 @@ var defaultSysVars = []*SysVar{
}
return formatVal, nil
}, SetSession: func(s *SessionVars, val string) error {
s.IsolationReadEngines = make(map[kv.StoreType]struct{})
s.isolationReadEngines = make(map[kv.StoreType]struct{})
for _, engine := range strings.Split(val, ",") {
switch engine {
case kv.TiKV.Name():
s.IsolationReadEngines[kv.TiKV] = struct{}{}
s.isolationReadEngines[kv.TiKV] = struct{}{}
case kv.TiFlash.Name():
s.IsolationReadEngines[kv.TiFlash] = struct{}{}
s.isolationReadEngines[kv.TiFlash] = struct{}{}
case kv.TiDB.Name():
s.IsolationReadEngines[kv.TiDB] = struct{}{}
s.isolationReadEngines[kv.TiDB] = struct{}{}
}
}
return nil
Expand Down

0 comments on commit 3847d9e

Please sign in to comment.