-
Notifications
You must be signed in to change notification settings - Fork 1
/
rabbitmq.go
274 lines (246 loc) · 7.52 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
/* RabbitMQ handling for gopenqa */
package gopenqa
import (
"encoding/json"
"fmt"
"strconv"
"strings"
amqp "github.com/rabbitmq/amqp091-go"
)
// JobStatus is the returns struct for job status updates from RabbitMQ
type JobStatus struct {
Type string // Type of the update. Currently "job.done" and "job.restarted" are set
Arch string `json:"ARCH"`
Build string `json:"BUILD"`
Flavor string `json:"FLAVOR"`
Machine string `json:"MACHINE"`
Test string `json:"TEST"`
BugRef string `json:"bugref"`
GroupID int `json:"group_id"`
ID int64 `json:"id"`
NewBuild string `json:"newbuild"`
Reason string `json:"reason"`
Remaining int `json:"remaining"`
Result interface{} `json:"result"`
}
// RabbitMQ comment
type CommentMQ struct {
ID int `json:"id"`
Created string `json:"created"`
Updates string `json:"updated"`
Text string `json:"text"`
User string `json:"user"`
}
// RabbitMQ struct is the object which handles the connection to a RabbitMQ instance
type RabbitMQ struct {
remote string
con *amqp.Connection
closed bool
}
// Callback when the connection was closed
type RabbitMQCloseCallback func(error)
// Close connection
func (mq *RabbitMQ) Close() {
mq.closed = true
if mq.con != nil {
mq.con.Close()
}
}
// Connected returns true if RabbitMQ is connected
func (mq *RabbitMQ) Connected() bool {
return !mq.closed && (mq.con != nil) && !mq.con.IsClosed()
}
// Connected returns true if RabbitMQ is closing or if it is closed.
func (mq *RabbitMQ) Closed() bool {
if mq.closed {
return true
}
if mq.con == nil || mq.con.IsClosed() {
mq.closed = true
return true
}
return false
}
// Reconnect to the RabbitMQ server. This will close any previous connections and channels
func (mq *RabbitMQ) Reconnect() error {
var err error
if mq.con != nil {
mq.con.Close()
}
mq.closed = false
mq.con, err = amqp.Dial(mq.remote)
return err
}
// NotifyClose registeres a defined callback function for when the RabbitMQ connection is closed
func (mq *RabbitMQ) NotifyClose(callback RabbitMQCloseCallback) {
go func() {
recvChannel := make(chan *amqp.Error, 1)
mq.con.NotifyClose(recvChannel)
for err := range recvChannel {
callback(fmt.Errorf(err.Error()))
}
}()
}
// RabbitMQSubscription handles a single subscription
type RabbitMQSubscription struct {
channel *amqp.Channel
key string
obs <-chan amqp.Delivery
mq *RabbitMQ
con *amqp.Connection // Keep a reference to the connection to check if it is still connected. This is necessary because mq can reconnect and therefore have another new mq.con instance
}
// Connected returns true if RabbitMQ is connected
func (sub *RabbitMQSubscription) Connected() bool {
return !sub.con.IsClosed()
}
// Receive receives a raw non-empty RabbitMQ messages
func (sub *RabbitMQSubscription) Receive() (amqp.Delivery, error) {
for msg, ok := <-sub.obs; ok; {
if len(msg.Body) > 0 {
return msg, nil
}
}
if sub.mq == nil || sub.mq.closed || sub.con == nil || sub.con.IsClosed() {
return amqp.Delivery{}, fmt.Errorf("EOF")
}
return amqp.Delivery{}, fmt.Errorf("channel unexpectedly closed")
}
// ReceiveJob receives the next message and try to parse it as job
func (sub *RabbitMQSubscription) ReceiveJob() (Job, error) {
var job Job
d, err := sub.Receive()
if err != nil {
return job, err
}
// Try to unmarshall to json
err = json.Unmarshal(d.Body, &job)
if err != nil {
return job, err
}
// Fix missing job state on job state listener
if strings.HasSuffix(d.RoutingKey, ".job.done") && job.State == "" {
job.State = "done"
}
return job, err
}
// ReceiveJobStatus receives the next message and try to parse it as JobStatus. Use this for job status updates
func (sub *RabbitMQSubscription) ReceiveJobStatus() (JobStatus, error) {
var status JobStatus
d, err := sub.Receive()
if err != nil {
return status, err
}
// Required due to poo#114529
type IJobStatus struct {
Type string // Type of the update. Currently "job.done" and "job.restarted" are set
Arch string `json:"ARCH"`
Build string `json:"BUILD"`
Flavor string `json:"FLAVOR"`
Machine string `json:"MACHINE"`
Test string `json:"TEST"`
BugRef string `json:"bugref"`
GroupID int `json:"group_id"`
ID interface{} `json:"id"`
NewBuild string `json:"newbuild"`
Reason string `json:"reason"`
Remaining int `json:"remaining"`
Result interface{} `json:"result"`
}
// Try to unmarshall to json
var istatus IJobStatus
err = json.Unmarshal(d.Body, &istatus)
if err != nil {
return status, err
}
status.Arch = istatus.Arch
status.Build = istatus.Build
status.Flavor = istatus.Flavor
status.Machine = istatus.Machine
status.Test = istatus.Test
status.BugRef = istatus.BugRef
status.GroupID = istatus.GroupID
status.NewBuild = istatus.NewBuild
status.Reason = istatus.Reason
status.Remaining = istatus.Remaining
status.Result = istatus.Result
// Due to poo#114529 we need to do a bit of magic with the ID
if unboxed, ok := istatus.ID.(string); ok {
status.ID, _ = strconv.ParseInt(unboxed, 10, 64) // ignore error
} else if unboxed, ok := istatus.ID.(int64); ok {
status.ID = unboxed
} else if unboxed, ok := istatus.ID.(int); ok {
status.ID = int64(unboxed)
} else if unboxed, ok := istatus.ID.(float64); ok {
// Values larger than int are sometimes parsed as float64
status.ID = int64(float64(unboxed))
} else {
return status, fmt.Errorf("invalid ID type")
}
// Determine type based on routing key
key := d.RoutingKey
if strings.HasSuffix(key, ".job.done") {
status.Type = "job.done"
} else if strings.HasSuffix(key, ".job.restart") {
status.Type = "job.restarted"
}
return status, nil
}
// ReceiveJobStatus receives the next message and try to parse it as Comment. Use this for job status updates
func (sub *RabbitMQSubscription) ReceiveComment() (CommentMQ, error) {
var comment CommentMQ
d, err := sub.Receive()
if err != nil {
return comment, err
}
// Try to unmarshall to json
err = json.Unmarshal(d.Body, &comment)
if err != nil {
return comment, err
}
return comment, err
}
// Close subscription channel
func (sub *RabbitMQSubscription) Close() {
sub.channel.Close()
}
// ConnectRabbitMQ connects to a RabbitMQ instance and returns the RabbitMQ object
func ConnectRabbitMQ(remote string) (RabbitMQ, error) {
var err error
rmq := RabbitMQ{remote: remote, closed: false}
rmq.con, err = amqp.Dial(remote)
if err != nil {
return rmq, err
}
return rmq, nil
}
// Subscribe to a given key and get the messages via the callback function.
// This method will return after establishing the channel and call the callback function when a new message arrives
// This message returns a RabbitMQSubscription object, which in turn can be used to receive the incoming messages
func (mq *RabbitMQ) Subscribe(key string) (RabbitMQSubscription, error) {
var sub RabbitMQSubscription
ch, err := mq.con.Channel()
if err != nil {
return sub, err
}
// Create message queue and receive channel
// Create a new exclusive queue with auto-delete
q, err := ch.QueueDeclare("", false, false, true, true, nil)
if err != nil {
ch.Close()
return sub, err
}
if err := ch.QueueBind(q.Name, key, "pubsub", false, nil); err != nil {
ch.Close()
return sub, err
}
obs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
ch.Close()
return sub, err
}
sub.channel = ch
sub.key = key
sub.obs = obs
sub.con = mq.con
return sub, nil
}