-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathconfig.go
606 lines (520 loc) · 20.4 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
package amqp
import (
"crypto/tls"
"time"
multierror "github.com/hashicorp/go-multierror"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/cenkalti/backoff/v3"
"github.com/pkg/errors"
)
// NewDurablePubSubConfig returns a Config for a durable Pub/Sub setup.
//
// The exchange is a durable "fanout" exchange named after the topic, the queue is
// durable and named by generateQueueName, and both the bind and the publish routing
// keys are always empty. generateQueueName is optional when the config is passed to
// a publisher.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, messages are not lost on broker restart.
func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	// The routing key does not depend on the topic in this setup.
	emptyRoutingKey := func(string) string { return "" }

	var cfg Config
	cfg.Connection.AmqpURI = amqpURI
	cfg.Marshaler = DefaultMarshaler{}

	cfg.Exchange.GenerateName = GenerateExchangeNameTopicName
	cfg.Exchange.Type = "fanout"
	cfg.Exchange.Durable = true

	cfg.Queue.GenerateName = generateQueueName
	cfg.Queue.Durable = true

	cfg.QueueBind.GenerateRoutingKey = emptyRoutingKey
	cfg.Publish.GenerateRoutingKey = emptyRoutingKey

	cfg.Consume.Qos.PrefetchCount = 1
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurablePubSubConfig returns a Config for a non-durable Pub/Sub setup.
//
// The exchange is a "fanout" exchange named after the topic, the queue is named by
// generateQueueName, and both the bind and the publish routing keys are always empty.
// Neither the exchange nor the queue is marked durable, and the marshaler is configured
// with NotPersistentDeliveryMode. generateQueueName is optional when the config is
// passed to a publisher.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-three-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator) Config {
	// The routing key does not depend on the topic in this setup.
	noRoutingKey := func(string) string { return "" }

	exchange := ExchangeConfig{
		GenerateName: GenerateExchangeNameTopicName,
		Type:         "fanout",
	}
	queue := QueueConfig{GenerateName: generateQueueName}

	return Config{
		Connection:      ConnectionConfig{AmqpURI: amqpURI},
		Marshaler:       DefaultMarshaler{NotPersistentDeliveryMode: true},
		Exchange:        exchange,
		Queue:           queue,
		QueueBind:       QueueBindConfig{GenerateRoutingKey: noRoutingKey},
		Publish:         PublishConfig{GenerateRoutingKey: noRoutingKey},
		Consume:         ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}
