-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.go
149 lines (124 loc) · 2.6 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package wsrpc
import (
"errors"
"runtime"
"sync"
)
// ErrChanClosed is returned when writing to or reading from a closed channel.
var ErrChanClosed = errors.New("channel closed")
// InfChannel is used to pass data of unspecified types inside the wsrpc server.
// It is not meant to be made available inside a request handler the way its
// sibling ResponseChannel is.
type InfChannel struct {
	// ch carries the messages themselves.
	ch chan interface{}
	// closed is closed by Close and acts as the shutdown signal for writers.
	closed chan struct{}
	// once makes sure the shutdown in Close runs a single time.
	once sync.Once
}
// NewInfChannel returns a new, open InfChannel whose message channel is
// unbuffered.
func NewInfChannel() *InfChannel {
	c := new(InfChannel)
	c.ch = make(chan interface{})
	c.closed = make(chan struct{})
	return c
}
// Close closes the channel. Only the first call does any work; later calls
// are no-ops.
func (c *InfChannel) Close() {
	shutdown := func() {
		// Raise the closed signal before closing the data channel.
		close(c.closed)
		close(c.ch)
	}
	c.once.Do(shutdown)
}
// write sends msg on the channel, blocking until the send completes or the
// channel is closed. It returns nil once the message has been handed over and
// ErrChanClosed when the channel is (or becomes) closed.
func (c *InfChannel) write(msg interface{}) (err error) {
	// A send that races with Close panics; report that as ErrChanClosed.
	defer func() {
		if r := recover(); r != nil {
			err = ErrChanClosed
		}
	}()

	// Check the closed signal on its own first so an already-closed channel
	// is always rejected before a send is attempted.
	select {
	case <-c.closed:
		return ErrChanClosed
	default:
	}

	select {
	case c.ch <- msg:
		return nil
	case <-c.closed:
		return ErrChanClosed
	}
}
// read receives the next message from the channel, blocking until one is
// available. It returns ErrChanClosed once the channel has been closed and
// has nothing left to deliver.
func (c *InfChannel) read() (interface{}, error) {
	if msg, open := <-c.ch; open {
		return msg, nil
	}
	return nil, ErrChanClosed
}
// clear closes the channel and then receives from it until read reports an
// error, discarding anything that is still delivered. The processor is
// yielded before every receive.
func (c *InfChannel) clear() {
	c.Close()
	for err := error(nil); err == nil; _, err = c.read() {
		runtime.Gosched()
	}
}
// ResponseChannel is passed to stream handlers to allow the implementer an
// easy way of sending responses back to the requester.
type ResponseChannel struct {
	// ch carries the responses written by the handler.
	ch chan *Response
	// closed is closed by Close and acts as the shutdown signal.
	closed chan struct{}
	// once makes sure the shutdown in Close runs a single time.
	once sync.Once
	// mutex is held by Close while the channels are being closed.
	mutex sync.Mutex
}
// NewResponseChannel returns a new, open ResponseChannel whose response
// buffer holds up to size entries.
func NewResponseChannel(size int) *ResponseChannel {
	c := new(ResponseChannel)
	c.ch = make(chan *Response, size)
	c.closed = make(chan struct{})
	return c
}
// Close closes the ResponseChannel. Only the first call does any work; later
// calls are no-ops.
func (c *ResponseChannel) Close() {
	shutdown := func() {
		c.mutex.Lock()
		// Raise the closed signal before closing the data channel.
		close(c.closed)
		close(c.ch)
		c.mutex.Unlock()
	}
	c.once.Do(shutdown)
}
// Closed reports whether the ResponseChannel has been closed. It never
// blocks.
func (c *ResponseChannel) Closed() bool {
	select {
	case <-c.closed:
		return true
	default:
		return false
	}
}
// Write sends a message through the ResponseChannel.
// Stream handlers are configured to pass this message back to the requester if possible.
//
// Write blocks until the message has been accepted by the channel or the
// channel is closed. It returns nil on success and ErrChanClosed if the
// channel is already closed or gets closed while the send is pending.
func (c *ResponseChannel) Write(msg *Response) (err error) {
	// A send that races with Close panics; report that as ErrChanClosed.
	defer func() {
		if recover() != nil {
			err = ErrChanClosed
		}
	}()

	// Reject writes to an already-closed channel before attempting a send.
	if c.Closed() {
		return ErrChanClosed
	}

	// Park until there is room for the message or the channel is closed.
	// There is deliberately no default branch: polling here would spin a
	// CPU core for as long as the buffer stays full.
	select {
	case <-c.closed:
		return ErrChanClosed
	case c.ch <- msg:
		return nil
	}
}
// read receives the next response from the channel, blocking until one is
// available. It returns ErrChanClosed once the channel has been closed and
// has nothing left to deliver.
func (c *ResponseChannel) read() (*Response, error) {
	if resp, open := <-c.ch; open {
		return resp, nil
	}
	return nil, ErrChanClosed
}