Skip to content

Commit a28f50b

Browse files
committed
backend refactor
1 parent 218a972 commit a28f50b

File tree

7 files changed

+170
-720
lines changed

7 files changed

+170
-720
lines changed

backend/backend_conn.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ type Conn struct {
5353
pkgErr error
5454
}
5555

56+
func (c *Conn) GetTCPConnect() net.Conn {
57+
return c.conn
58+
}
59+
5660
func (c *Conn) Connect(addr string, user string, password string, db string) error {
5761
c.addr = addr
5862
c.user = user
@@ -133,6 +137,14 @@ func (c *Conn) Close() error {
133137
return nil
134138
}
135139

140+
func (c *Conn) Read() ([]byte, error) {
141+
return c.readPacket()
142+
}
143+
144+
func (c *Conn) Write(data []byte) error {
145+
return c.writePacket(data)
146+
}
147+
136148
func (c *Conn) readPacket() ([]byte, error) {
137149
d, err := c.pkg.ReadPacket()
138150
c.pkgErr = err

backend/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ func (db *DB) PopConn() (*Conn, error) {
247247
if cacheConns == nil || idleConns == nil {
248248
return nil, errors.ErrDatabaseClose
249249
}
250+
250251
co = db.GetConnFromCache(cacheConns)
251252
if co == nil {
252253
co, err = db.GetConnFromIdle(cacheConns, idleConns)

mysql/charset.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ var CollationNames = map[string]CollationId{
549549
}
550550

551551
const (
552-
DEFAULT_CHARSET = "utf8"
553-
DEFAULT_COLLATION_ID CollationId = 33
554-
DEFAULT_COLLATION_NAME string = "utf8_general_ci"
552+
DEFAULT_CHARSET = "utf8mb4"
553+
DEFAULT_COLLATION_ID CollationId = 224
554+
DEFAULT_COLLATION_NAME string = "utf8mb4_unicode_ci"
555555
)

mysql/const.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const (
1818
MinProtocolVersion byte = 10
1919
MaxPayloadLen int = 1<<24 - 1
2020
TimeFormat string = "2006-01-02 15:04:05"
21-
ServerVersion string = "5.6.20-kingshard"
21+
ServerVersion string = "0.1-anywhere-mysql-load-balancer"
2222
)
2323

2424
const (

proxy/server/conn.go

Lines changed: 97 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ package server
1717
import (
1818
"bytes"
1919
"encoding/binary"
20-
"fmt"
2120
"net"
2221
"runtime"
2322
"sync"
@@ -57,10 +56,6 @@ type ClientConn struct {
5756

5857
lastInsertId int64
5958
affectedRows int64
60-
61-
// stmtId uint32
62-
63-
// stmts map[uint32]*Stmt //prepare相关,client端到proxy的stmt
6459
}
6560

6661
var DEFAULT_CAPABILITY uint32 = mysql.CLIENT_LONG_PASSWORD | mysql.CLIENT_LONG_FLAG |
@@ -69,28 +64,6 @@ var DEFAULT_CAPABILITY uint32 = mysql.CLIENT_LONG_PASSWORD | mysql.CLIENT_LONG_F
6964

7065
var baseConnId uint32 = 10000
7166

72-
func (c *ClientConn) IsAllowConnect() bool {
73-
clientHost, _, err := net.SplitHostPort(c.c.RemoteAddr().String())
74-
if err != nil {
75-
fmt.Println(err)
76-
}
77-
clientIP := net.ParseIP(clientHost)
78-
79-
ipVec := c.proxy.allowips[c.proxy.allowipsIndex]
80-
if ipVecLen := len(ipVec); ipVecLen == 0 {
81-
return true
82-
}
83-
for _, ip := range ipVec {
84-
if ip.Equal(clientIP) {
85-
return true
86-
}
87-
}
88-
89-
golog.Error("server", "IsAllowConnect", "error", mysql.ER_ACCESS_DENIED_ERROR,
90-
"ip address", c.c.RemoteAddr().String(), " access denied by kindshard.")
91-
return false
92-
}
93-
9467
func (c *ClientConn) Handshake() error {
9568
if err := c.writeInitialHandshake(); err != nil {
9669
golog.Error("server", "Handshake", err.Error(),
@@ -248,14 +221,49 @@ func (c *ClientConn) readHandshakeResponse() error {
248221

249222
}
250223

251-
golog.Error("handshake ", "response", "db ", 0, db)
252224
if err := c.useDB(db); err != nil {
253225
return err
254226
}
255227

256228
return nil
257229
}
258230

231+
func (c *ClientConn) DoStreamRoute() (err error) {
232+
srcConn := c.c.(*net.TCPConn)
233+
golog.Info("ClientConn", "Run", "address Info client end", 0, srcConn.RemoteAddr().String(), srcConn.LocalAddr().String())
234+
235+
//find default backend
236+
node := c.proxy.GetNode("node1")
237+
backendConn, err := c.getBackendConn(node, true)
238+
dstConn := backendConn.Conn.GetTCPConnect().(*net.TCPConn)
239+
240+
golog.Info("ClientConn", "Run", "address Info backend ", 0, dstConn.RemoteAddr().String(), dstConn.LocalAddr().String())
241+
if err != nil {
242+
golog.Error("ClientConn", "Run getBackendConn", err.Error(), 0)
243+
}
244+
245+
var buf bytes.Buffer
246+
n, err := buf.ReadFrom(srcConn)
247+
248+
if err != nil {
249+
golog.Error("ClientConn", "Run", "route send bytes error", 0, n)
250+
}
251+
252+
golog.Info("ClientConn", "Run", "proxy send server bytes", 0, n)
253+
254+
n, err = buf.WriteTo(dstConn)
255+
256+
if err != nil {
257+
golog.Error("ClientConn", "Run", "route receive bytes error", 0, n)
258+
}
259+
260+
golog.Info("ClientConn", "Run", "proxy receive server bytes", 0, n)
261+
buf.Reset()
262+
buf.ReadFrom(dstConn)
263+
buf.WriteTo(srcConn)
264+
return
265+
}
266+
259267
func (c *ClientConn) Run() {
260268
defer func() {
261269
r := recover()
@@ -268,24 +276,34 @@ func (c *ClientConn) Run() {
268276
err.Error(), 0,
269277
"stack", string(buf))
270278
}
271-
272279
c.Close()
273280
}()
274281

275282
for {
276-
data, err := c.readPacket()
283+
// if this client connection have not set the route for specific ID
284+
// TODO find route
277285

278-
if err != nil {
279-
return
280-
}
286+
// now just do default route
287+
err := c.DoStreamRoute()
281288

282-
if err := c.dispatch(data); err != nil {
283-
c.proxy.counter.IncrErrLogTotal()
284-
golog.Error("server", "Run",
285-
err.Error(), c.connectionId,
286-
)
287-
c.writeError(err)
289+
if err != nil {
290+
golog.Error("ClientConn", "Run", "route btyes error", c.connectionId, err.Error())
288291
}
292+
//
293+
// golog.Info("ClientConn", "Run", "route btyes", 0, n)
294+
295+
// data, err := c.readPacket()
296+
// if err != nil {
297+
// return
298+
// }
299+
//
300+
// if err := c.dispatch(data); err != nil {
301+
// c.proxy.counter.IncrErrLogTotal()
302+
// golog.Error("server", "Run",
303+
// err.Error(), c.connectionId,
304+
// )
305+
// c.writeError(err)
306+
// }
289307

290308
if c.closed {
291309
return
@@ -297,50 +315,48 @@ func (c *ClientConn) Run() {
297315

298316
func (c *ClientConn) dispatch(data []byte) error {
299317
c.proxy.counter.IncrClientQPS()
300-
cmd := data[0]
301-
data = data[1:]
302-
303-
switch cmd {
304-
// case mysql.COM_QUIT:
305-
// c.Close()
306-
// return nil
307-
case mysql.COM_QUERY:
308-
golog.Warn("ClientConn", "dispatch", "query", 0, hack.String(data))
309-
node := c.proxy.GetNode("node1")
310-
co, err := c.getBackendConn(node, true)
311-
res, err := co.Execute(hack.String(data))
312-
if err != nil {
313-
return err
314-
}
315-
c.writeResultset(res.Status, res.Resultset)
316-
// return c.handleQuery(hack.String(data))
317-
// case mysql.COM_PING:
318-
// return c.writeOK(nil)
319-
// case mysql.COM_INIT_DB:
320-
// if err := c.useDB(hack.String(data)); err != nil {
318+
//cmd := data[0]
319+
//data = data[1:]
320+
if len(hack.String(data)) == 0 {
321+
golog.Warn("ClientConn", "dispatch", "skip empty query", 0)
322+
}
323+
// switch cmd {
324+
// case mysql.COM_QUERY:
325+
// golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
326+
// node := c.proxy.GetNode("node1")
327+
// co, err := c.getBackendConn(node, true)
328+
// res, err := co.Execute(hack.String(data))
329+
// if err != nil {
321330
// return err
322-
// } else {
323-
// return c.writeOK(nil)
324331
// }
325-
// case mysql.COM_FIELD_LIST:
326-
// return c.handleFieldList(data)
327-
// case mysql.COM_STMT_PREPARE:
328-
// return c.handleStmtPrepare(hack.String(data))
329-
// case mysql.COM_STMT_EXECUTE:
330-
// return c.handleStmtExecute(data)
331-
// case mysql.COM_STMT_CLOSE:
332-
// return c.handleStmtClose(data)
333-
// case mysql.COM_STMT_SEND_LONG_DATA:
334-
// return c.handleStmtSendLongData(data)
335-
// case mysql.COM_STMT_RESET:
336-
// return c.handleStmtReset(data)
337-
// case mysql.COM_SET_OPTION:
338-
// return c.writeEOF(0)
339-
default:
340-
msg := fmt.Sprintf("command %d not supported now, data is %s", cmd, string(data))
341-
golog.Error("ClientConn", "dispatch", msg, 0)
342-
return mysql.NewError(mysql.ER_UNKNOWN_ERROR, msg)
332+
// c.writeResultset(res.Status, res.Resultset)
333+
// default:
334+
// msg := fmt.Sprintf("command %d not supported now, data is %s", cmd, string(data))
335+
// golog.Error("ClientConn", "dispatch", msg, 0)
336+
// return mysql.NewError(mysql.ER_UNKNOWN_ERROR, msg)
337+
// }
338+
339+
golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
340+
node := c.proxy.GetNode("node1")
341+
backendConn, err := c.getBackendConn(node, true)
342+
backendConn.UseDB(c.db)
343+
344+
// res, err := co.Execute(hack.String(data))
345+
err = backendConn.Write(data)
346+
if err != nil {
347+
return err
343348
}
349+
result, err := backendConn.Read()
350+
if err != nil {
351+
return err
352+
}
353+
354+
c.writePacket(result)
355+
// if res.Resultset != nil {
356+
// c.writeResultset(res.Status, res.Resultset)
357+
// } else {
358+
// c.writeOK(res)
359+
// }
344360

345361
return nil
346362
}
@@ -362,7 +378,7 @@ func (c *ClientConn) useDB(db string) error {
362378
// if err = co.UseDB(db); err != nil {
363379
// return err
364380
// }
365-
// c.db = db
381+
c.db = db
366382
return nil
367383
}
368384

0 commit comments

Comments
 (0)