-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreschedule.go
118 lines (95 loc) · 2.98 KB
/
reschedule.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package rabbitmq_x_delay
import (
"errors"
"github.com/google/uuid"
"github.com/streadway/amqp"
"math"
"time"
)
const RetryLimit = 12
type DelayFunc func(retryCounter int32) (delayMillis int64)
type RescheduleOptions struct {
Channel *amqp.Channel
ExchangeName string
RoutingKey string
}
/*
Increments delays exponentially
*/
func DefaultDelayFunc(retryCounter int32) int64 {
return int64(math.Pow(2, float64(retryCounter)) * 1000)
}
/*
Auxiliary method for rescheduling using the DefaultDelayFunc
*/
func RescheduleMessage(delivery amqp.Delivery, opts *RescheduleOptions) error {
return RescheduleMessageWithDelayFunc(DefaultDelayFunc, delivery, opts)
}
/*
Behaves the same as RescheduleMessageDeclaringDelay, working with a DelayFunc
*/
func RescheduleMessageWithDelayFunc(delayFunc DelayFunc, delivery amqp.Delivery, opts *RescheduleOptions) error {
var retryCounter int32
if mapVal, ok := delivery.Headers["retries"]; ok {
retryCounter = mapVal.(int32)
}
return RescheduleMessageDeclaringDelay(delayFunc(retryCounter), delivery, opts)
}
/*
Copies a message body and headers, updating the headers used by the library: "x-delay", "retries" and "message-uuid".
Once setup, the message is encapsulated in an amqp.Publishing and resent to the exchange.
A message will be available again in the queue after the given delayMillis
Delivery represents the message that should be rescheduled
Channels must be active for rescheduling
The exchangeName must follow the one given on the CreateDefaultDelayExchange method
The routingKey is present inside the amqp.Delivery structure, still it does not always seem to match. This will also
allow custom routing
*/
func RescheduleMessageDeclaringDelay(delayMillis int64, delivery amqp.Delivery, opts *RescheduleOptions) error {
if opts == nil {
return errors.New("reschedule options are invalid")
}
var retries int32
if delivery.Headers != nil {
if mapVal, ok := delivery.Headers["retries"]; ok {
retries = mapVal.(int32)
retries = retries + 1
}
}
if retries > RetryLimit {
return errors.New("retry limit")
}
headers := setupHeaders(delayMillis, delivery.Headers, retries)
publishing := amqp.Publishing{
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
ContentType: delivery.ContentType,
Body: delivery.Body,
Headers: headers,
}
if opts.Channel == nil {
return errors.New("RabbitMQ not available")
}
// Both flags bellow set as false helps guaranteeing the message will be delivered
return opts.Channel.Publish(
opts.ExchangeName,
opts.RoutingKey,
false,
false,
publishing,
)
}
func setupHeaders(delayMillis int64, deliveryHeaders amqp.Table, retries int32) amqp.Table {
var headers amqp.Table
if deliveryHeaders != nil && len(deliveryHeaders) != 0 {
headers = deliveryHeaders
} else {
headers = make(amqp.Table)
}
headers["x-delay"] = delayMillis
headers["retries"] = retries
if _, ok := headers["message-uuid"]; !ok {
headers["message-uuid"] = uuid.New().String()
}
return headers
}