Skip to content

Commit

Permalink
refactor: move message/packet/codec to internal package
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Aug 3, 2017
1 parent d1168c6 commit 9a2ddbe
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 81 deletions.
144 changes: 74 additions & 70 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ import (
"sync/atomic"
"time"

"github.com/lonnng/nano/codec"
"github.com/lonnng/nano/message"
"github.com/lonnng/nano/packet"
"github.com/lonnng/nano/internal/codec"
"github.com/lonnng/nano/internal/message"
"github.com/lonnng/nano/internal/packet"
"github.com/lonnng/nano/session"
)

Expand Down Expand Up @@ -79,6 +79,77 @@ func newAgent(conn net.Conn) *agent {
return a
}

// Push, implementation for session.NetworkEntity interface
func (a *agent) Push(route string, v interface{}) error {
if a.status() == statusClosed {
return ErrBrokenPipe
}

if len(a.chSend) >= agentWriteBacklog {
return ErrBufferExceed
}

if env.debug {
log.Println(fmt.Sprintf("Type=Push, UID=%d, Route=%s, Data=%+v", a.session.Uid(), route, v))
}

a.chSend <- pendingMessage{typ: message.Push, route: route, payload: v}
return nil
}

// Response, implementation for session.NetworkEntity interface
// Response message to session
func (a *agent) Response(v interface{}) error {
if a.status() == statusClosed {
return ErrBrokenPipe
}

mid := a.session.LastRID
if mid <= 0 {
return ErrSessionOnNotify
}

if len(a.chSend) >= agentWriteBacklog {
return ErrBufferExceed
}

if env.debug {
log.Println(fmt.Sprintf("Type=Response, UID=%d, MID=%d, Data=%+v", a.session.Uid(), mid, v))
}

a.chSend <- pendingMessage{typ: message.Response, mid: mid, payload: v}
return nil
}

// Close, implementation for session.NetworkEntity interface
// Close closes the agent, clean inner state and close low-level connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (a *agent) Close() error {
if a.status() == statusClosed {
return ErrClosedSession
}
a.setStatus(statusClosed)

if env.debug {
log.Println(fmt.Sprintf("Session closed, Id=%d, IP=%s", a.session.ID(), a.conn.RemoteAddr()))
}

// close all channel
close(a.chDie)
return a.conn.Close()
}

// RemoteAddr, implementation for session.NetworkEntity interface
// returns the remote network address.
func (a *agent) RemoteAddr() net.Addr {
return a.conn.RemoteAddr()
}

// String, implementation for Stringer interface
func (a *agent) String() string {
return fmt.Sprintf("Remote=%s, LastTime=%d", a.conn.RemoteAddr().String(), a.lastAt)
}

func (a *agent) status() int32 {
return atomic.LoadInt32(&a.state)
}
Expand Down Expand Up @@ -151,70 +222,3 @@ func (a *agent) write() {
}
}
}

func (a *agent) Push(route string, v interface{}) error {
if a.status() == statusClosed {
return ErrBrokenPipe
}

if len(a.chSend) >= agentWriteBacklog {
return ErrBufferExceed
}

if env.debug {
log.Println(fmt.Sprintf("Type=Push, UID=%d, Route=%s, Data=%+v", a.session.Uid(), route, v))
}

a.chSend <- pendingMessage{typ: message.Push, route: route, payload: v}
return nil
}

// Response message to session
func (a *agent) Response(v interface{}) error {
if a.status() == statusClosed {
return ErrBrokenPipe
}

mid := a.session.LastRID
if mid <= 0 {
return ErrSessionOnNotify
}

if len(a.chSend) >= agentWriteBacklog {
return ErrBufferExceed
}

if env.debug {
log.Println(fmt.Sprintf("Type=Response, UID=%d, MID=%d, Data=%+v", a.session.Uid(), mid, v))
}

a.chSend <- pendingMessage{typ: message.Response, mid: mid, payload: v}
return nil
}

// Close closes the agent, clean inner state and close low-level connection.
// Any blocked Read or Write operations will be unblocked and return errors.
func (a *agent) Close() error {
if a.status() == statusClosed {
return ErrClosedSession
}
a.setStatus(statusClosed)

if env.debug {
log.Println(fmt.Sprintf("Session closed, Id=%d, IP=%s", a.session.ID(), a.conn.RemoteAddr()))
}

// close all channel
close(a.chDie)
return a.conn.Close()
}

// RemoteAddr returns the remote network address.
func (a *agent) RemoteAddr() net.Addr {
return a.conn.RemoteAddr()
}

// String, implementation for Stringer interface
func (a *agent) String() string {
return fmt.Sprintf("Remote=%s, LastTime=%d", a.conn.RemoteAddr().String(), a.lastAt)
}
6 changes: 3 additions & 3 deletions benchmark/io/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sync"

"github.com/golang/protobuf/proto"
"github.com/lonnng/nano/codec"
"github.com/lonnng/nano/message"
"github.com/lonnng/nano/packet"
"github.com/lonnng/nano/internal/codec"
"github.com/lonnng/nano/internal/message"
"github.com/lonnng/nano/internal/packet"
)

var (
Expand Down
7 changes: 4 additions & 3 deletions benchmark/io/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
)

const (
addr = "127.0.0.1:3250" // local address
conc = 1000 // concurrent client count
addr = "127.0.0.1:13250" // local address
conc = 200 // concurrent client count
)

type TestHandler struct {
Expand Down Expand Up @@ -74,11 +74,12 @@ func client() {
func TestIO(t *testing.T) {
go server()

// wait server startup
time.Sleep(1 * time.Second)
for i := 0; i < conc; i++ {
go client()
}

//nano.EnableDebug()
log.SetFlags(log.LstdFlags | log.Llongfile)

sg := make(chan os.Signal)
Expand Down
6 changes: 3 additions & 3 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"reflect"
"time"

"github.com/lonnng/nano/codec"
"github.com/lonnng/nano/internal/codec"
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/message"
"github.com/lonnng/nano/packet"
"github.com/lonnng/nano/internal/message"
"github.com/lonnng/nano/internal/packet"
"github.com/lonnng/nano/session"
)

Expand Down
2 changes: 1 addition & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/lonnng/nano/component"
"github.com/lonnng/nano/message"
"github.com/lonnng/nano/internal/message"
"github.com/lonnng/nano/serialize/json"
"github.com/lonnng/nano/serialize/protobuf"
"github.com/lonnng/nano/session"
Expand Down
2 changes: 1 addition & 1 deletion codec/codec.go → internal/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"bytes"
"errors"

"github.com/lonnng/nano/packet"
"github.com/lonnng/nano/internal/packet"
)

const HeadLength = 4
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.

0 comments on commit 9a2ddbe

Please sign in to comment.