Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: simplify global and session binding handle #50063

Merged
merged 8 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bindinfo/bind_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *bindCache) SetBinding(sqlDigest string, meta *BindRecord) (err error) {

// RemoveBinding removes the BindRecord which has same originSQL with specified BindRecord.
// The function is thread-safe.
func (c *bindCache) RemoveBinding(sqlDigest string, _ *BindRecord) {
func (c *bindCache) RemoveBinding(sqlDigest string) {
c.lock.Lock()
defer c.lock.Unlock()
c.delete(bindCacheKey(sqlDigest))
Expand Down
18 changes: 0 additions & 18 deletions pkg/bindinfo/bind_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,24 +250,6 @@ func merge(lBindRecord, rBindRecord *BindRecord) *BindRecord {
return result
}

func (br *BindRecord) remove(deleted *BindRecord) *BindRecord {
// Delete all bindings.
if len(deleted.Bindings) == 0 {
return &BindRecord{OriginalSQL: br.OriginalSQL, Db: br.Db}
}
result := br.shallowCopy()
for j := range deleted.Bindings {
deletedBind := deleted.Bindings[j]
for i, bind := range result.Bindings {
if bind.isSame(&deletedBind) {
result.Bindings = append(result.Bindings[:i], result.Bindings[i+1:]...)
break
}
}
}
return result
}

func (br *BindRecord) removeDeletedBindings() *BindRecord {
result := BindRecord{OriginalSQL: br.OriginalSQL, Db: br.Db, Bindings: make([]Binding, 0, len(br.Bindings))}
for _, binding := range br.Bindings {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bindinfo/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (h *globalBindingHandle) CaptureBaselines() {
}
dbName := utilparser.GetDefaultDB(stmt, bindableStmt.Schema)
normalizedSQL, digest := parser.NormalizeDigest(utilparser.RestoreWithDefaultDB(stmt, dbName, bindableStmt.Query))
if r := h.GetGlobalBinding(digest.String()); r != nil && r.HasAvailableBinding() {
if r := h.getCache().GetBinding(digest.String()); r != nil && r.HasAvailableBinding() {
continue
}
bindSQL := GenerateBindSQL(context.TODO(), stmt, bindableStmt.PlanHint, true, dbName)
Expand Down
68 changes: 24 additions & 44 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ type GlobalBindingHandle interface {
DropGlobalBinding(sqlDigest string) (deletedRows uint64, err error)

// SetGlobalBindingStatus set a BindRecord's status to the storage and bind cache.
SetGlobalBindingStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error)

// SetGlobalBindingStatusByDigest set a BindRecord's status to the storage and bind cache.
SetGlobalBindingStatusByDigest(newStatus, sqlDigest string) (ok bool, err error)
SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error)

// AddInvalidGlobalBinding adds BindRecord which needs to be deleted into invalidBindRecordMap.
AddInvalidGlobalBinding(invalidBindRecord *BindRecord)
Expand Down Expand Up @@ -173,8 +170,12 @@ func (h *globalBindingHandle) Reset() {
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.invalidBindRecordMap.Value.Store(make(map[string]*bindRecordUpdate))
h.invalidBindRecordMap.flushFunc = func(record *BindRecord) error {
_, err := h.dropGlobalBinding(record.OriginalSQL, record.Db)
return err
for _, binding := range record.Bindings {
if _, err := h.dropGlobalBinding(binding.SQLDigest); err != nil {
return err
}
}
return nil
}
h.setCache(newBindCache())
variable.RegisterStatistics(h)
Expand Down Expand Up @@ -250,7 +251,7 @@ func (h *globalBindingHandle) LoadFromStorageToCache(fullLoad bool) (err error)
logutil.BgLogger().Warn("BindHandle.Update", zap.String("category", "sql-bind"), zap.Error(err))
}
} else {
newCache.RemoveBinding(sqlDigest, newRecord)
newCache.RemoveBinding(sqlDigest)
}
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBinding(sqlDigest), true)
}
Expand Down Expand Up @@ -314,13 +315,7 @@ func (h *globalBindingHandle) CreateGlobalBinding(sctx sessionctx.Context, recor
}

// dropGlobalBinding drops a BindRecord to the storage and BindRecord int the cache.
func (h *globalBindingHandle) dropGlobalBinding(originalSQL, _ string) (deletedRows uint64, err error) {
defer func() {
if err == nil {
err = h.LoadFromStorageToCache(false)
}
}()

func (h *globalBindingHandle) dropGlobalBinding(sqlDigest string) (deletedRows uint64, err error) {
err = h.callWithSCtx(false, func(sctx sessionctx.Context) error {
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = lockBindInfoTable(sctx); err != nil {
Expand All @@ -329,8 +324,8 @@ func (h *globalBindingHandle) dropGlobalBinding(originalSQL, _ string) (deletedR

updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3).String()

_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status != %?`,
deleted, updateTs, originalSQL, updateTs, deleted)
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status != %?`,
deleted, updateTs, sqlDigest, updateTs, deleted)
if err != nil {
return err
}
Expand All @@ -345,15 +340,16 @@ func (h *globalBindingHandle) DropGlobalBinding(sqlDigest string) (deletedRows u
if sqlDigest == "" {
return 0, errors.New("sql digest is empty")
}
oldRecord := h.GetGlobalBinding(sqlDigest)
if oldRecord == nil {
return 0, errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.dropGlobalBinding(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db))
defer func() {
if err == nil {
err = h.LoadFromStorageToCache(false)
}
}()
return h.dropGlobalBinding(sqlDigest)
}

// SetGlobalBindingStatus set a BindRecord's status to the storage and bind cache.
func (h *globalBindingHandle) SetGlobalBindingStatus(originalSQL string, binding *Binding, newStatus string) (ok bool, err error) {
func (h *globalBindingHandle) SetGlobalBindingStatus(newStatus, sqlDigest string) (ok bool, err error) {
var (
updateTs types.Time
oldStatus0, oldStatus1 string
Expand Down Expand Up @@ -384,27 +380,13 @@ func (h *globalBindingHandle) SetGlobalBindingStatus(originalSQL string, binding
updateTs = types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
updateTsStr := updateTs.String()

if binding == nil {
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND status IN (%?, %?)`,
newStatus, updateTsStr, originalSQL, updateTsStr, oldStatus0, oldStatus1)
} else {
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE original_sql = %? AND update_time < %? AND bind_sql = %? AND status IN (%?, %?)`,
newStatus, updateTsStr, originalSQL, updateTsStr, binding.BindSQL, oldStatus0, oldStatus1)
}
return nil
_, err = exec(sctx, `UPDATE mysql.bind_info SET status = %?, update_time = %? WHERE sql_digest = %? AND update_time < %? AND status IN (%?, %?)`,
newStatus, updateTsStr, sqlDigest, updateTsStr, oldStatus0, oldStatus1)
return err
})
return
}

// SetGlobalBindingStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *globalBindingHandle) SetGlobalBindingStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord := h.GetGlobalBinding(sqlDigest)
if oldRecord == nil {
return false, errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.SetGlobalBindingStatus(oldRecord.OriginalSQL, nil, newStatus)
}

// GCGlobalBinding physically removes the deleted bind records in mysql.bind_info.
func (h *globalBindingHandle) GCGlobalBinding() (err error) {
return h.callWithSCtx(true, func(sctx sessionctx.Context) error {
Expand Down Expand Up @@ -486,6 +468,9 @@ func (tmpMap *tmpBindRecordMap) Add(bindRecord *BindRecord) {

// DropInvalidGlobalBinding executes the drop BindRecord tasks.
func (h *globalBindingHandle) DropInvalidGlobalBinding() {
defer func() {
_ = h.LoadFromStorageToCache(false)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}()
h.invalidBindRecordMap.flushToStore()
}

Expand Down Expand Up @@ -514,11 +499,6 @@ func (h *globalBindingHandle) MatchGlobalBinding(currentDB string, stmt ast.Stmt
return bindingCache.GetBinding(sqlDigest), nil
}

// GetGlobalBinding returns the BindRecord of the (normalizedSQL,db) if BindRecord exist.
func (h *globalBindingHandle) GetGlobalBinding(sqlDigest string) *BindRecord {
return h.getCache().GetBinding(sqlDigest)
}

// GetAllGlobalBindings returns all bind records in cache.
func (h *globalBindingHandle) GetAllGlobalBindings() (bindRecords []*BindRecord) {
return h.getCache().GetAllBindings()
Expand Down
17 changes: 5 additions & 12 deletions pkg/bindinfo/global_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,6 @@ func TestSetBindingStatus(t *testing.T) {
tk.MustExec("select * from t where a > 10")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))

tk.MustExec("set binding disabled for select * from t where a > 10 using select * from t where a > 10")
tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 There are no bindings can be set the status. Please check the SQL text"))
rows = tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 1)
require.Equal(t, bindinfo.Enabled, rows[0][3])
tk.MustExec("select * from t where a > 10")
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))

tk.MustExec("set binding disabled for select * from t where a > 10")
rows = tk.MustQuery("show global bindings").Rows()
require.Len(t, rows, 1)
Expand Down Expand Up @@ -269,10 +261,11 @@ func TestSetBindingStatusWithoutBindingInCache(t *testing.T) {
tk.MustQuery("show global bindings").Check(testkit.Rows())

// Simulate creating bindings on other machines
_, sqlDigest := parser.NormalizeDigestForBinding("select * from `test` . `t` where `a` > ?")
tk.MustExec("insert into mysql.bind_info values('select * from `test` . `t` where `a` > ?', 'SELECT /*+ USE_INDEX(`t` `idx_a`)*/ * FROM `test`.`t` WHERE `a` > 10', 'test', 'deleted', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
tk.MustExec("insert into mysql.bind_info values('select * from `test` . `t` where `a` > ?', 'SELECT /*+ USE_INDEX(`t` `idx_a`)*/ * FROM `test`.`t` WHERE `a` > 10', 'test', 'enabled', '2000-01-02 09:00:00', '2000-01-02 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
dom.BindHandle().Clear()
tk.MustExec("set binding disabled for select * from t where a > 10")
tk.MustExec("admin reload bindings")
Expand All @@ -285,9 +278,9 @@ func TestSetBindingStatusWithoutBindingInCache(t *testing.T) {

// Simulate creating bindings on other machines
tk.MustExec("insert into mysql.bind_info values('select * from `test` . `t` where `a` > ?', 'SELECT * FROM `test`.`t` WHERE `a` > 10', 'test', 'deleted', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
tk.MustExec("insert into mysql.bind_info values('select * from `test` . `t` where `a` > ?', 'SELECT * FROM `test`.`t` WHERE `a` > 10', 'test', 'disabled', '2000-01-02 09:00:00', '2000-01-02 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
dom.BindHandle().Clear()
tk.MustExec("set binding enabled for select * from t where a > 10")
tk.MustExec("admin reload bindings")
Expand Down
36 changes: 2 additions & 34 deletions pkg/bindinfo/session_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,45 +95,13 @@ func (h *sessionBindingHandle) CreateSessionBinding(sctx sessionctx.Context, rec
return nil
}

// dropSessionBinding drops a BindRecord in the cache.
func (h *sessionBindingHandle) dropSessionBinding(originalSQL, db string, binding *Binding) error {
db = strings.ToLower(db)
sqlDigest := parser.DigestNormalized(originalSQL).String()
oldRecord := h.GetSessionBinding(sqlDigest)
var newRecord *BindRecord
record := &BindRecord{OriginalSQL: originalSQL, Db: db}
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
if oldRecord != nil {
newRecord = oldRecord.remove(record)
} else {
newRecord = record
}
err := h.ch.SetBinding(sqlDigest, newRecord)
if err != nil {
// Should never reach here, just return an error for safety
return err
}
updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false)
return nil
}

// DropSessionBinding drop BindRecord in the cache.
func (h *sessionBindingHandle) DropSessionBinding(sqlDigest string) error {
if sqlDigest == "" {
return errors.New("sql digest is empty")
}
oldRecord := h.GetSessionBinding(sqlDigest)
if oldRecord == nil {
return errors.Errorf("can't find any binding for '%s'", sqlDigest)
}
return h.dropSessionBinding(oldRecord.OriginalSQL, strings.ToLower(oldRecord.Db), nil)
}

// GetSessionBinding return all BindMeta corresponding to sqlDigest.
func (h *sessionBindingHandle) GetSessionBinding(sqlDigest string) *BindRecord {
return h.ch.GetBinding(sqlDigest)
h.ch.RemoveBinding(sqlDigest)
return nil
}

// MatchSessionBinding returns the matched binding for this statement.
Expand Down
16 changes: 4 additions & 12 deletions pkg/bindinfo/session_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,7 @@ func TestSessionBinding(t *testing.T) {
require.NoError(t, err)
bindData, err = handle.MatchSessionBinding("test", stmt)
require.NoError(t, err)
require.NotNil(t, bindData)
require.Equal(t, testSQL.originSQL, bindData.OriginalSQL)
require.Len(t, bindData.Bindings, 0)

err = metrics.BindTotalGauge.WithLabelValues(metrics.ScopeSession, bindinfo.Enabled).Write(pb)
require.NoError(t, err)
require.Equal(t, float64(0), pb.GetGauge().GetValue())
err = metrics.BindMemoryUsage.WithLabelValues(metrics.ScopeSession, bindinfo.Enabled).Write(pb)
require.NoError(t, err)
require.Equal(t, float64(0), pb.GetGauge().GetValue())
require.Nil(t, bindData) // dropped
}
}

Expand Down Expand Up @@ -214,8 +205,9 @@ func TestBaselineDBLowerCase(t *testing.T) {
internal.UtilCleanBindingEnv(tk, dom)

// Simulate existing bindings with upper case default_db.
_, sqlDigest := parser.NormalizeDigestForBinding("select * from `spm` . `t`")
tk.MustExec("insert into mysql.bind_info values('select * from `spm` . `t`', 'select * from `spm` . `t`', 'SPM', 'enabled', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
tk.MustQuery("select original_sql, default_db from mysql.bind_info where original_sql = 'select * from `spm` . `t`'").Check(testkit.Rows(
"select * from `spm` . `t` SPM",
))
Expand All @@ -233,7 +225,7 @@ func TestBaselineDBLowerCase(t *testing.T) {
internal.UtilCleanBindingEnv(tk, dom)
// Simulate existing bindings with upper case default_db.
tk.MustExec("insert into mysql.bind_info values('select * from `spm` . `t`', 'select * from `spm` . `t`', 'SPM', 'enabled', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "', '', '')")
bindinfo.Manual + "', '" + sqlDigest.String() + "', '')")
tk.MustQuery("select original_sql, default_db from mysql.bind_info where original_sql = 'select * from `spm` . `t`'").Check(testkit.Rows(
"select * from `spm` . `t` SPM",
))
Expand Down
1 change: 0 additions & 1 deletion pkg/bindinfo/tests/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,7 +840,6 @@ func TestDropBindBySQLDigest(t *testing.T) {
}

// exception cases
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", "1"), "can't find any binding for '1'")
tk.MustGetErrMsg(fmt.Sprintf("drop binding for sql digest '%s'", ""), "sql digest is empty")
}

Expand Down
14 changes: 4 additions & 10 deletions pkg/executor/bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/pkg/bindinfo"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/util/chunk"
Expand Down Expand Up @@ -99,15 +100,8 @@ func (e *SQLBindExec) dropSQLBindByDigest() error {
}

func (e *SQLBindExec) setBindingStatus() error {
var bindInfo *bindinfo.Binding
if e.bindSQL != "" {
bindInfo = &bindinfo.Binding{
BindSQL: e.bindSQL,
Charset: e.charset,
Collation: e.collation,
}
}
ok, err := domain.GetDomain(e.Ctx()).BindHandle().SetGlobalBindingStatus(e.normdOrigSQL, bindInfo, e.newStatus)
_, sqlDigest := parser.NormalizeDigestForBinding(e.normdOrigSQL)
ok, err := domain.GetDomain(e.Ctx()).BindHandle().SetGlobalBindingStatus(e.newStatus, sqlDigest.String())
if err == nil && !ok {
warningMess := errors.NewNoStackError("There are no bindings can be set the status. Please check the SQL text")
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(warningMess)
Expand All @@ -116,7 +110,7 @@ func (e *SQLBindExec) setBindingStatus() error {
}

func (e *SQLBindExec) setBindingStatusByDigest() error {
ok, err := domain.GetDomain(e.Ctx()).BindHandle().SetGlobalBindingStatusByDigest(e.newStatus, e.sqlDigest)
ok, err := domain.GetDomain(e.Ctx()).BindHandle().SetGlobalBindingStatus(e.newStatus, e.sqlDigest)
if err == nil && !ok {
warningMess := errors.NewNoStackError("There are no bindings can be set the status. Please check the SQL text")
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(warningMess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,6 @@ func TestSetBindingStatusBySQLDigest(t *testing.T) {
tk.MustExec(fmt.Sprintf("set binding enabled for sql digest '%s'", sqlDigest[0][9]))
tk.MustExec(sql)
tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
tk.MustGetErrMsg("set binding enabled for sql digest '2'", "can't find any binding for '2'")
tk.MustGetErrMsg("set binding enabled for sql digest ''", "sql digest is empty")
tk.MustGetErrMsg("set binding disabled for sql digest ''", "sql digest is empty")
}
Expand Down
33 changes: 0 additions & 33 deletions pkg/infoschema/test/clustertablestest/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,39 +1011,6 @@ func TestStmtSummaryInternalQuery(t *testing.T) {
// Disable refreshing summary.
tk.MustExec("set global tidb_stmt_summary_refresh_interval = 999999999")
tk.MustQuery("select @@global.tidb_stmt_summary_refresh_interval").Check(testkit.Rows("999999999"))

// Test Internal

// Create a new session to test.
tk = newTestKitWithRoot(t, store)

tk.MustExec("select * from t where t.a = 1")
tk.MustQuery(`select exec_count, digest_text
from information_schema.statements_summary
where digest_text like "select original_sql , bind_sql , default_db , status%"`).Check(testkit.Rows())

// Enable internal query and evolve baseline.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plan evolution is deprecated.

tk.MustExec("set global tidb_stmt_summary_internal_query = 1")
defer tk.MustExec("set global tidb_stmt_summary_internal_query = false")

// Create a new session to test.
tk = newTestKitWithRoot(t, store)

tk.MustExec("admin flush bindings")
tk.MustExec("admin evolve bindings")

// `exec_count` may be bigger than 1 because other cases are also running.
sql := "select digest_text " +
"from information_schema.statements_summary " +
"where digest_text like \"select `original_sql` , `bind_sql` , `default_db` , status%\""
tk.MustQuery(sql).Check(testkit.Rows(
"select `original_sql` , `bind_sql` , `default_db` , status , `create_time` , `update_time` , charset , " +
"collation , source , `sql_digest` , `plan_digest` from `mysql` . `bind_info` where `update_time` > ? order by `update_time` , `create_time`"))

// Test for issue #21642.
tk.MustQuery(`select tidb_version()`)
rows := tk.MustQuery("select plan from information_schema.statements_summary where digest_text like \"select `tidb_version`%\"").Rows()
require.Contains(t, rows[0][0].(string), "Projection")
}

// TestSimpleStmtSummaryEvictedCount test stmtSummaryEvictedCount
Expand Down
Loading