Skip to content

Commit

Permalink
benchmark: add io test case
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Aug 3, 2017
1 parent 4a6eb8e commit e6014a8
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 25 deletions.
65 changes: 52 additions & 13 deletions benchmark/io/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package io
import (
"log"
"net"
"sync"

"github.com/golang/protobuf/proto"
"github.com/lonnng/nano/codec"
Expand Down Expand Up @@ -32,13 +33,19 @@ type (
Callback func(data interface{})

Connector struct {
conn net.Conn // low-level connection
codec *codec.Decoder // decoder
die chan struct{} // connector close channel
chSend chan []byte // send queue
mid uint // message id
events map[string]Callback
responses map[uint]Callback
conn net.Conn // low-level connection
codec *codec.Decoder // decoder
die chan struct{} // connector close channel
chSend chan []byte // send queue
mid uint // message id

// events handler
muEvents sync.RWMutex
events map[string]Callback

// response handler
muResponses sync.RWMutex
responses map[uint]Callback

connectedCallback func() // connected callback
}
Expand Down Expand Up @@ -91,9 +98,9 @@ func (c *Connector) Request(route string, v proto.Message, callback Callback) er
Data: data,
}

c.responses[c.mid] = callback
c.setResponseHandler(c.mid, callback)
if err := c.sendMessage(msg); err != nil {
delete(c.responses, c.mid)
c.setResponseHandler(c.mid, nil)
return err
}

Expand All @@ -115,6 +122,9 @@ func (c *Connector) Notify(route string, v proto.Message) error {
}

func (c *Connector) On(event string, callback Callback) {
c.muEvents.Lock()
defer c.muEvents.Unlock()

c.events[event] = callback
}

Expand All @@ -123,12 +133,41 @@ func (c *Connector) Close() {
close(c.die)
}

func (c *Connector) eventHandler(event string) (Callback, bool) {
c.muEvents.RLock()
defer c.muEvents.RUnlock()

cb, ok := c.events[event]
return cb, ok
}

func (c *Connector) responseHandler(mid uint) (Callback, bool) {
c.muResponses.RLock()
defer c.muResponses.RUnlock()

cb, ok := c.responses[mid]
return cb, ok
}

func (c *Connector) setResponseHandler(mid uint, cb Callback) {
c.muResponses.Lock()
defer c.muResponses.Unlock()

if cb == nil {
delete(c.responses, mid)
} else {
c.responses[mid] = cb
}
}

func (c *Connector) sendMessage(msg *message.Message) error {
data, err := msg.Encode()
if err != nil {
return err
}

//log.Printf("%+v",msg)

payload, err := codec.Encode(packet.Data, data)
if err != nil {
return err
Expand Down Expand Up @@ -162,7 +201,7 @@ func (c *Connector) send(data []byte) {
}

func (c *Connector) read() {
buf := make([]byte, 512)
buf := make([]byte, 2048)

for {
n, err := c.conn.Read(buf)
Expand Down Expand Up @@ -207,7 +246,7 @@ func (c *Connector) processPacket(p *packet.Packet) {
func (c *Connector) processMessage(msg *message.Message) {
switch msg.Type {
case message.Push:
cb, ok := c.events[msg.Route]
cb, ok := c.eventHandler(msg.Route)
if !ok {
log.Println("event handler not found", msg.Route)
return
Expand All @@ -216,14 +255,14 @@ func (c *Connector) processMessage(msg *message.Message) {
cb(msg.Data)

case message.Response:
cb, ok := c.responses[msg.ID]
cb, ok := c.responseHandler(msg.ID)
if !ok {
log.Println("response handler not found", msg.ID)
return
}

cb(msg.Data)
delete(c.responses, msg.ID)
c.setResponseHandler(msg.ID, nil)
}
}

Expand Down
29 changes: 18 additions & 11 deletions benchmark/io/io_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io

import (
"log"
"os"
"os/signal"
"sync/atomic"
Expand All @@ -13,12 +14,11 @@ import (
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/serialize/protobuf"
"github.com/lonnng/nano/session"
"log"
)

const (
addr = "127.0.0.1:3250" // local address
conc = 100 // concurrent client count
conc = 1000 // concurrent client count
)

type TestHandler struct {
Expand All @@ -40,7 +40,7 @@ func (h *TestHandler) AfterInit() {

func (h *TestHandler) Ping(s *session.Session, data *testdata.Ping) error {
atomic.AddInt32(&h.metrics, 1)
return s.Response(&testdata.Pong{Content: data.Content})
return s.Push("pong", &testdata.Pong{Content: data.Content})
}

func server() {
Expand All @@ -57,21 +57,28 @@ func client() {
panic(err)
}

chReady := make(chan struct{})
c.OnConnected(func() {
var i = 0
for i < 10 {
i++
c.Request("TestHandler.Ping", &testdata.Ping{}, func(data interface{}) {
println("pong")
})
}
chReady <- struct{}{}
})

c.On("pong", func(data interface{}) {})

<-chReady
for {
c.Notify("TestHandler.Ping", &testdata.Ping{})
time.Sleep(10 * time.Millisecond)
}
}

func TestIO(t *testing.T) {
go server()
go client()

for i := 0; i < conc; i++ {
go client()
}

//nano.EnableDebug()
log.SetFlags(log.LstdFlags | log.Llongfile)

sg := make(chan os.Signal)
Expand Down
2 changes: 1 addition & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (h *handlerService) processMessage(session *session.Session, msg *message.M
}

if env.debug {
log.Println(fmt.Sprintf("Uid=%d, Message={%s}, Data=%+v", session.Uid, msg.String(), data))
log.Println(fmt.Sprintf("Uid=%d, Message={%s}, Data=%+v", session.Uid(), msg.String(), data))
}

args := []reflect.Value{handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)}
Expand Down

0 comments on commit e6014a8

Please sign in to comment.