@@ -31,7 +31,9 @@ type Conn struct {
3131 releaseOnClose func ()
3232 releaseOnMessage func ()
3333
34- readch chan wsjs.MessageEvent
34+ readSignal chan struct {}
35+ readBufMu sync.Mutex
36+ readBuf []wsjs.MessageEvent
3537}
3638
3739func (c * Conn ) close (err error ) {
@@ -45,7 +47,7 @@ func (c *Conn) close(err error) {
4547
4648func (c * Conn ) init () {
4749 c .closed = make (chan struct {})
48- c .readch = make (chan wsjs. MessageEvent , 1 )
50+ c .readSignal = make (chan struct {} , 1 )
4951 c .msgReadLimit = 32768
5052
5153 c .releaseOnClose = c .ws .OnClose (func (e wsjs.CloseEvent ) {
@@ -61,7 +63,16 @@ func (c *Conn) init() {
6163 })
6264
6365 c .releaseOnMessage = c .ws .OnMessage (func (e wsjs.MessageEvent ) {
64- c .readch <- e
66+ c .readBufMu .Lock ()
67+ defer c .readBufMu .Unlock ()
68+
69+ c .readBuf = append (c .readBuf , e )
70+
71+ // Lets the read goroutine know there is definitely something in readBuf.
72+ select {
73+ case c .readSignal <- struct {}{}:
74+ default :
75+ }
6576 })
6677
6778 runtime .SetFinalizer (c , func (c * Conn ) {
@@ -89,16 +100,29 @@ func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
89100}
90101
91102func (c * Conn ) read (ctx context.Context ) (MessageType , []byte , error ) {
92- var me wsjs.MessageEvent
93103 select {
94104 case <- ctx .Done ():
95105 c .Close (StatusPolicyViolation , "read timed out" )
96106 return 0 , nil , ctx .Err ()
97- case me = <- c .readch :
107+ case <- c .readSignal :
98108 case <- c .closed :
99109 return 0 , nil , c .closeErr
100110 }
101111
112+ c .readBufMu .Lock ()
113+ defer c .readBufMu .Unlock ()
114+
115+ me := c .readBuf [0 ]
116+ c .readBuf = c .readBuf [1 :]
117+
118+ if len (c .readBuf ) > 0 {
119+ // Next time we read, we'll grab the message.
120+ select {
121+ case c .readSignal <- struct {}{}:
122+ default :
123+ }
124+ }
125+
102126 switch p := me .Data .(type ) {
103127 case string :
104128 return MessageText , []byte (p ), nil
0 commit comments