Skip to content

Commit

Permalink
Merge pull request flike#259 from flike/feature-show
Browse files Browse the repository at this point in the history
support show columns/fields(flike#180 flike#238 flike#253)
  • Loading branch information
Fei Chen authored Oct 15, 2016
2 parents 2e6befa + a469e15 commit af06e9f
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 18 deletions.
1 change: 1 addition & 0 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,5 @@ var (
ErrSlaveNotExist = errors.New("slave has not exist")
ErrBlackSqlExist = errors.New("black sql has exist")
ErrBlackSqlNotExist = errors.New("black sql has not exist")
ErrSQLNULL = errors.New("sql is null")
)
4 changes: 2 additions & 2 deletions core/hack/version.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package hack

const (
Version = "2016-09-23 15:44:15 +0800 @a15fd20"
Compile = "2016-09-24 20:32:20 +0800 by go version go1.7.1 darwin/amd64"
Version = "2016-09-26 19:02:52 +0800 @2e6befa"
Compile = "2016-10-16 06:51:38 +0800 by go version go1.7.1 darwin/amd64"
)
3 changes: 3 additions & 0 deletions mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ var (
TK_STR_TRANSACTION = "transaction"
TK_STR_LAST_INSERT_ID = "last_insert_id()"
TK_STR_MASTER_HINT = "*master*"
//show
TK_STR_COLUMNS = "columns"
TK_STR_FIELDS = "fields"

SET_KEY_WORDS = map[string]struct{}{
"names": struct{}{},
Expand Down
82 changes: 66 additions & 16 deletions proxy/server/conn_preshard.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"github.com/flike/kingshard/core/golog"
"github.com/flike/kingshard/core/hack"
"github.com/flike/kingshard/mysql"
"github.com/flike/kingshard/proxy/router"
"github.com/flike/kingshard/sqlparser"
)

type ExecuteDB struct {
ExecNode *backend.Node
IsSlave bool
sql string
}

func (c *ClientConn) isBlacklistSql(sql string) bool {
Expand Down Expand Up @@ -95,7 +97,8 @@ func (c *ClientConn) preHandleShard(sql string) (bool, error) {
if err != nil {
return false, err
}
rs, err = c.executeInNode(conn, sql, nil)
//execute.sql may be rewritten in getShowExecDB
rs, err = c.executeInNode(conn, executeDB.sql, nil)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -126,6 +129,7 @@ func (c *ClientConn) GetTransExecDB(tokens []string, sql string) (*ExecuteDB, er
var err error
tokensLen := len(tokens)
executeDB := new(ExecuteDB)
executeDB.sql = sql

//transaction execute in master db
executeDB.IsSlave = false
Expand Down Expand Up @@ -163,23 +167,24 @@ func (c *ClientConn) GetExecDB(tokens []string, sql string) (*ExecuteDB, error)
if ok == true {
switch tokenId {
case mysql.TK_ID_SELECT:
return c.getSelectExecDB(tokens, tokensLen)
return c.getSelectExecDB(sql, tokens, tokensLen)
case mysql.TK_ID_DELETE:
return c.getDeleteExecDB(tokens, tokensLen)
return c.getDeleteExecDB(sql, tokens, tokensLen)
case mysql.TK_ID_INSERT, mysql.TK_ID_REPLACE:
return c.getInsertOrReplaceExecDB(tokens, tokensLen)
return c.getInsertOrReplaceExecDB(sql, tokens, tokensLen)
case mysql.TK_ID_UPDATE:
return c.getUpdateExecDB(tokens, tokensLen)
return c.getUpdateExecDB(sql, tokens, tokensLen)
case mysql.TK_ID_SET:
return c.getSetExecDB(tokens, tokensLen, sql)
return c.getSetExecDB(sql, tokens, tokensLen)
case mysql.TK_ID_SHOW:
return c.getShowExecDB(tokens, tokensLen)
return c.getShowExecDB(sql, tokens, tokensLen)
default:
return nil, nil
}
}
}
executeDB := new(ExecuteDB)
executeDB.sql = sql
err := c.setExecuteNode(tokens, tokensLen, executeDB)
if err != nil {
return nil, err
Expand Down Expand Up @@ -214,12 +219,13 @@ func (c *ClientConn) setExecuteNode(tokens []string, tokensLen int, executeDB *E
}

//get the execute database for select sql
func (c *ClientConn) getSelectExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) {
func (c *ClientConn) getSelectExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
schema := c.proxy.schema
executeDB.sql = sql
executeDB.IsSlave = true

schema := c.proxy.schema
rules := schema.rule.Rules
executeDB.IsSlave = true

if len(rules) != 0 {
for i := 1; i < tokensLen; i++ {
Expand Down Expand Up @@ -256,8 +262,9 @@ func (c *ClientConn) getSelectExecDB(tokens []string, tokensLen int) (*ExecuteDB
}

//get the execute database for delete sql
func (c *ClientConn) getDeleteExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) {
func (c *ClientConn) getDeleteExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
executeDB.sql = sql
schema := c.proxy.schema
rules := schema.rule.Rules

Expand All @@ -283,8 +290,9 @@ func (c *ClientConn) getDeleteExecDB(tokens []string, tokensLen int) (*ExecuteDB
}

//get the execute database for insert or replace sql
func (c *ClientConn) getInsertOrReplaceExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) {
func (c *ClientConn) getInsertOrReplaceExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
executeDB.sql = sql
schema := c.proxy.schema
rules := schema.rule.Rules

Expand All @@ -310,8 +318,9 @@ func (c *ClientConn) getInsertOrReplaceExecDB(tokens []string, tokensLen int) (*
}

//get the execute database for update sql
func (c *ClientConn) getUpdateExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) {
func (c *ClientConn) getUpdateExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
executeDB.sql = sql
schema := c.proxy.schema
rules := schema.rule.Rules

Expand All @@ -335,8 +344,9 @@ func (c *ClientConn) getUpdateExecDB(tokens []string, tokensLen int) (*ExecuteDB
}

//get the execute database for set sql
func (c *ClientConn) getSetExecDB(tokens []string, tokensLen int, sql string) (*ExecuteDB, error) {
func (c *ClientConn) getSetExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
executeDB.sql = sql

//handle three styles:
//set autocommit= 0
Expand Down Expand Up @@ -371,14 +381,54 @@ func (c *ClientConn) getSetExecDB(tokens []string, tokensLen int, sql string) (*

//get the execute database for show sql
//choose slave preferentially
func (c *ClientConn) getShowExecDB(tokens []string, tokensLen int) (*ExecuteDB, error) {
//tokens[0] is show
func (c *ClientConn) getShowExecDB(sql string, tokens []string, tokensLen int) (*ExecuteDB, error) {
executeDB := new(ExecuteDB)
executeDB.IsSlave = true
executeDB.sql = sql

err := c.setExecuteNode(tokens, tokensLen, executeDB)
//handle show columns/fields
err := c.handleShowColumns(sql, tokens, tokensLen, executeDB)
if err != nil {
return nil, err
}

err = c.setExecuteNode(tokens, tokensLen, executeDB)
if err != nil {
return nil, err
}

return executeDB, nil
}

//handle show columns/fields
func (c *ClientConn) handleShowColumns(sql string, tokens []string,
tokensLen int, executeDB *ExecuteDB) error {

for i := 0; i < tokensLen; i++ {
//handle SQL:
//SHOW [FULL] COLUMNS FROM tbl_name [FROM db_name] [like_or_where]
if (tokens[i] == mysql.TK_STR_FIELDS ||
tokens[i] == mysql.TK_STR_COLUMNS) &&
i+2 < tokensLen {
if tokens[i+1] == mysql.TK_STR_FROM {
tableName := strings.Trim(tokens[i+2], "`")
showRouter := c.schema.rule
showRule := showRouter.GetRule(tableName)
//this SHOW is sharding SQL
if showRule.Type != router.DefaultRuleType {
if 0 < len(showRule.SubTableIndexs) {
tableIndex := showRule.SubTableIndexs[0]
nodeIndex := showRule.TableToNode[tableIndex]
nodeName := showRule.Nodes[nodeIndex]
tokens[i+2] = fmt.Sprintf("%s_%04d", tableName, tableIndex)
executeDB.sql = strings.Join(tokens, " ")
executeDB.ExecNode = c.schema.nodes[nodeName]
return nil
}
}
}
}
}
return nil
}

0 comments on commit af06e9f

Please sign in to comment.