From a469e15104a31117c6b389041ae405ba1c57aa4a Mon Sep 17 00:00:00 2001 From: flike Date: Sun, 16 Oct 2016 06:57:23 +0800 Subject: [PATCH] support show columns/fields(#180 #238 #253) --- core/errors/errors.go | 1 + core/hack/version.go | 4 +- mysql/const.go | 3 ++ proxy/server/conn_preshard.go | 82 ++++++++++++++++++++++++++++------- 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/core/errors/errors.go b/core/errors/errors.go index 14f24696..497987ff 100644 --- a/core/errors/errors.go +++ b/core/errors/errors.go @@ -69,4 +69,5 @@ var ( ErrSlaveNotExist = errors.New("slave has not exist") ErrBlackSqlExist = errors.New("black sql has exist") ErrBlackSqlNotExist = errors.New("black sql has not exist") + ErrSQLNULL = errors.New("sql is null") ) diff --git a/core/hack/version.go b/core/hack/version.go index 812f1486..096c4725 100644 --- a/core/hack/version.go +++ b/core/hack/version.go @@ -1,6 +1,6 @@ package hack const ( - Version = "2016-09-23 15:44:15 +0800 @a15fd20" - Compile = "2016-09-24 20:32:20 +0800 by go version go1.7.1 darwin/amd64" + Version = "2016-09-26 19:02:52 +0800 @2e6befa" + Compile = "2016-10-16 06:51:38 +0800 by go version go1.7.1 darwin/amd64" ) diff --git a/mysql/const.go b/mysql/const.go index a410a277..30f09d93 100644 --- a/mysql/const.go +++ b/mysql/const.go @@ -205,6 +205,9 @@ var ( TK_STR_TRANSACTION = "transaction" TK_STR_LAST_INSERT_ID = "last_insert_id()" TK_STR_MASTER_HINT = "*master*" + //show + TK_STR_COLUMNS = "columns" + TK_STR_FIELDS = "fields" SET_KEY_WORDS = map[string]struct{}{ "names": struct{}{}, diff --git a/proxy/server/conn_preshard.go b/proxy/server/conn_preshard.go index 3007183c..2eae1810 100644 --- a/proxy/server/conn_preshard.go +++ b/proxy/server/conn_preshard.go @@ -23,12 +23,14 @@ import ( "github.com/flike/kingshard/core/golog" "github.com/flike/kingshard/core/hack" "github.com/flike/kingshard/mysql" + "github.com/flike/kingshard/proxy/router" "github.com/flike/kingshard/sqlparser" ) type ExecuteDB struct { ExecNode *backend.Node IsSlave bool + sql string } func (c *ClientConn) isBlacklistSql(sql string) bool { @@ -95,7 +97,8 @@ func (c *ClientConn) preHandleShard(sql string) (bool, error) { if err != nil { return false, err } - rs, err = c.executeInNode(conn, sql, nil) + //execute.sql may be rewritten in getShowExecDB + rs, err = c.executeInNode(conn, executeDB.sql, nil) if err != nil { return false, err } @@ -126,6 +129,7 @@ func (c *ClientConn) GetTransExecDB(tokens []string, sql string) (*ExecuteDB, er var err error tokensLen := len(tokens) executeDB := new(ExecuteDB) + executeDB.sql = sql //transaction execute in master db executeDB.IsSlave = false @@ -163,23 +167,24 @@ func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error) if ok == true { switch tokenId { case mysql.TK_ID_SELECT: - return c.getSelectExecDB(tokens, tokensLen) + return c.getSelectExecDB(sql, tokens, tokensLen) case mysql.TK_ID_DELETE: - return c.getDeleteExecDB(tokens, tokensLen) + return c.getDeleteExecDB(sql, tokens, tokensLen) case mysql.TK_ID_INSERT, mysql.TK_ID_REPLACE: - return c.getInsertOrReplaceExecDB(tokens, tokensLen) + return c.getInsertOrReplaceExecDB(sql, tokens, tokensLen) case mysql.TK_ID_UPDATE: - return c.getUpdateExecDB(tokens, tokensLen) + return c.getUpdateExecDB(sql, tokens, tokensLen) case mysql.TK_ID_SET: - return c.getSetExecDB(tokens, tokensLen, sql) + return c.getSetExecDB(sql, tokens, tokensLen) case mysql.TK_ID_SHOW: - return c.getShowExecDB(tokens, tokensLen) + return c.getShowExecDB(sql, tokens, tokensLen) default: return nil, nil } } } executeDB := new(ExecuteDB) + executeDB.sql = sql err := c.setExecuteNode(tokens, tokensLen, executeDB) if err != nil { return nil, err @@ -214,12 +219,13 @@ func (c *ClientConn) setExecuteNode(tokens []string, tokensLen int, executeDB *E } //get the execute database for select sql -func (c *ClientConn) getSelectExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) { +func (c *ClientConn) getSelectExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) - schema := c.proxy.schema + executeDB.sql = sql + executeDB.IsSlave = true + schema := c.proxy.schema rules := schema.rule.Rules - executeDB.IsSlave = true if len(rules) != 0 { for i := 1; i < tokensLen; i++ { @@ -256,8 +262,9 @@ func (c *ClientConn) getSelectExecDB(tokens []string, tokensLen int) (*ExecuteDB } //get the execute database for delete sql -func (c *ClientConn) getDeleteExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) { +func (c *ClientConn) getDeleteExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) + executeDB.sql = sql schema := c.proxy.schema rules := schema.rule.Rules @@ -283,8 +290,9 @@ func (c *ClientConn) getDeleteExecDB(tokens []string, tokensLen int) (*ExecuteDB } //get the execute database for insert or replace sql -func (c *ClientConn) getInsertOrReplaceExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) { +func (c *ClientConn) getInsertOrReplaceExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) + executeDB.sql = sql schema := c.proxy.schema rules := schema.rule.Rules @@ -310,8 +318,9 @@ func (c *ClientConn) getInsertOrReplaceExecDB(tokens []string, tokensLen int) (* } //get the execute database for update sql -func (c *ClientConn) getUpdateExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) { +func (c *ClientConn) getUpdateExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) + executeDB.sql = sql schema := c.proxy.schema rules := schema.rule.Rules @@ -335,8 +344,9 @@ func (c *ClientConn) getUpdateExecDB(tokens []string, tokensLen int) (*ExecuteDB } //get the execute database for set sql -func (c *ClientConn) getSetExecDB(tokens []string, tokensLen int, sql string) (*ExecuteDB, error) { +func (c *ClientConn) getSetExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) + executeDB.sql = sql //handle three styles: //set autocommit= 0 @@ -371,14 +381,54 @@ func (c *ClientConn) getSetExecDB(tokens []string, tokensLen int, sql string) (* //get the execute database for show sql //choose slave preferentially -func (c *ClientConn) getShowExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) { +//tokens[0] is show +func (c *ClientConn) getShowExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) { executeDB := new(ExecuteDB) executeDB.IsSlave = true + executeDB.sql = sql - err := c.setExecuteNode(tokens, tokensLen, executeDB) + //handle show columns/fields + err := c.handleShowColumns(sql, tokens, tokensLen, executeDB) + if err != nil { + return nil, err + } + + err = c.setExecuteNode(tokens, tokensLen, executeDB) if err != nil { return nil, err } return executeDB, nil } + +//handle show columns/fields +func (c *ClientConn) handleShowColumns(sql string, tokens []string, + tokensLen int, executeDB *ExecuteDB) error { + + for i := 0; i < tokensLen; i++ { + //handle SQL: + //SHOW [FULL] COLUMNS FROM tbl_name [FROM db_name] [like_or_where] + if (tokens[i] == mysql.TK_STR_FIELDS || + tokens[i] == mysql.TK_STR_COLUMNS) && + i+2 < tokensLen { + if tokens[i+1] == mysql.TK_STR_FROM { + tableName := strings.Trim(tokens[i+2], "`") + showRouter := c.schema.rule + showRule := showRouter.GetRule(tableName) + //this SHOW is sharding SQL + if showRule.Type != router.DefaultRuleType { + if 0 < len(showRule.SubTableIndexs) { + tableIndex := showRule.SubTableIndexs[0] + nodeIndex := showRule.TableToNode[tableIndex] + nodeName := showRule.Nodes[nodeIndex] + tokens[i+2] = fmt.Sprintf("%s_%04d", tableName, tableIndex) + executeDB.sql = strings.Join(tokens, " ") + executeDB.ExecNode = c.schema.nodes[nodeName] + return nil + } + } + } + } + } + return nil +}