Skip to content

Commit

Permalink
完善Session.EnableAsync()的流程
Browse files Browse the repository at this point in the history
  • Loading branch information
bg5sbk committed Nov 21, 2015
1 parent b6c4c15 commit c040e4b
Showing 1 changed file with 25 additions and 14 deletions.
39 changes: 25 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

var (
ErrClosed = errors.New("Session closed")
ErrBlocking = errors.New("Operation blocking")
ErrClosed = errors.New("link.Session closed")
ErrBlocking = errors.New("link.Session operation blocking")
)

type Session struct {
Expand All @@ -20,8 +20,8 @@ type Session struct {
decoder Decoder

// About send and receive
sendLoopFlag int32
sendChan chan interface{}
enableAsyncOnce sync.Once
sendChan chan interface{}

// About session close
closeChan chan int
Expand All @@ -41,7 +41,6 @@ func NewSession(conn net.Conn, codecType CodecType) *Session {
conn: conn,
encoder: codecType.NewEncoder(conn),
decoder: codecType.NewDecoder(conn),
closeChan: make(chan int),
closeCallbacks: list.New(),
}
return session
Expand All @@ -54,12 +53,17 @@ func (session *Session) IsClosed() bool { return atomic.LoadInt32(&session.close
func (session *Session) Close() {
if atomic.CompareAndSwapInt32(&session.closeFlag, 0, 1) {
session.invokeCloseCallbacks()
close(session.closeChan)
if session.closeChan != nil {
close(session.closeChan)
}
session.conn.Close()
}
}

func (session *Session) Receive(msg interface{}) (err error) {
if session.IsClosed() {
return ErrClosed
}
err = session.decoder.Decode(msg)
if err != nil {
session.Close()
Expand All @@ -68,20 +72,27 @@ func (session *Session) Receive(msg interface{}) (err error) {
}

func (session *Session) Send(msg interface{}) (err error) {
if session.IsClosed() {
return ErrClosed
}
err = session.encoder.Encode(msg)
if err != nil {
session.Close()
}
return
}

func (session *Session) EnableAsyncSend(sendChanSize int) {
func (session *Session) EnableAsyncSend(sendChanSize int) error {
if session.IsClosed() {
return
return ErrClosed
}
if atomic.CompareAndSwapInt32(&session.sendLoopFlag, 0, 1) {
session.enableAsyncOnce.Do(func() {
session.sendChan = make(chan interface{}, sendChanSize)
session.closeChan = make(chan int)
var wait sync.WaitGroup
wait.Add(1)
go func() {
wait.Done()
for {
select {
case msg := <-session.sendChan:
Expand All @@ -93,18 +104,18 @@ func (session *Session) EnableAsyncSend(sendChanSize int) {
}
}
}()
}
wait.Wait()
})
return nil
}

func (session *Session) AsyncSend(msg interface{}) error {
if session.IsClosed() {
return ErrClosed
}

if session.sendLoopFlag != 1 {
panic("AsyncSend not enable")
if session.sendChan == nil {
panic("Use link/Session.AsyncSend() you need invoke link/Session.EnableAsyncSend() first")
}

select {
case session.sendChan <- msg:
default:
Expand Down

0 comments on commit c040e4b

Please sign in to comment.