Skip to content

Commit fb9cf8e

Browse files
committed
refactor code
1 parent 1a65ef3 commit fb9cf8e

File tree

5 files changed

+87
-61
lines changed

5 files changed

+87
-61
lines changed

mysql-load-balancer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ var logLevel *string = flag.String("log-level", "", "log level [debug|info|warn|
3434
var version *bool = flag.Bool("v", false, "the version of kingshard")
3535

3636
const (
37-
sqlLogName = "sql.log"
38-
sysLogName = "sys.log"
37+
sqlLogName = "slow_query.log"
38+
sysLogName = "proxy.log"
3939
MaxLogSize = 1024 * 1024 * 10
4040
)
4141

mysql/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func Pstack() string {
3131
return string(buf[0:n])
3232
}
3333

34+
func IsEOFPacket(data []byte) bool {
35+
return data[0] == EOF_HEADER && len(data) <= 5
36+
}
37+
3438
func CalcPassword(scramble, password []byte) []byte {
3539
if len(password) == 0 {
3640
return nil

proxy/conn.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func (c *ClientConn) Run() {
262262
}
263263

264264
trans.Run()
265-
golog.Error("ClientConn", "Run", "Transport Finish............", c.connectionId)
265+
golog.Info("ClientConn", "Run", "Transport Finish............", c.connectionId)
266266
}
267267

268268
func (c *ClientConn) useDB(db string) error {

proxy/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (s *Server) onConn(c net.Conn) {
172172
}()
173173

174174
if err := conn.Handshake(); err != nil {
175-
golog.Error("server", "onConn", err.Error(), 0)
175+
golog.Info("server", "onConn", err.Error(), 0)
176176
c.Close()
177177
return
178178
}

proxy/transport.go

Lines changed: 79 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ func (trans *Transport) Run() {
7373
golog.Warn("Transport", "Run", "client error", trans.Client.cid, err.Error())
7474
return
7575
}
76-
isQuery := false
7776

7877
if len(data) > 4 {
7978
cmd := data[4]
@@ -85,16 +84,15 @@ func (trans *Transport) Run() {
8584
trans.clientend.writeOK(nil)
8685
golog.Warn("Transport", "Run", "client ping", uint32(cmd))
8786
continue
88-
// case mysql.COM_INIT_DB:
89-
// if err := trans.clientend.useDB(hack.String(data)); err != nil {
90-
// return //err
91-
// } else {
92-
// trans.clientend.writeOK(nil)
93-
// }
94-
// golog.Warn("Transport", "Run", "client change DB", uint32(cmd), string(data[5:]))
95-
// continue
96-
case mysql.COM_QUERY:
97-
isQuery = true
87+
// case mysql.COM_INIT_DB:
88+
// if err := trans.clientend.useDB(hack.String(data)); err != nil {
89+
// return //err
90+
// } else {
91+
// trans.clientend.writeOK(nil)
92+
// }
93+
// golog.Warn("Transport", "Run", "client change DB", uint32(cmd), string(data[5:]))
94+
// continue
95+
//case mysql.COM_QUERY:
9896
}
9997
golog.Debug("Transport", "Run", "client command", uint32(cmd), string(data[5:]))
10098
}
@@ -107,24 +105,14 @@ func (trans *Transport) Run() {
107105
}
108106

109107
//read response from server
110-
data, err = trans.Server.ReadServerRaw(false)
108+
data, err = trans.Server.ReadServerRaw()
111109
golog.Debug("Transport", "Run", "server read ", trans.Server.cid, data)
112110

113111
if err != nil {
114112
golog.Warn("Transport", "Run", "server read error", trans.Server.cid, err.Error())
115113
return
116114
}
117115

118-
if isQuery && data[4] != mysql.OK_HEADER {
119-
result, err := trans.Server.ReadServerRaw(true)
120-
if err != nil {
121-
golog.Warn("Transport", "Run", "server read error", trans.Server.cid, err.Error())
122-
return
123-
}
124-
data = append(data, result...)
125-
golog.Debug("Transport", "Run", "nest server read ", trans.Server.cid, data)
126-
}
127-
128116
// send to client
129117
err = trans.Client.Write(data)
130118
if err != nil {
@@ -134,20 +122,12 @@ func (trans *Transport) Run() {
134122
}
135123
}
136124

137-
func (trans *TransPipe) ReadHeader() ([]byte, error) {
125+
func (trans *TransPipe) ReadPacket() ([]byte, error) {
138126
header := []byte{0, 0, 0, 0}
139127

140128
if _, err := io.ReadFull(trans.pipe, header); err != nil {
141129
return nil, mysql.ErrBadConn
142130
}
143-
return header[:], nil
144-
}
145-
146-
func (trans *TransPipe) ReadClientRaw() ([]byte, error) {
147-
header, err := trans.ReadHeader()
148-
if err != nil {
149-
return nil, mysql.ErrBadConn
150-
}
151131
length := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
152132
if length < 1 {
153133
return nil, fmt.Errorf("invalid payload length %d", length)
@@ -159,41 +139,83 @@ func (trans *TransPipe) ReadClientRaw() ([]byte, error) {
159139
} else {
160140
if length < mysql.MaxPayloadLen {
161141
return append(header[:], data...), nil
142+
}
143+
var buf []byte
144+
buf, err = trans.ReadPacket()
145+
if err != nil {
146+
return nil, mysql.ErrBadConn
162147
} else {
163-
return nil, fmt.Errorf("invalid payload length %d", length)
148+
header = append(header[:], data...)
149+
return append(header, buf...), nil
164150
}
165151
}
166152
}
167153

168-
func (trans *TransPipe) ReadServerRaw(isNested bool) ([]byte, error) {
169-
header, err := trans.ReadHeader()
170-
if err != nil {
154+
func (trans *TransPipe) ReadHeader() ([]byte, error) {
155+
header := []byte{0, 0, 0, 0}
156+
157+
if _, err := io.ReadFull(trans.pipe, header); err != nil {
171158
return nil, mysql.ErrBadConn
172159
}
173-
length := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16)
174-
if length < 1 {
175-
return nil, fmt.Errorf("invalid payload length %d", length)
176-
}
177-
data := make([]byte, length)
178-
if _, err := io.ReadFull(trans.pipe, data); err != nil {
179-
return nil, mysql.ErrBadConn
180-
} else {
181-
if data[0] == mysql.OK_HEADER && !isNested {
182-
return append(header[:], data...), nil
183-
} else if data[0] == mysql.EOF_HEADER && len(data) <= 5 {
184-
return append(header[:], data...), nil
185-
} else {
186-
//need continue read until EOF
187-
var buf []byte
188-
buf, err = trans.ReadServerRaw(true)
189-
if err != nil {
190-
return nil, mysql.ErrBadConn
191-
} else {
192-
header = append(header[:], data...)
193-
return append(header, buf...), nil
194-
}
160+
return header[:], nil
161+
}
162+
163+
func (trans *TransPipe) ReadClientRaw() ([]byte, error) {
164+
return trans.ReadPacket()
165+
}
166+
167+
func (trans *TransPipe) ReadServerColumns() ([]byte, error) {
168+
//just read packet
169+
var result []byte
170+
for {
171+
data, err := trans.ReadPacket()
172+
if err != nil {
173+
return nil, err
174+
}
175+
176+
// EOF Packet
177+
if mysql.IsEOFPacket(data[4:]) {
178+
result = append(result, data...)
179+
return result, nil
195180
}
181+
result = append(result, data...)
182+
}
183+
}
184+
185+
func (trans *TransPipe) ReadServerRows() ([]byte, error) {
186+
//now just same to read columns
187+
return trans.ReadServerColumns()
188+
}
189+
190+
func (trans *TransPipe) ReadServerRaw() ([]byte, error) {
191+
data, err := trans.ReadPacket()
192+
if err != nil {
193+
return nil, err
194+
}
195+
196+
if data[4] == mysql.OK_HEADER {
197+
return data, nil
198+
}
199+
200+
// must be a result set
201+
//get column count
202+
_, _, n := mysql.LengthEncodedInt(data[4:])
203+
if n-len(data[4:]) != 0 {
204+
return nil, mysql.ErrMalformPacket
205+
}
206+
//read result columns
207+
cols, err := trans.ReadServerColumns()
208+
if err != nil {
209+
return nil, err
210+
}
211+
212+
//read result rows
213+
rows, err := trans.ReadServerRows()
214+
if err != nil {
215+
return nil, err
196216
}
217+
data = append(data, cols...)
218+
return append(data, rows...), nil
197219
}
198220

199221
func (trans *TransPipe) Write(data []byte) error {

0 commit comments

Comments
 (0)