From 61bb850699a62c792268dbb6056ea0313957b671 Mon Sep 17 00:00:00 2001 From: dada Date: Thu, 20 Oct 2016 01:22:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=A9Codec=E5=8F=AF=E9=80=89=E6=8B=A9?= =?UTF-8?q?=E6=80=A7=E7=9A=84=E5=AE=9E=E7=8E=B0SendChan=E7=9A=84=E6=B8=85?= =?UTF-8?q?=E7=90=86=E5=B7=A5=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 4 ++++ session.go | 42 +++++++++++++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/api.go b/api.go index fc34fc4..2890888 100644 --- a/api.go +++ b/api.go @@ -23,6 +23,10 @@ type Codec interface { Close() error } +type ClearSendChan interface { + ClearSendChan(<-chan interface{}) +} + func Serve(network, address string, protocol Protocol, sendChanSize int) (*Server, error) { listener, err := net.Listen(network, address) if err != nil { diff --git a/session.go b/session.go index dfeec00..0cbd417 100644 --- a/session.go +++ b/session.go @@ -13,10 +13,11 @@ var SessionBlockedError = errors.New("Session Blocked") var globalSessionId uint64 type Session struct { - id uint64 - codec Codec - manager *Manager - sendChan chan interface{} + id uint64 + codec Codec + manager *Manager + sendChan chan interface{} + sendMutex sync.RWMutex closeFlag int32 closeChan chan int @@ -54,8 +55,18 @@ func (session *Session) IsClosed() bool { func (session *Session) Close() error { if atomic.CompareAndSwapInt32(&session.closeFlag, 0, 1) { - err := session.codec.Close() close(session.closeChan) + + if session.sendChan != nil { + session.sendMutex.Lock() + close(session.sendChan) + if clear, ok := session.codec.(ClearSendChan); ok { + clear.ClearSendChan(session.sendChan) + } + session.sendMutex.Unlock() + } + + err := session.codec.Close() if session.manager != nil { session.manager.delSession(session) } @@ -91,25 +102,34 @@ func (session *Session) sendLoop() { } } -func (session *Session) Send(msg interface{}) (err error) { +func (session *Session) Send(msg interface{}) error { if session.IsClosed() { return SessionClosedError } + if session.sendChan == nil { - return session.codec.Send(msg) + err := session.codec.Send(msg) + if err != nil { + session.Close() + } + return err } + + session.sendMutex.RLock() select { case session.sendChan <- msg: + session.sendMutex.RUnlock() return nil + case <-session.closeChan: + session.sendMutex.RUnlock() + return SessionClosedError default: + session.sendMutex.RUnlock() + session.Close() return SessionBlockedError } } -func (session *Session) SendChan() chan interface{} { - return session.sendChan -} - type closeCallback struct { Handler interface{} Func func()