// NewDurableQueueConfig returns a Config for a durable queue.
//
// The queue name and the publish routing key are both set to the topic name, and the
// exchange name is the constant "" (the default exchange). The bind routing key is
// always empty.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html
// with durable added for exchange, queue and amqp.Persistent DeliveryMode.
// Thanks to this, messages are not lost on broker restart.
func NewDurableQueueConfig(amqpURI string) Config {
	var cfg Config
	cfg.Connection.AmqpURI = amqpURI
	cfg.Marshaler = DefaultMarshaler{}

	// An empty constant name selects the default exchange.
	cfg.Exchange.GenerateName = GenerateExchangeNameConstant("")

	cfg.Queue.GenerateName = GenerateQueueNameTopicName
	cfg.Queue.Durable = true

	cfg.QueueBind.GenerateRoutingKey = func(string) string { return "" }
	// Publishing routes by topic, which is also the queue name.
	cfg.Publish.GenerateRoutingKey = func(topic string) string { return topic }

	cfg.Consume.Qos.PrefetchCount = 1
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurableQueueConfig returns a Config for a non-durable queue.
//
// The queue name and the publish routing key are both set to the topic name, and the
// exchange name is the constant "" (the default exchange). The bind routing key is
// always empty. The queue is not marked durable and the marshaler is configured with
// NotPersistentDeliveryMode.
//
// IMPORTANT: Watermill's topic is not mapped directly to the AMQP's topic exchange type.
// It is used to generate the exchange name, routing key and queue name, depending on the context.
// To check how the topic is mapped, see Exchange.GenerateName, Queue.GenerateName and Publish.GenerateRoutingKey.
//
// This config is based on this example: https://www.rabbitmq.com/tutorials/tutorial-two-go.html.
// This config is not durable, so on the restart of the broker all messages will be lost.
func NewNonDurableQueueConfig(amqpURI string) Config {
	emptyBindKey := func(string) string { return "" }
	// Publishing routes by topic, which is also the queue name.
	topicAsKey := func(topic string) string { return topic }

	return Config{
		Connection:      ConnectionConfig{AmqpURI: amqpURI},
		Marshaler:       DefaultMarshaler{NotPersistentDeliveryMode: true},
		Exchange:        ExchangeConfig{GenerateName: GenerateExchangeNameConstant("")},
		Queue:           QueueConfig{GenerateName: GenerateQueueNameTopicName},
		QueueBind:       QueueBindConfig{GenerateRoutingKey: emptyBindKey},
		Publish:         PublishConfig{GenerateRoutingKey: topicAsKey},
		Consume:         ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}
// NewDurableTopicConfig returns a Config that uses a "topic" exchange together
// with a durable queue.
//
// The exchange and queue names are fixed to the exchange and queue arguments,
// independent of the Watermill topic. The topic itself is used as the routing key
// for both binding and publishing.
//
// Note that only the queue is marked Durable here; Exchange.Durable is left at its
// zero value.
func NewDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	// Both binding and publishing route on the Watermill topic.
	routeByTopic := func(topic string) string { return topic }

	var cfg Config
	cfg.Connection.AmqpURI = amqpURI
	cfg.Marshaler = DefaultMarshaler{}

	cfg.Exchange.GenerateName = GenerateExchangeNameConstant(exchange)
	cfg.Exchange.Type = "topic"

	cfg.Queue.GenerateName = GenerateQueueNameConstant(queue)
	cfg.Queue.Durable = true

	cfg.QueueBind.GenerateRoutingKey = routeByTopic
	cfg.Publish.GenerateRoutingKey = routeByTopic

	cfg.Consume.Qos.PrefetchCount = 1
	cfg.TopologyBuilder = &DefaultTopologyBuilder{}

	return cfg
}
// NewNonDurableTopicConfig returns a Config that uses a "topic" exchange together
// with a non-durable queue.
//
// The exchange and queue names are fixed to the exchange and queue arguments,
// independent of the Watermill topic. The topic itself is used as the routing key
// for both binding and publishing. The marshaler is configured with
// NotPersistentDeliveryMode.
func NewNonDurableTopicConfig(amqpURI string, exchange string, queue string) Config {
	// Both binding and publishing route on the Watermill topic.
	routeByTopic := func(topic string) string { return topic }

	exchangeCfg := ExchangeConfig{
		GenerateName: GenerateExchangeNameConstant(exchange),
		Type:         "topic",
	}
	queueCfg := QueueConfig{GenerateName: GenerateQueueNameConstant(queue)}

	return Config{
		Connection:      ConnectionConfig{AmqpURI: amqpURI},
		Marshaler:       DefaultMarshaler{NotPersistentDeliveryMode: true},
		Exchange:        exchangeCfg,
		Queue:           queueCfg,
		QueueBind:       QueueBindConfig{GenerateRoutingKey: routeByTopic},
		Publish:         PublishConfig{GenerateRoutingKey: routeByTopic},
		Consume:         ConsumeConfig{Qos: QosConfig{PrefetchCount: 1}},
		TopologyBuilder: &DefaultTopologyBuilder{},
	}
}
// Config is the complete configuration of an AMQP publisher or subscriber.
// Use one of the New*Config constructors to obtain a pre-filled value, and
// ValidatePublisher / ValidateSubscriber (or their WithConnection variants)
// to check it.
type Config struct {
	// Connection holds the connection settings. Connection.AmqpURI must be
	// non-empty unless validation is done via a WithConnection variant.
	Connection ConnectionConfig

	// Marshaler is required; validation fails when it is nil.
	Marshaler Marshaler

	// Exchange describes the exchange. Exchange.GenerateName is required.
	Exchange ExchangeConfig

	// Queue describes the queue. Queue.GenerateName is required for subscribers only.
	Queue QueueConfig

	// QueueBind describes how the queue is bound to the exchange.
	QueueBind QueueBindConfig

	// Publish holds publishing options. Publish.GenerateRoutingKey is required
	// for publishers only.
	Publish PublishConfig

	// Consume holds consuming options, including Qos.
	Consume ConsumeConfig

	// TopologyBuilder is set to &DefaultTopologyBuilder{} by all constructors in
	// this file. NOTE(review): presumably responsible for declaring exchanges,
	// queues and bindings; its contract is defined elsewhere. It is not checked
	// by the validation methods here.
	TopologyBuilder TopologyBuilder
}
// validate performs the checks shared by publishers and subscribers and returns
// all problems found, accumulated with multierror, or nil when there are none.
//
// When validateConnection is true, Connection.AmqpURI must be non-empty.
// Marshaler and Exchange.GenerateName are always required.
//
// Error messages use the full field path (e.g. "Config.Exchange.GenerateName"),
// consistent with validateSubscriber, so the offending field can be located directly.
func (c Config) validate(validateConnection bool) error {
	var err error

	if validateConnection && c.Connection.AmqpURI == "" {
		err = multierror.Append(err, errors.New("empty Config.Connection.AmqpURI"))
	}

	if c.Marshaler == nil {
		err = multierror.Append(err, errors.New("missing Config.Marshaler"))
	}

	if c.Exchange.GenerateName == nil {
		err = multierror.Append(err, errors.New("missing Config.Exchange.GenerateName"))
	}

	// err is still the untyped nil interface when nothing was appended.
	return err
}
// validatePublisher runs the shared validation and additionally requires
// Publish.GenerateRoutingKey. It returns all problems found, accumulated with
// multierror, or nil when there are none.
//
// validateConnection is forwarded to validate and controls whether
// Connection.AmqpURI is checked.
func (c Config) validatePublisher(validateConnection bool) error {
	err := c.validate(validateConnection)

	if c.Publish.GenerateRoutingKey == nil {
		// Use the full field path so the message points at Config.Publish,
		// not at QueueBind.GenerateRoutingKey.
		err = multierror.Append(err, errors.New("missing Config.Publish.GenerateRoutingKey"))
	}

	return err
}
// validateSubscriber runs the shared validation and additionally requires
// Queue.GenerateName. It returns all problems found, accumulated with
// multierror, or nil when there are none.
//
// validateConnection is forwarded to validate and controls whether
// Connection.AmqpURI is checked.
func (c Config) validateSubscriber(validateConnection bool) error {
	shared := c.validate(validateConnection)
	if c.Queue.GenerateName != nil {
		return shared
	}
	return multierror.Append(shared, errors.New("missing Config.Queue.GenerateName"))
}
// ValidatePublisher validates the config for use by a publisher, including the
// check that Connection.AmqpURI is set.
func (c Config) ValidatePublisher() error {
	const validateConnection = true
	return c.validatePublisher(validateConnection)
}
// ValidatePublisherWithConnection validates the config for use by a publisher
// but skips the Connection.AmqpURI check.
// NOTE(review): presumably meant for callers that supply an already-established
// connection; confirm against the publisher constructors.
func (c Config) ValidatePublisherWithConnection() error {
	const validateConnection = false
	return c.validatePublisher(validateConnection)
}
// ValidateSubscriber validates the config for use by a subscriber, including the
// check that Connection.AmqpURI is set.
func (c Config) ValidateSubscriber() error {
	const validateConnection = true
	return c.validateSubscriber(validateConnection)
}
// ValidateSubscriberWithConnection validates the config for use by a subscriber
// but skips the Connection.AmqpURI check.
// NOTE(review): presumably meant for callers that supply an already-established
// connection; confirm against the subscriber constructors.
func (c Config) ValidateSubscriberWithConnection() error {
	const validateConnection = false
	return c.validateSubscriber(validateConnection)
}
// ConnectionConfig holds the settings used to connect to the AMQP broker.
type ConnectionConfig struct {
	// AmqpURI is the broker URI. It must be non-empty when the config is checked
	// with ValidatePublisher or ValidateSubscriber; the WithConnection variants
	// skip this check.
	AmqpURI string

	// TLSConfig is an optional TLS configuration.
	// NOTE(review): how it is applied when dialing is not visible in this file.
	TLSConfig *tls.Config

	// AmqpConfig is an optional low-level amqp.Config.
	// NOTE(review): its precedence relative to TLSConfig is not visible in this file.
	AmqpConfig *amqp.Config

	// Reconnect holds the backoff parameters used for reconnecting; see
	// DefaultReconnectConfig for default values.
	// NOTE(review): handling of a nil value is not visible in this file.
	Reconnect *ReconnectConfig
}
// ExchangeNameGenerator generates the exchange name based on the topic.
type ExchangeNameGenerator func(topic string) string
// GenerateExchangeNameTopicName uses the topic itself, unchanged, as the exchange name.
func GenerateExchangeNameTopicName(topic string) string {
	exchangeName := topic
	return exchangeName
}
// GenerateExchangeNameConstant returns a generator that ignores the topic and
// always yields exchangeName.
func GenerateExchangeNameConstant(exchangeName string) ExchangeNameGenerator {
	var constant ExchangeNameGenerator = func(string) string {
		return exchangeName
	}
	return constant
}
// Config descriptions are based on descriptions from: https://github.com/streadway/amqp
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// BSD 2-Clause "Simplified" License

// ExchangeConfig describes the exchange that is used for publishing and to which
// queues are bound.
type ExchangeConfig struct {
	// GenerateName generates the exchange name based on the topic provided for
	// Publish or Subscribe method. It is required; config validation fails when it is nil.
	//
	// Exchange names starting with "amq." are reserved for pre-declared and
	// standardized exchanges. The client MAY declare an exchange starting with
	// "amq." if the passive option is set, or the exchange already exists. Names can
	// consist of a non-empty sequence of letters, digits, hyphen, underscore,
	// period, or colon.
	GenerateName ExchangeNameGenerator

	// Each exchange belongs to one of a set of exchange kinds/types implemented by
	// the server. The exchange types define the functionality of the exchange - i.e.
	// how messages are routed through it. Once an exchange is declared, its type
	// cannot be changed. The common types are "direct", "fanout", "topic" and
	// "headers".
	Type string

	// Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
	// declared when there are no remaining bindings. This is the best lifetime for
	// long-lived exchange configurations like stable routes and default exchanges.
	Durable bool

	// Non-Durable and Auto-Deleted exchanges will be deleted when there are no
	// remaining bindings and not restored on server restart. This lifetime is
	// useful for temporary topologies that should not pollute the virtual host on
	// failure or after the consumers have completed.
	//
	// Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
	// running including when there are no remaining bindings. This is useful for
	// temporary topologies that may have long delays between bindings.
	AutoDeleted bool

	// Exchanges declared as `internal` do not accept publishings. Internal
	// exchanges are useful when you wish to implement inter-exchange topologies
	// that should not be exposed to users of the broker.
	Internal bool

	// When NoWait is true, declare without waiting for a confirmation from the server.
	// The channel may be closed as a result of an error. Add a NotifyClose listener
	// to respond to any exceptions.
	NoWait bool

	// Optional amqp.Table of arguments that are specific to the server's implementation of
	// the exchange can be sent for exchange types that require extra parameters.
	Arguments amqp.Table
}
// QueueNameGenerator generates the queue name based on the topic.
type QueueNameGenerator func(topic string) string
// GenerateQueueNameTopicName uses the topic itself, unchanged, as the queue name.
func GenerateQueueNameTopicName(topic string) string {
	queueName := topic
	return queueName
}
// GenerateQueueNameConstant returns a generator that ignores the topic and
// always yields queueName.
func GenerateQueueNameConstant(queueName string) QueueNameGenerator {
	var constant QueueNameGenerator = func(string) string {
		return queueName
	}
	return constant
}
// GenerateQueueNameTopicNameWithSuffix returns a generator that yields:
//
//	topic + "_" + suffix
func GenerateQueueNameTopicNameWithSuffix(suffix string) QueueNameGenerator {
	// The tail is the same for every topic, so build it once.
	tail := "_" + suffix
	return func(topic string) string {
		return topic + tail
	}
}
// QueueConfig describes the queue that a subscriber consumes from.
type QueueConfig struct {
	// GenerateName generates the queue name based on the topic provided for Publish or Subscribe method.
	// It is required for subscribers; subscriber validation fails when it is nil.
	GenerateName QueueNameGenerator

	// Durable and Non-Auto-Deleted queues will survive server restarts and remain
	// when there are no remaining consumers or bindings. Persistent publishings will
	// be restored in this queue on server restart. These queues are only able to be
	// bound to durable exchanges.
	Durable bool

	// Non-Durable and Auto-Deleted queues will not be redeclared on server restart
	// and will be deleted by the server after a short time when the last consumer is
	// canceled or the last consumer's channel is closed.
	//
	// Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
	// server is running, regardless of how many consumers there are.
	//
	// (Wording follows the amqp091-go Channel.QueueDeclare documentation.)
	AutoDelete bool

	// Exclusive queues are only accessible by the connection that declares them and
	// will be deleted when the connection closes. Channels on other connections
	// will receive an error when attempting to declare, bind, consume, purge or
	// delete a queue with the same name.
	Exclusive bool

	// When NoWait is true, the queue is assumed to be declared on the server.
	// A channel exception will arrive if the conditions are met for existing queues
	// or attempting to modify an existing queue from a different connection.
	NoWait bool

	// Optional amqp.Table of arguments that are specific to the server's implementation of
	// the queue can be sent for queue types that require extra parameters.
	Arguments amqp.Table
}
// QueueBindConfig configures the binding of an exchange to a queue, so that
// publishings to the exchange will be routed to the queue when the publishing
// routing key matches the binding routing key.
type QueueBindConfig struct {
	// GenerateRoutingKey generates the binding routing key based on the topic
	// provided for Subscribe.
	GenerateRoutingKey func(topic string) string

	// When NoWait is false and the queue could not be bound, the channel will be
	// closed with an error.
	NoWait bool

	// Optional amqp.Table of arguments that are specific to the server's implementation of
	// the queue bind can be sent for queue bind types that require extra parameters.
	Arguments amqp.Table
}
// PublishConfig holds the options used when publishing messages.
type PublishConfig struct {
	// GenerateRoutingKey generates the routing key based on the topic provided for Publish.
	// It is required for publishers; publisher validation fails when it is nil.
	GenerateRoutingKey func(topic string) string

	// Publishings can be undeliverable when the mandatory flag is true and no queue is
	// bound that matches the routing key, or when the immediate flag is true and no
	// consumer on the matched queue is ready to accept the delivery.
	Mandatory bool

	// Publishings can be undeliverable when the mandatory flag is true and no queue is
	// bound that matches the routing key, or when the immediate flag is true and no
	// consumer on the matched queue is ready to accept the delivery.
	Immediate bool

	// With Transactional enabled, all messages will be added in a transaction.
	Transactional bool

	// ChannelPoolSize specifies the size of a channel pool. All channels in the pool are opened when the publisher is
	// created. When a Publish operation is performed then a channel is taken from the pool to perform the operation and
	// then returned to the pool once the operation has finished. If all channels are in use then the Publish operation
	// waits until a channel is returned to the pool.
	// If this value is set to 0 (default) then channels are not pooled and a new channel is opened/closed for every
	// Publish operation.
	ChannelPoolSize int

	// ConfirmDelivery indicates whether the Publish function should wait until a confirmation is received from
	// the AMQP server in order to guarantee that the message is delivered. Setting this value to true may
	// negatively impact performance but will increase reliability.
	ConfirmDelivery bool
}
// ConsumeConfig holds the options used when consuming messages.
type ConsumeConfig struct {
	// When true, the message will not be requeued when nacked.
	NoRequeueOnNack bool

	// The consumer is identified by a string that is unique and scoped for all
	// consumers on this channel. If you wish to eventually cancel the consumer, use
	// the same non-empty identifier in Channel.Cancel. An empty string will cause
	// the library to generate a unique identity. The consumer identity will be
	// included in every Delivery in the ConsumerTag field.
	Consumer string

	// When Exclusive is true, the server will ensure that this is the sole consumer
	// from this queue. When Exclusive is false, the server will fairly distribute
	// deliveries across multiple consumers.
	Exclusive bool

	// The noLocal flag is not supported by RabbitMQ.
	NoLocal bool

	// When NoWait is true, do not wait for the server to confirm the request and
	// immediately begin deliveries. If it is not possible to consume, a channel
	// exception will be raised and the channel will be closed.
	NoWait bool

	// Qos holds the prefetch settings; see QosConfig. The constructors in this
	// file set Qos.PrefetchCount to 1.
	Qos QosConfig

	// Optional arguments can be provided that have specific semantics for the queue
	// or server.
	Arguments amqp.Table
}
// QosConfig controls how many messages or how many bytes the server will try to keep on
// the network for consumers before receiving delivery acks. The intent of Qos is
// to make sure the network buffers stay full between the server and client.
type QosConfig struct {
	// With a prefetch count greater than zero, the server will deliver that many
	// messages to consumers before acknowledgments are received. The server ignores
	// this option when consumers are started with noAck because no acknowledgments
	// are expected or sent.
	//
	// Setting the prefetch count to 1 tells RabbitMQ not to give more than one
	// message to a worker at a time. In other words, it won't dispatch a new message
	// to a worker until that worker has processed and acknowledged the previous one.
	// Instead, it will dispatch it to the next worker that is not still busy.
	PrefetchCount int

	// With a prefetch size greater than zero, the server will try to keep at least
	// that many bytes of deliveries flushed to the network before receiving
	// acknowledgments from the consumers. This option is ignored when consumers are
	// started with noAck.
	PrefetchSize int

	// When Global is true, these Qos settings apply to all existing and future
	// consumers on all channels on the same connection. When false, the Channel.Qos
	// settings will apply to all existing and future consumers on this channel.
	//
	// Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
	// how the global flag is implemented in RabbitMQ, as it differs from the
	// AMQP 0.9.1 specification in that global Qos settings are limited in scope to
	// channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
	Global bool
}
// ReconnectConfig holds the exponential backoff parameters used for reconnecting.
// Each field is copied into the backoff.ExponentialBackOff field noted below.
type ReconnectConfig struct {
	// BackoffInitialInterval is used as ExponentialBackOff.InitialInterval.
	BackoffInitialInterval time.Duration

	// BackoffRandomizationFactor is used as ExponentialBackOff.RandomizationFactor.
	BackoffRandomizationFactor float64

	// BackoffMultiplier is used as ExponentialBackOff.Multiplier.
	BackoffMultiplier float64

	// BackoffMaxInterval is used as ExponentialBackOff.MaxInterval.
	BackoffMaxInterval time.Duration
}
// DefaultReconnectConfig returns a newly allocated ReconnectConfig with the default
// backoff parameters: 500ms initial interval, 0.5 randomization factor,
// 1.5 multiplier and 60s maximum interval.
func DefaultReconnectConfig() *ReconnectConfig {
	defaults := new(ReconnectConfig)

	defaults.BackoffInitialInterval = 500 * time.Millisecond
	defaults.BackoffRandomizationFactor = 0.5
	defaults.BackoffMultiplier = 1.5
	defaults.BackoffMaxInterval = 60 * time.Second

	return defaults
}
// backoffConfig converts the reconnect settings into a new
// backoff.ExponentialBackOff that uses backoff.SystemClock.
//
// MaxElapsedTime is fixed at 0: there is no support for disabling reconnect,
// only closing the Pub/Sub can stop reconnecting.
func (r ReconnectConfig) backoffConfig() *backoff.ExponentialBackOff {
	expBackoff := new(backoff.ExponentialBackOff)

	expBackoff.InitialInterval = r.BackoffInitialInterval
	expBackoff.RandomizationFactor = r.BackoffRandomizationFactor
	expBackoff.Multiplier = r.BackoffMultiplier
	expBackoff.MaxInterval = r.BackoffMaxInterval
	expBackoff.MaxElapsedTime = 0 // no support for disabling reconnect, only close of Pub/Sub can stop reconnecting
	expBackoff.Clock = backoff.SystemClock

	return expBackoff
}