Skip to content

Commit

Permalink
完善Server.Stop(),去掉Session内置的收发加锁逻辑,需要并发收或者并发写可以在Codec内加锁,减少不必要的开销
Browse files Browse the repository at this point in the history
  • Loading branch information
bg5sbk committed Nov 21, 2015
1 parent f08d352 commit b6c4c15
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 37 deletions.
17 changes: 5 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package link
import (
"net"
"sync"
"sync/atomic"
)

type Server struct {
Expand All @@ -16,22 +15,19 @@ type Server struct {
sessionMutex sync.Mutex

// About server start and stop
stopFlag int32
stopChan chan int
stopOnce sync.Once
stopWait sync.WaitGroup

// Server state
State interface{}
}

func NewServer(listener net.Listener, codecType CodecType) *Server {
server := &Server{
return &Server{
listener: listener,
codecType: codecType,
sessions: make(map[uint64]*Session),
stopChan: make(chan int),
}
return server
}

func (server *Server) Listener() net.Listener {
Expand All @@ -46,15 +42,12 @@ func (server *Server) Accept() (*Session, error) {
return server.newSession(conn), nil
}

func (server *Server) Stop() bool {
if atomic.CompareAndSwapInt32(&server.stopFlag, 0, 1) {
func (server *Server) Stop() {
server.stopOnce.Do(func() {
server.listener.Close()
close(server.stopChan)
server.closeSessions()
server.stopWait.Wait()
return true
}
return false
})
}

func (server *Server) newSession(conn net.Conn) *Session {
Expand Down
8 changes: 0 additions & 8 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ type Session struct {
decoder Decoder

// About send and receive
recvMutex sync.Mutex
sendMutex sync.Mutex
sendLoopFlag int32
sendChan chan interface{}

Expand Down Expand Up @@ -62,9 +60,6 @@ func (session *Session) Close() {
}

func (session *Session) Receive(msg interface{}) (err error) {
session.recvMutex.Lock()
defer session.recvMutex.Unlock()

err = session.decoder.Decode(msg)
if err != nil {
session.Close()
Expand All @@ -73,9 +68,6 @@ func (session *Session) Receive(msg interface{}) (err error) {
}

func (session *Session) Send(msg interface{}) (err error) {
session.sendMutex.Lock()
defer session.sendMutex.Unlock()

err = session.encoder.Encode(msg)
if err != nil {
session.Close()
Expand Down
17 changes: 0 additions & 17 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"io"
"math/rand"
"runtime/pprof"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -88,8 +87,6 @@ func SessionTest(t *testing.T, codecType CodecType, test func(*testing.T, *Sessi

server.Stop()
serverWait.Wait()

MakeSureSessionGoroutineExit(t)
}

func BytesTest(t *testing.T, session *Session) {
Expand All @@ -108,17 +105,3 @@ func BytesTest(t *testing.T, session *Session) {
func Test_Bytes(t *testing.T) {
SessionTest(t, TestCodec{}, BytesTest)
}

func MakeSureSessionGoroutineExit(t *testing.T) {
buff := new(bytes.Buffer)
goroutines := pprof.Lookup("goroutine")

if err := goroutines.WriteTo(buff, 2); err != nil {
t.Fatalf("Dump goroutine failed: %v", err)
}

if n := bytes.Index(buff.Bytes(), []byte("link.HandlerFunc.Handle")); n >= 0 {
t.Log(buff.String())
t.Fatalf("Some handler goroutine running")
}
}

0 comments on commit b6c4c15

Please sign in to comment.