Skip to content

Commit

Permalink
Merge remote-tracking branch 'flike/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
holys committed Aug 19, 2015
2 parents 3f80dc8 + 8ad5392 commit 2898f3b
Show file tree
Hide file tree
Showing 27 changed files with 1,047 additions and 357 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ kingshard is a high-performance proxy for MySQL powered by Go. Just like other m
- splits reads and writes
- sharding table across multiple nodes
- client's ip ACL control.
- transaction in single node.
- supports prepared statement: COM_STMT_PREPARE, COM_STMT_EXECUTE, etc.
- MySQL HA

Expand Down
12 changes: 9 additions & 3 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ kingshard是一个由Go开发高性能MySQL Proxy项目,kingshard在满足基
4. 平滑上线DB或下线DB,前端应用无感知。
5. 支持多个slave,slave之间通过权值进行负载均衡。
6. 支持强制读主库。
7. 支持将sql发送到特定的node。
8. 支持在单个node上执行事务,不支持跨多个node执行事务。

## kinshard详细说明

[1.安装kingshard](./doc/KingDoc/kingshard_install_document.md)

[2.kingshard 快速入门](./doc/KingDoc/kingshard_quick_try.md)

[3.kingshard sharding介绍](./doc/KingDoc/kingshard_sharding_introduce.md)
[3.kingshard架构设计和功能实现](./doc/KingDoc/architecture_of_kingshard_CN.md)

[4.功能FAQ](./doc/KingDoc/function_FAQ.md)
[4.kingshard sharding介绍](./doc/KingDoc/kingshard_sharding_introduce.md)

[5.ChangeLog](./doc/KingDoc/change_log_CN.md)
[5.功能FAQ](./doc/KingDoc/function_FAQ.md)

[6.管理端命令介绍](./doc/KingDoc/admin_command_introduce.md)

[7.ChangeLog](./doc/KingDoc/change_log_CN.md)

## 反馈
目前kingshard还是1.0版本,比较核心的功能已经实现了。但还有很多地方不完善。如果您在使用kingshard的过程中发现BUG或者有新的功能需求,非常欢迎您发邮件至flikecn#126.com与作者取得联系,或者加入QQ群(147926796)交流。
22 changes: 18 additions & 4 deletions backend/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package backend

