Skip to content

Commit

Permalink
*: support create global binding (#9846)
Browse files Browse the repository at this point in the history
  • Loading branch information
iamzhoug37 authored and zz-jason committed Apr 17, 2019
1 parent abeddab commit 63d720c
Show file tree
Hide file tree
Showing 12 changed files with 535 additions and 166 deletions.
79 changes: 64 additions & 15 deletions bindinfo/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
Expand Down Expand Up @@ -115,22 +116,70 @@ func (s *testSuite) TestBindParse(c *C) {
sql := fmt.Sprintf(`INSERT INTO mysql.bind_info(original_sql,bind_sql,default_db,status,create_time,update_time,charset,collation) VALUES ('%s', '%s', '%s', '%s', NOW(), NOW(),'%s', '%s')`,
originSQL, bindSQL, defaultDb, status, charset, collation)
tk.MustExec(sql)
bindHandle := bindinfo.NewHandle()
bindCacheUpdater := bindinfo.NewBindCacheUpdater(tk.Se, bindHandle, s.Parser)
err := bindCacheUpdater.Update(true)
bindHandle := bindinfo.NewBindHandle(tk.Se, s.Parser)
err := bindHandle.Update(true)
c.Check(err, IsNil)
c.Check(len(bindHandle.Get()), Equals, 1)
c.Check(bindHandle.Size(), Equals, 1)

hash := parser.DigestHash("select * from t")
bindData := bindHandle.Get()[hash]
bindData := bindHandle.GetBindRecord("select * from t", "test")
c.Check(bindData, NotNil)
c.Check(len(bindData), Equals, 1)
c.Check(bindData[0].OriginalSQL, Equals, "select * from t")
c.Check(bindData[0].BindSQL, Equals, "select * from t use index(index_t)")
c.Check(bindData[0].Db, Equals, "test")
c.Check(bindData[0].Status, Equals, "using")
c.Check(bindData[0].Charset, Equals, "utf8mb4")
c.Check(bindData[0].Collation, Equals, "utf8mb4_bin")
c.Check(bindData[0].CreateTime, NotNil)
c.Check(bindData[0].UpdateTime, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select * from t")
c.Check(bindData.BindSQL, Equals, "select * from t use index(index_t)")
c.Check(bindData.Db, Equals, "test")
c.Check(bindData.Status, Equals, "using")
c.Check(bindData.Charset, Equals, "utf8mb4")
c.Check(bindData.Collation, Equals, "utf8mb4_bin")
c.Check(bindData.CreateTime, NotNil)
c.Check(bindData.UpdateTime, NotNil)
}

func (s *testSuite) TestGlobalBinding(c *C) {
tk := testkit.NewTestKit(c, s.store)
s.cleanBindingEnv(tk)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t(i int, s varchar(20))")
tk.MustExec("create table t1(i int, s varchar(20))")
tk.MustExec("create index index_t on t(i,s)")

_, err := tk.Exec("create global binding for select * from t where i>100 using select * from t use index(index_t) where i>100")
c.Assert(err, IsNil, Commentf("err %v", err))

time.Sleep(time.Second * 1)
_, err = tk.Exec("create global binding for select * from t where i>99 using select * from t use index(index_t) where i>99")
c.Assert(err, IsNil)

bindData := s.domain.BindHandle().GetBindRecord("select * from t where i > ?", "test")
c.Check(bindData, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select * from t where i > ?")
c.Check(bindData.BindSQL, Equals, "select * from t use index(index_t) where i>99")
c.Check(bindData.Db, Equals, "test")
c.Check(bindData.Status, Equals, "using")
c.Check(bindData.Charset, NotNil)
c.Check(bindData.Collation, NotNil)
c.Check(bindData.CreateTime, NotNil)
c.Check(bindData.UpdateTime, NotNil)

bindHandle := bindinfo.NewBindHandle(tk.Se, s.Parser)
err = bindHandle.Update(true)
c.Check(err, IsNil)
c.Check(bindHandle.Size(), Equals, 1)

bindData = bindHandle.GetBindRecord("select * from t where i > ?", "test")
c.Check(bindData, NotNil)
c.Check(bindData.OriginalSQL, Equals, "select * from t where i > ?")
c.Check(bindData.BindSQL, Equals, "select * from t use index(index_t) where i>99")
c.Check(bindData.Db, Equals, "test")
c.Check(bindData.Status, Equals, "using")
c.Check(bindData.Charset, NotNil)
c.Check(bindData.Collation, NotNil)
c.Check(bindData.CreateTime, NotNil)
c.Check(bindData.UpdateTime, NotNil)

_, err = tk.Exec("delete from mysql.bind_info")
c.Assert(err, IsNil)

_, err = tk.Exec("create global binding for select * from t using select * from t1 use index for join(index_t)")
c.Assert(err, NotNil, Commentf("err %v", err))
}
149 changes: 10 additions & 139 deletions bindinfo/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,145 +14,44 @@
package bindinfo

import (
"context"
"fmt"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

const (
using = "using"
// using is the bind info's in use status.
using = "using"
// deleted is the bind info's deleted status.
deleted = "deleted"
)

// bindMeta stores the basic bind info and bindSql astNode.
type bindMeta struct {
*bindRecord
*BindRecord
ast ast.StmtNode //ast will be used to do query sql bind check
}

// cache is a k-v map, key is original sql, value is a slice of bindMeta.
type cache map[string][]*bindMeta

// Handle holds an atomic cache.
type Handle struct {
atomic.Value
}

// BindCacheUpdater is used to update the global cache.
// BindCacheUpdater will update the bind cache per 3 seconds in domain
// gorountine loop. When the tidb server first startup, the updater will load
// all bind info into memory; then load diff bind info per 3 second.
type BindCacheUpdater struct {
ctx sessionctx.Context

parser *parser.Parser
lastUpdateTime types.Time
globalHandle *Handle
}

type bindRecord struct {
// BindRecord represents a sql bind record retrieved from the storage.
type BindRecord struct {
OriginalSQL string
BindSQL string
Db string
// Status represents the status of the binding. It can only be one of the following values:
// 1. deleted: bindRecord is deleted, can not be used anymore.
// 2. using: bindRecord is in the normal active mode.
// 1. deleted: BindRecord is deleted, can not be used anymore.
// 2. using: BindRecord is in the normal active mode.
Status string
CreateTime types.Time
UpdateTime types.Time
Charset string
Collation string
}

// NewBindCacheUpdater creates a new BindCacheUpdater.
func NewBindCacheUpdater(ctx sessionctx.Context, handle *Handle, parser *parser.Parser) *BindCacheUpdater {
return &BindCacheUpdater{
ctx: ctx,
parser: parser,
globalHandle: handle,
}
}

// NewHandle creates a Handle with a cache.
func NewHandle() *Handle {
handle := &Handle{}
return handle
}

// Get gets cache from a Handle.
func (h *Handle) Get() cache {
bc := h.Load()
if bc != nil {
return bc.(map[string][]*bindMeta)
}
return make(map[string][]*bindMeta)
}

// LoadDiff is used to load new bind info to cache bc.
func (bindCacheUpdater *BindCacheUpdater) loadDiff(sql string, bc cache) error {
recordSets, err := bindCacheUpdater.ctx.(sqlexec.SQLExecutor).Execute(context.Background(), sql)
if err != nil {
return errors.Trace(err)
}

rs := recordSets[0]
defer terror.Call(rs.Close)
chkBatch := rs.NewRecordBatch()
for {
err = rs.Next(context.TODO(), chkBatch)
if err != nil || chkBatch.NumRows() == 0 {
return errors.Trace(err)
}

it := chunk.NewIterator4Chunk(chkBatch.Chunk)
for row := it.Begin(); row != it.End(); row = it.Next() {
record := newBindMeta(row)
err = bc.appendNode(record, bindCacheUpdater.parser)
if err != nil {
return err
}
if record.UpdateTime.Compare(bindCacheUpdater.lastUpdateTime) == 1 {
bindCacheUpdater.lastUpdateTime = record.UpdateTime
}
}
}
}

// Update updates the BindCacheUpdater's cache.
// The `fullLoad` is true only when tidb first startup, otherwise it is false.
func (bindCacheUpdater *BindCacheUpdater) Update(fullLoad bool) (err error) {
var sql string
bc := bindCacheUpdater.globalHandle.Get()
newBc := make(map[string][]*bindMeta, len(bc))
for hash, bindDataArr := range bc {
newBc[hash] = append(newBc[hash], bindDataArr...)
}

if fullLoad {
sql = "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info"
} else {
sql = fmt.Sprintf("select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info where update_time > \"%s\"", bindCacheUpdater.lastUpdateTime.String())
}
err = bindCacheUpdater.loadDiff(sql, newBc)
if err != nil {
return errors.Trace(err)
}

bindCacheUpdater.globalHandle.Store(newBc)
return nil
}

func newBindMeta(row chunk.Row) *bindRecord {
return &bindRecord{
func newBindRecord(row chunk.Row) *BindRecord {
return &BindRecord{
OriginalSQL: row.GetString(0),
BindSQL: row.GetString(1),
Db: row.GetString(2),
Expand All @@ -163,31 +62,3 @@ func newBindMeta(row chunk.Row) *bindRecord {
Collation: row.GetString(7),
}
}

func (b cache) appendNode(newBindRecord *bindRecord, sparser *parser.Parser) error {
hash := parser.DigestHash(newBindRecord.OriginalSQL)
if bindArr, ok := b[hash]; ok {
for idx, v := range bindArr {
if v.OriginalSQL == newBindRecord.OriginalSQL && v.Db == newBindRecord.Db {
b[hash] = append(b[hash][:idx], b[hash][idx+1:]...)
if len(b[hash]) == 0 {
delete(b, hash)
}
break
}
}
}
if newBindRecord.Status == deleted {
return nil
}
stmtNodes, _, err := sparser.Parse(newBindRecord.BindSQL, newBindRecord.Charset, newBindRecord.Collation)
if err != nil {
return errors.Trace(err)
}
newNode := &bindMeta{
bindRecord: newBindRecord,
ast: stmtNodes[0],
}
b[hash] = append(b[hash], newNode)
return nil
}
Loading

0 comments on commit 63d720c

Please sign in to comment.