Skip to content

Commit

Permalink
去掉Context参数,实践证明这个参数不是必须的,详情参考fastway项目的gateway.go
Browse files Browse the repository at this point in the history
  • Loading branch information
bg5sbk committed Oct 14, 2016
1 parent ceaac05 commit 5a1aa37
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 65 deletions.
55 changes: 40 additions & 15 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package link
import (
"io"
"net"
"strings"
"time"
)

type Context interface{}

type Protocol interface {
NewCodec(rw io.ReadWriter) (Codec, Context, error)
NewCodec(rw io.ReadWriter) (Codec, error)
}

type ProtocolFunc func(rw io.ReadWriter) (Codec, Context, error)
type ProtocolFunc func(rw io.ReadWriter) (Codec, error)

func (pf ProtocolFunc) NewCodec(rw io.ReadWriter) (Codec, Context, error) {
func (pf ProtocolFunc) NewCodec(rw io.ReadWriter) (Codec, error) {
return pf(rw)
}

Expand All @@ -32,26 +31,52 @@ func Serve(network, address string, protocol Protocol, sendChanSize int) (*Serve
return NewServer(listener, protocol, sendChanSize), nil
}

func Connect(network, address string, protocol Protocol, sendChanSize int) (*Session, Context, error) {
func Connect(network, address string, protocol Protocol, sendChanSize int) (*Session, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, nil, err
return nil, err
}
codec, ctx, err := protocol.NewCodec(conn)
codec, err := protocol.NewCodec(conn)
if err != nil {
return nil, nil, err
return nil, err
}
return NewSession(codec, sendChanSize), ctx, nil
return NewSession(codec, sendChanSize), nil
}

func ConnectTimeout(network, address string, timeout time.Duration, protocol Protocol, sendChanSize int) (*Session, Context, error) {
func ConnectTimeout(network, address string, timeout time.Duration, protocol Protocol, sendChanSize int) (*Session, error) {
conn, err := net.DialTimeout(network, address, timeout)
if err != nil {
return nil, nil, err
return nil, err
}
codec, ctx, err := protocol.NewCodec(conn)
codec, err := protocol.NewCodec(conn)
if err != nil {
return nil, nil, err
return nil, err
}
return NewSession(codec, sendChanSize), nil
}

func Accept(listener net.Listener) (net.Conn, error) {
var tempDelay time.Duration
for {
conn, err := listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
return nil, io.EOF
}
return nil, err
}
return conn, nil
}
return NewSession(codec, sendChanSize), ctx, nil
}
4 changes: 2 additions & 2 deletions codec/bufio.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type bufioProtocol struct {
writeBuf int
}

func (b *bufioProtocol) NewCodec(rw io.ReadWriter) (cc link.Codec, ctx link.Context, err error) {
func (b *bufioProtocol) NewCodec(rw io.ReadWriter) (cc link.Codec, err error) {
codec := new(bufioCodec)

if b.writeBuf > 0 {
Expand All @@ -39,7 +39,7 @@ func (b *bufioProtocol) NewCodec(rw io.ReadWriter) (cc link.Codec, ctx link.Cont

codec.stream.c, _ = rw.(io.Closer)

codec.base, ctx, err = b.base.NewCodec(&codec.stream)
codec.base, err = b.base.NewCodec(&codec.stream)
if err != nil {
return
}
Expand Down
5 changes: 2 additions & 3 deletions codec/fixlen.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,17 @@ func FixLen(base link.Protocol, n int, byteOrder binary.ByteOrder, maxRecv, maxS
return proto
}

func (p *FixLenProtocol) NewCodec(rw io.ReadWriter) (cc link.Codec, ctx link.Context, err error) {
func (p *FixLenProtocol) NewCodec(rw io.ReadWriter) (cc link.Codec, err error) {
codec := &fixlenCodec{
rw: rw,
FixLenProtocol: p,
}
codec.headBuf = codec.head[:p.n]

codec.base, ctx, err = p.base.NewCodec(&codec.fixlenReadWriter)
codec.base, err = p.base.NewCodec(&codec.fixlenReadWriter)
if err != nil {
return
}

cc = codec
return
}
Expand Down
4 changes: 2 additions & 2 deletions codec/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ func (j *JsonProtocol) RegisterName(name string, t interface{}) {
j.names[rt] = name
}

func (j *JsonProtocol) NewCodec(rw io.ReadWriter) (link.Codec, link.Context, error) {
func (j *JsonProtocol) NewCodec(rw io.ReadWriter) (link.Codec, error) {
codec := &jsonCodec{
p: j,
encoder: json.NewEncoder(rw),
decoder: json.NewDecoder(rw),
}
codec.closer, _ = rw.(io.Closer)
return codec, nil, nil
return codec, nil
}

type jsonIn struct {
Expand Down
10 changes: 5 additions & 5 deletions example/json_toy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func main() {
server, err := link.Serve("tcp", "0.0.0.0:0", json, 0 /* sync send */)
checkErr(err)
addr := server.Listener().Addr().String()
go server.Serve(link.HandlerFunc(sessionLoop))
go server.Serve(link.HandlerFunc(serverSessionLoop))

client, _, err := link.Connect("tcp", addr, json, 0)
client, err := link.Connect("tcp", addr, json, 0)
checkErr(err)
clientLoop(client)
clientSessionLoop(client)
}

func sessionLoop(session *link.Session, _ link.Context, _ error) {
func serverSessionLoop(session *link.Session) {
for {
req, err := session.Receive()
checkErr(err)
Expand All @@ -42,7 +42,7 @@ func sessionLoop(session *link.Session, _ link.Context, _ error) {
}
}

func clientLoop(session *link.Session) {
func clientSessionLoop(session *link.Session) {
for i := 0; i < 10; i++ {
err := session.Send(&AddReq{
i, i,
Expand Down
45 changes: 7 additions & 38 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,6 @@
package link

import (
"io"
"net"
"strings"
"time"
)

func Accept(listener net.Listener) (net.Conn, error) {
var tempDelay time.Duration
for {
conn, err := listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
time.Sleep(tempDelay)
continue
}
if strings.Contains(err.Error(), "use of closed network connection") {
return nil, io.EOF
}
return nil, err
}
return conn, nil
}
}
import "net"

type Server struct {
manager *Manager
Expand All @@ -41,15 +10,15 @@ type Server struct {
}

type Handler interface {
HandleSession(session *Session, ctx Context, err error)
HandleSession(*Session)
}

var _ Handler = HandlerFunc(nil)

type HandlerFunc func(session *Session, ctx Context, err error)
type HandlerFunc func(*Session)

func (hf HandlerFunc) HandleSession(session *Session, ctx Context, err error) {
hf(session, ctx, err)
func (f HandlerFunc) HandleSession(session *Session) {
f(session)
}

func NewServer(l net.Listener, p Protocol, sendChanSize int) *Server {
Expand All @@ -73,13 +42,13 @@ func (server *Server) Serve(handler Handler) error {
}

go func() {
codec, ctx, err := server.protocol.NewCodec(conn)
codec, err := server.protocol.NewCodec(conn)
if err != nil {
conn.Close()
return
}
session := server.manager.NewSession(codec, server.sendChanSize)
handler.HandleSession(session, ctx, nil)
handler.HandleSession(session)
}()
}
}
Expand Down

0 comments on commit 5a1aa37

Please sign in to comment.