import (
"container/list"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -49,9 +48,24 @@ func (db *DB) Addr() string {
return db.addr
}

func (db *DB) String() string {
return fmt.Sprintf("%s:%s@%s/%s?maxIdleConns=%v",
db.user, db.password, db.addr, db.db, db.maxIdleConns)
func (db *DB) State() string {
var state string
switch db.state {
case Up:
state = "up"
case Down:
state = "down"
case Unknown:
state = "unknow"
}
return state
}

func (db *DB) IdleConnCount() int {
db.Lock()
defer db.Unlock()

return db.idleConns.Len()
}

func (db *DB) Close() error {
Expand Down
73 changes: 60 additions & 13 deletions backend/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type Node struct {
LastSlavePing int64
}

func (n *Node) Run() {
func (n *Node) CheckNode() {
//to do
//1 check connection alive
//2 check remove mysql server alive

n.checkMaster()
n.checkSlave()

t := time.NewTicker(3000 * time.Second)
t := time.NewTicker(30 * time.Second)
defer t.Stop()

n.LastMasterPing = time.Now().Unix()
Expand All @@ -61,15 +61,6 @@ func (n *Node) String() string {
return n.Cfg.Name
}

func (n *Node) FormatSlave() string {
s := make([]byte, 0, 16)
for _, v := range n.Slave {
s = append(s, []byte(v.addr)...)
s = append(s, []byte("\n")...)
}
return string(s)
}

func (n *Node) GetMasterConn() (*BackendConn, error) {
db := n.Master
if db == nil {
Expand Down Expand Up @@ -114,6 +105,8 @@ func (n *Node) checkMaster() {
golog.Error("Node", "checkMaster", "Ping", 0, "db.Addr", db.Addr(), "error", err.Error())
} else {
n.LastMasterPing = time.Now().Unix()
atomic.StoreInt32(&(db.state), Up)
return
}

if int64(n.DownAfterNoAlive) > 0 && time.Now().Unix()-n.LastMasterPing > int64(n.DownAfterNoAlive) {
Expand Down Expand Up @@ -142,19 +135,73 @@ func (n *Node) checkSlave() {
golog.Error("Node", "checkSlave", "Ping", 0, "db.Addr", slaves[i].Addr(), "error", err.Error())
} else {
n.LastSlavePing = time.Now().Unix()
atomic.StoreInt32(&(slaves[i].state), Up)
continue
}

if int64(n.DownAfterNoAlive) > 0 && time.Now().Unix()-n.LastSlavePing > int64(n.DownAfterNoAlive) {
golog.Info("Node", "checkMaster", "Master down", 0,
"db.Addr", slaves[i].Addr(),
"slave_down_time", int64(n.DownAfterNoAlive/time.Second))
//If can't ping slave after DownAfterNoAlive set slave Down
//If can't ping slave after DownAfterNoAlive, set slave Down
n.DownSlave(slaves[i].addr)
}
}

}

func (n *Node) AddSlave(addr string) error {
var weight int
var err error
if len(addr) == 0 {
return errors.ErrAddressNull
}
n.Lock()
defer n.Unlock()
addrAndWeight := strings.Split(addr, WeightSplit)
if len(addrAndWeight) == 2 {
weight, err = strconv.Atoi(addrAndWeight[1])
if err != nil {
return err
}
} else {
weight = 1
}
n.SlaveWeights = append(n.SlaveWeights, weight)
db := n.OpenDB(addrAndWeight[0])
n.Slave = append(n.Slave, db)
n.InitBalancer()
return nil
}

func (n *Node) DeleteSlave(addr string) error {
n.Lock()
defer n.Unlock()
slaveCount := len(n.Slave)
if slaveCount == 0 {
return errors.ErrNoSlaveDb
} else if slaveCount == 1 {
n.Slave = nil
n.SlaveWeights = nil
n.RoundRobinQ = nil
return nil
}

s := make([]*DB, 0, slaveCount-1)
sw := make([]int, 0, slaveCount-1)
for i := 0; i < slaveCount; i++ {
if n.Slave[i].addr != addr {
s = append(s, n.Slave[i])
sw = append(sw, n.SlaveWeights[i])
}
}

n.Slave = s
n.SlaveWeights = sw
n.InitBalancer()
return nil
}

func (n *Node) OpenDB(addr string) *DB {
db := Open(addr, n.Cfg.User, n.Cfg.Password, "")

Expand Down Expand Up @@ -243,7 +290,7 @@ func (n *Node) ParseMaster(masterStr string) error {
return nil
}

//slaveStr(127.0.0.1:3306@2)
//slaveStr(127.0.0.1:3306@2,192.168.0.12:3306@3)
func (n *Node) ParseSlave(slaveStr string) error {
var db *DB
var weight int
Expand Down
50 changes: 43 additions & 7 deletions cmd/kingshard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/flike/kingshard/proxy/server"
"os"
"os/signal"
"path"
"runtime"
"strings"
"syscall"
Expand All @@ -16,9 +17,24 @@ import (
var configFile *string = flag.String("config", "/etc/kingshard.conf", "kingshard config file")
var logLevel *string = flag.String("log-level", "", "log level [debug|info|warn|error], default error")

const (
sqlLogName = "sql.log"
sysLogName = "sys.log"
MaxLogSize = 1024 * 1024 * 1024
)

const banner string = `
__ _ __ __
/ /__(_)___ ____ ______/ /_ ____ __________/ /
/ //_/ / __ \/ __ \/ ___/ __ \ / __\/ ___/ __ /
/ ,< / / / / / /_/ (__ ) / / / /_/ / / / /_/ /
/_/|_/_/_/ /_/\__, /____/_/ /_/\__,_/_/ \__,_/
/____/
`

func main() {
fmt.Print(banner)
runtime.GOMAXPROCS(runtime.NumCPU())

flag.Parse()

if len(*configFile) == 0 {
Expand All @@ -32,6 +48,25 @@ func main() {
return
}

//when the log file size greater than 1GB, kingshard will generate a new file
if len(cfg.LogPath) != 0 {
sysFilePath := path.Join(cfg.LogPath, sysLogName)
sysFile, err := golog.NewRotatingFileHandler(sysFilePath, MaxLogSize, 1)
if err != nil {
fmt.Printf("new log file error:%v\n", err.Error())
return
}
golog.GlobalSysLogger = golog.New(sysFile, golog.Lfile|golog.Ltime|golog.Llevel)

sqlFilePath := path.Join(cfg.LogPath, sqlLogName)
sqlFile, err := golog.NewRotatingFileHandler(sqlFilePath, MaxLogSize, 1)
if err != nil {
fmt.Printf("new log file error:%v\n", err.Error())
return
}
golog.GlobalSqlLogger = golog.New(sqlFile, golog.Lfile|golog.Ltime|golog.Llevel)
}

if *logLevel != "" {
setLogLevel(*logLevel)
} else {
Expand All @@ -55,7 +90,8 @@ func main() {
go func() {
sig := <-sc
golog.Info("main", "main", "Got signal", 0, "signal", sig)
golog.GlobalLogger.Close()
golog.GlobalSysLogger.Close()
golog.GlobalSqlLogger.Close()
svr.Close()
}()

Expand All @@ -65,14 +101,14 @@ func main() {
func setLogLevel(level string) {
switch strings.ToLower(level) {
case "debug":
golog.GlobalLogger.SetLevel(golog.LevelDebug)
golog.GlobalSysLogger.SetLevel(golog.LevelDebug)
case "info":
golog.GlobalLogger.SetLevel(golog.LevelInfo)
golog.GlobalSysLogger.SetLevel(golog.LevelInfo)
case "warn":
golog.GlobalLogger.SetLevel(golog.LevelWarn)
golog.GlobalSysLogger.SetLevel(golog.LevelWarn)
case "error":
golog.GlobalLogger.SetLevel(golog.LevelError)
golog.GlobalSysLogger.SetLevel(golog.LevelError)
default:
golog.GlobalLogger.SetLevel(golog.LevelError)
golog.GlobalSysLogger.SetLevel(golog.LevelError)
}
}
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Config struct {
Addr string `yaml:"addr"`
User string `yaml:"user"`
Password string `yaml:"password"`
LogPath string `yaml:"log_path"`
LogLevel string `yaml:"log_level"`

AllowIps string `yaml:"allow_ips"`
Expand All @@ -36,7 +37,7 @@ type NodeConfig struct {
type SchemaConfig struct {
DB string `yaml:"db"`
Nodes []string `yaml:"nodes"`
RulesConifg RulesConfig `yaml:"rules"`
RulesConfig RulesConfig `yaml:"rules"`
}

//路由规则
Expand Down
18 changes: 12 additions & 6 deletions core/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ var (
ErrNoMasterDb = errors.New("no master database")
ErrNoSlaveDb = errors.New("no slave database")
ErrNoDatabase = errors.New("no database")
ErrMasterDown = errors.New("master is down")
ErrSlaveDown = errors.New("slave is down")

ErrMasterDown = errors.New("master is down")
ErrSlaveDown = errors.New("slave is down")

ErrAddressNull = errors.New("address is nil")
ErrCmdUnsupport = errors.New("command unsupport")

ErrLocationsCount = errors.New("locations count are not equal")
ErrNoCriteria = errors.New("plan have no criteria")
Expand All @@ -23,8 +27,10 @@ var (
ErrDeleteInMulti = errors.New("delete in multi node")
ErrReplaceInMulti = errors.New("replace in multi node")
ErrExecInMulti = errors.New("exec in multi node")
ErrNoPlan = errors.New("statement have no plan")
ErrUpdateKey = errors.New("routing key in update expression")
ErrStmtConvert = errors.New("statement fail to convert")
ErrConnNotEqual = errors.New("the length of conns not equal sqls")
ErrTransInMulti = errors.New("transaction in multi node")

ErrNoPlan = errors.New("statement have no plan")
ErrUpdateKey = errors.New("routing key in update expression")
ErrStmtConvert = errors.New("statement fail to convert")
ErrConnNotEqual = errors.New("the length of conns not equal sqls")
)
9 changes: 5 additions & 4 deletions core/golog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func GetLevel() int {
}

//全局变量
var GlobalLogger *Logger = StdLogger()
var GlobalSysLogger *Logger = StdLogger()
var GlobalSqlLogger *Logger = GlobalSysLogger

func escape(s string, filterEqual bool) string {
dest := make([]byte, 0, 2*len(s))
Expand All @@ -237,7 +238,7 @@ func escape(s string, filterEqual bool) string {
}

func OutputSql(state string, format string, v ...interface{}) {
l := GlobalLogger
l := GlobalSqlLogger
buf := l.popBuf()

if l.flag&Ltime > 0 {
Expand All @@ -263,7 +264,7 @@ func OutputSql(state string, format string, v ...interface{}) {
}

func output(level int, module string, method string, msg string, reqId uint32, args ...interface{}) {
if level < GlobalLogger.Level() {
if level < GlobalSysLogger.Level() {
return
}

Expand All @@ -282,7 +283,7 @@ func output(level int, module string, method string, msg string, reqId uint32, a
content := fmt.Sprintf(`[%s] "%s" "%s" "%s" conn_id=%d`,
module, method, msg, argsBuff.String(), reqId)

GlobalLogger.Output(3, level, content)
GlobalSysLogger.Output(3, level, content)
}

func Trace(module string, method string, msg string, reqId uint32, args ...interface{}) {
Expand Down
6 changes: 3 additions & 3 deletions core/golog/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func TestRotatingFileLog(t *testing.T) {
t.Fatal(err)
}

GlobalLogger = New(h, Lfile|Ltime|Llevel)
GlobalLogger.SetLevel(LevelTrace)
GlobalSysLogger = New(h, Lfile|Ltime|Llevel)
GlobalSysLogger.SetLevel(LevelTrace)
Debug("log", "hello,world", "OK", 0, "fileName", fileName, "fileName2", fileName, "fileName3", fileName)

GlobalLogger.Close()
GlobalSysLogger.Close()

//os.RemoveAll(path)
}
Loading

0 comments on commit 2898f3b

Please sign in to comment.