-
Notifications
You must be signed in to change notification settings - Fork 5
/
messagequeue.go
205 lines (172 loc) · 4.88 KB
/
messagequeue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
/*
Copyright 2013-Present Couchbase, Inc.
Use of this software is governed by the Business Source License included in
the file licenses/BSL-Couchbase.txt. As of the Change Date specified in that
file, in accordance with the Business Source License, use of this software will
be governed by the Apache License, Version 2.0, included in the file
licenses/APL2.txt.
*/
package blip
import (
"sync"
)
const kInitialQueueCapacity = 10
// A queue of outgoing messages. Used by Sender to schedule which frames to send.
type messageQueue struct {
logContext LogContext
maxCount int
queue []*Message
numRequestsSent MessageNumber
cond *sync.Cond
}
func newMessageQueue(logContext LogContext, maxCount int) *messageQueue {
return &messageQueue{
logContext: logContext,
queue: make([]*Message, 0, kInitialQueueCapacity),
cond: sync.NewCond(&sync.Mutex{}),
maxCount: maxCount,
}
}
func (q *messageQueue) _push(msg *Message, new bool) bool { // requires lock
if !msg.Outgoing {
panic("Not an outgoing message")
}
if q.isStopped() {
return false
}
q.logContext.logFrame("Push %v", msg)
index := 0
n := len(q.queue)
if msg.Urgent() && n > 1 {
// High-priority gets queued after the last existing high-priority message,
// leaving one regular-priority message in between if possible.
for index = n - 1; index >= 0; index-- {
if q.queue[index].Urgent() {
index += 2
break
} else if new && q.queue[index].encoder == nil {
// But have to keep message starts in order
index += 1
break
}
}
if index <= 0 {
index = 1
} else if index > n {
index = n
}
} else {
// Regular priority goes at the end of the queue:
index = n
}
// Insert msg at index:
q.queue = append(q.queue, nil)
copy(q.queue[index+1:n+1], q.queue[index:n])
q.queue[index] = msg
if len(q.queue) == 1 {
q.cond.Signal() // It's non-empty now, so unblock a waiting pop()
}
return true
}
// Push an item into the queue
func (q *messageQueue) push(msg *Message) bool {
return q.pushWithCallback(msg, nil)
}
// Push an item into the queue, also providing a callback function that will be invoked
// after the number is assigned to the message, but before pushing into the queue.
func (q *messageQueue) pushWithCallback(msg *Message, prepushCallback MessageCallback) bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// Exit early if we know the queue has already been stopped
if q.isStopped() {
return false
}
isNew := msg.number == 0
if isNew {
// When adding a new message, block till the queue is under its maxCount:
for q.maxCount > 0 && len(q.queue) >= q.maxCount && q.queue != nil {
q.cond.Wait()
}
if msg.Type() != RequestType {
panic("Response has no number")
}
q.numRequestsSent++
msg.number = q.numRequestsSent
q.logContext.logMessage("Queued %s", msg)
}
if prepushCallback != nil {
prepushCallback(msg)
}
return q._push(msg, isNew)
}
func (q *messageQueue) _maybePop(actuallyPop bool) *Message {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && q.queue != nil {
q.cond.Wait()
}
if q.queue == nil {
return nil
}
msg := q.queue[0]
if actuallyPop {
q.queue = q.queue[1:]
if len(q.queue) == q.maxCount-1 {
q.cond.Signal()
}
}
return msg
}
func (q *messageQueue) first() *Message { return q._maybePop(false) }
func (q *messageQueue) pop() *Message { return q._maybePop(true) }
func (q *messageQueue) find(msgNo MessageNumber, msgType MessageType) *Message {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for _, message := range q.queue {
if message.number == msgNo && message.Type() == msgType {
return message
}
}
return nil
}
// Stops the sender's goroutine.
func (q *messageQueue) stop() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// Iterate over messages and call close on every message's readcloser, since it's possible that
// a goroutine may be blocked on the reader, thus causing a resource leak. Added during SG #3268
// diagnosis, but does not fix any reproducible issues.
for _, message := range q.queue {
err := message.Close()
if err != nil {
q.logContext.logMessage("Warning: messageQueue encountered error closing message while stopping. Error: %v", err)
}
}
q.queue = nil
q.cond.Broadcast()
}
func (q *messageQueue) isStopped() bool {
return q.queue == nil
}
func (q *messageQueue) nextMessageIsUrgent() bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue) > 0 && q.queue[0].Urgent()
}
func (q *messageQueue) length() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
return len(q.queue)
}
// Returns statistics about the number of incoming and outgoing messages queued.
func (q *messageQueue) backlog() (outgoingRequests, outgoingResponses int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for _, message := range q.queue {
if message.Type() == RequestType {
outgoingRequests++
}
}
outgoingResponses = len(q.queue) - outgoingRequests
return
}