Skip to content

Commit 138f5e8

Browse files
committed
bug fix
1 parent a28f50b commit 138f5e8

File tree

6 files changed

+122
-80
lines changed

6 files changed

+122
-80
lines changed

backend/backend_conn.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,18 @@ func (c *Conn) writeCommand(command byte) error {
294294
})
295295
}
296296

297+
func (c *Conn) SendRawBytes(raw []byte) error {
298+
c.pkg.Sequence = 0
299+
length := len(raw)
300+
data := make([]byte, length+4)
301+
copy(data[4:], raw)
302+
return c.writePacket(data)
303+
}
304+
305+
func (c *Conn) ReadRawBytes() ([]byte, error) {
306+
return c.readPacket()
307+
}
308+
297309
func (c *Conn) writeCommandBuf(command byte, arg []byte) error {
298310
c.pkg.Sequence = 0
299311

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
go build mysql-load-balancer.go
1+
go build -gcflags "-N -l" mysql-load-balancer.go

ks.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,25 @@ nodes :
4444
# master represents a real mysql master server
4545
master : mycnpvgvb1ep016.sapanywhere.sap.corp:3306
4646

47+
# slave represents a real mysql salve server,and the number after '@' is
48+
# read load weight of this slave.
49+
slaveuser : root
50+
slavepassword : Initial0
51+
slave : cnpvgvb1ep118.pvgl.sap.corp:3306
52+
down_after_noalive : 32
53+
-
54+
name : node2
55+
56+
# default max conns for mysql server
57+
max_conns_limit : 32
58+
59+
# master user and password
60+
user : root
61+
password : Initial0
62+
63+
# master represents a real mysql master server
64+
master : mycnpvgvb1ep016.sapanywhere.sap.corp:3306
65+
4766
# slave represents a real mysql salve server,and the number after '@' is
4867
# read load weight of this slave.
4968
slaveuser : root

mysql/packetio.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ func (p *PacketIO) ReadPacket() ([]byte, error) {
5454
sequence := uint8(header[3])
5555

5656
if sequence != p.Sequence {
57-
return nil, fmt.Errorf("invalid sequence %d != %d", sequence, p.Sequence)
57+
fmt.Printf("now invalid sequence %d != %d", sequence, p.Sequence)
58+
//return nil, fmt.Errorf("invalid sequence %d != %d", sequence, p.Sequence)
5859
}
59-
6060
p.Sequence++
6161

6262
data := make([]byte, length)

proxy/server/conn.go

Lines changed: 83 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/compasses/mysql-load-balancer/backend"
2525
"github.com/compasses/mysql-load-balancer/core/golog"
2626
"github.com/compasses/mysql-load-balancer/mysql"
27-
"github.com/siddontang/mixer/hack"
2827
)
2928

3029
//client <-> proxy
@@ -229,38 +228,54 @@ func (c *ClientConn) readHandshakeResponse() error {
229228
}
230229

231230
func (c *ClientConn) DoStreamRoute() (err error) {
232-
srcConn := c.c.(*net.TCPConn)
233-
golog.Info("ClientConn", "Run", "address Info client end", 0, srcConn.RemoteAddr().String(), srcConn.LocalAddr().String())
231+
data, err := c.readPacket()
232+
if err != nil {
233+
return err
234+
}
235+
c.proxy.counter.IncrClientQPS()
236+
//process speical command
237+
cmd := data[0]
238+
switch cmd {
239+
case mysql.COM_QUIT:
240+
c.Close()
241+
return nil
242+
case mysql.COM_PING:
243+
c.writeOK(nil)
244+
return nil
245+
}
234246

235-
//find default backend
236-
node := c.proxy.GetNode("node1")
237-
backendConn, err := c.getBackendConn(node, true)
238-
dstConn := backendConn.Conn.GetTCPConnect().(*net.TCPConn)
247+
golog.Info("ClientConn", "Do Stream Route", "client read packet", c.connectionId, string(data))
248+
// get default
249+
backConn, err := c.GetBackendConn("node1")
250+
if err != nil {
251+
return err
252+
}
239253

240-
golog.Info("ClientConn", "Run", "address Info backend ", 0, dstConn.RemoteAddr().String(), dstConn.LocalAddr().String())
254+
err = backConn.SendRawBytes(data)
241255
if err != nil {
242-
golog.Error("ClientConn", "Run getBackendConn", err.Error(), 0)
256+
return err
243257
}
258+
golog.Info("ClientConn", "Do Stream Route", "backend write packet", 0, string(data))
244259

245-
var buf bytes.Buffer
246-
n, err := buf.ReadFrom(srcConn)
260+
//read Response from Server
261+
data, err = backConn.ReadRawBytes()
262+
backConn.Close()
247263

248264
if err != nil {
249-
golog.Error("ClientConn", "Run", "route send bytes error", 0, n)
265+
return err
250266
}
267+
golog.Info("ClientConn", "Do Stream Route", "backend read packet", 0, string(data))
251268

252-
golog.Info("ClientConn", "Run", "proxy send server bytes", 0, n)
253-
254-
n, err = buf.WriteTo(dstConn)
269+
//send to client
270+
// apend header
271+
withheader := make([]byte, len(data)+4)
272+
copy(withheader[4:], data)
255273

274+
err = c.writePacket(withheader)
256275
if err != nil {
257-
golog.Error("ClientConn", "Run", "route receive bytes error", 0, n)
276+
return err
258277
}
259-
260-
golog.Info("ClientConn", "Run", "proxy receive server bytes", 0, n)
261-
buf.Reset()
262-
buf.ReadFrom(dstConn)
263-
buf.WriteTo(srcConn)
278+
golog.Info("ClientConn", "Do Stream Route", "client write packet", 0, string(data))
264279
return
265280
}
266281

@@ -288,22 +303,13 @@ func (c *ClientConn) Run() {
288303

289304
if err != nil {
290305
golog.Error("ClientConn", "Run", "route btyes error", c.connectionId, err.Error())
306+
c.proxy.counter.IncrErrLogTotal()
307+
golog.Error("server", "Run",
308+
err.Error(), c.connectionId,
309+
)
310+
c.writeError(err)
311+
c.closed = true
291312
}
292-
//
293-
// golog.Info("ClientConn", "Run", "route btyes", 0, n)
294-
295-
// data, err := c.readPacket()
296-
// if err != nil {
297-
// return
298-
// }
299-
//
300-
// if err := c.dispatch(data); err != nil {
301-
// c.proxy.counter.IncrErrLogTotal()
302-
// golog.Error("server", "Run",
303-
// err.Error(), c.connectionId,
304-
// )
305-
// c.writeError(err)
306-
// }
307313

308314
if c.closed {
309315
return
@@ -314,49 +320,49 @@ func (c *ClientConn) Run() {
314320
}
315321

316322
func (c *ClientConn) dispatch(data []byte) error {
317-
c.proxy.counter.IncrClientQPS()
318-
//cmd := data[0]
319-
//data = data[1:]
320-
if len(hack.String(data)) == 0 {
321-
golog.Warn("ClientConn", "dispatch", "skip empty query", 0)
322-
}
323-
// switch cmd {
324-
// case mysql.COM_QUERY:
325-
// golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
326-
// node := c.proxy.GetNode("node1")
327-
// co, err := c.getBackendConn(node, true)
328-
// res, err := co.Execute(hack.String(data))
329-
// if err != nil {
330-
// return err
331-
// }
332-
// c.writeResultset(res.Status, res.Resultset)
333-
// default:
334-
// msg := fmt.Sprintf("command %d not supported now, data is %s", cmd, string(data))
335-
// golog.Error("ClientConn", "dispatch", msg, 0)
336-
// return mysql.NewError(mysql.ER_UNKNOWN_ERROR, msg)
323+
// c.proxy.counter.IncrClientQPS()
324+
// //cmd := data[0]
325+
// //data = data[1:]
326+
// if len(hack.String(data)) == 0 {
327+
// golog.Warn("ClientConn", "dispatch", "skip empty query", 0)
337328
// }
338-
339-
golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
340-
node := c.proxy.GetNode("node1")
341-
backendConn, err := c.getBackendConn(node, true)
342-
backendConn.UseDB(c.db)
343-
344-
// res, err := co.Execute(hack.String(data))
345-
err = backendConn.Write(data)
346-
if err != nil {
347-
return err
348-
}
349-
result, err := backendConn.Read()
350-
if err != nil {
351-
return err
352-
}
353-
354-
c.writePacket(result)
355-
// if res.Resultset != nil {
356-
// c.writeResultset(res.Status, res.Resultset)
357-
// } else {
358-
// c.writeOK(res)
329+
// // switch cmd {
330+
// // case mysql.COM_QUERY:
331+
// // golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
332+
// // node := c.proxy.GetNode("node1")
333+
// // co, err := c.getBackendConn(node, true)
334+
// // res, err := co.Execute(hack.String(data))
335+
// // if err != nil {
336+
// // return err
337+
// // }
338+
// // c.writeResultset(res.Status, res.Resultset)
339+
// // default:
340+
// // msg := fmt.Sprintf("command %d not supported now, data is %s", cmd, string(data))
341+
// // golog.Error("ClientConn", "dispatch", msg, 0)
342+
// // return mysql.NewError(mysql.ER_UNKNOWN_ERROR, msg)
343+
// // }
344+
//
345+
// golog.Info("ClientConn", "dispatch", "query", 0, hack.String(data))
346+
// node := c.proxy.GetNode("node1")
347+
// backendConn, err := c.getBackendConn(node, true)
348+
// backendConn.UseDB(c.db)
349+
//
350+
// // res, err := co.Execute(hack.String(data))
351+
// err = backendConn.Write(data)
352+
// if err != nil {
353+
// return err
359354
// }
355+
// result, err := backendConn.Read()
356+
// if err != nil {
357+
// return err
358+
// }
359+
//
360+
// c.writePacket(result)
361+
// // if res.Resultset != nil {
362+
// // c.writeResultset(res.Status, res.Resultset)
363+
// // } else {
364+
// // c.writeOK(res)
365+
// // }
360366

361367
return nil
362368
}

proxy/server/conn_query.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@ import (
1919
"github.com/flike/kingshard/core/golog"
2020
)
2121

22+
func (c *ClientConn) GetBackendConn(nodeName string) (co *backend.BackendConn, err error) {
23+
node := c.proxy.GetNode(nodeName)
24+
return c.getBackendConn(node, false)
25+
}
26+
2227
func (c *ClientConn) getBackendConn(n *backend.Node, fromSlave bool) (co *backend.BackendConn, err error) {
2328
co, err = n.GetMasterConn()
2429
if err != nil {

0 commit comments

Comments
 (0)