
Description
I've noticed that `DeferredConfirmation.Wait()` can hang indefinitely under some circumstances. Some experimentation has revealed that RabbitMQ is receiving the published message (and delivering it to any queues/consumers), but the publishing channel does not always get a confirmation.
I have not yet been able to isolate whether RabbitMQ is indeed not confirming the message, or whether I might have found an issue in this library that is only revealed under particular circumstances. I have a packet capture which would likely reveal this, but I am not knowledgeable enough to analyze it fruitfully.
This behaviour can be reproduced by creating multiple channels on a single connection, each in confirm mode, and publishing rapidly on each of them. I'm publishing using `Channel.PublishWithDeferredConfirmWithContext()`, followed by `Wait()` on the returned `DeferredConfirmation`.
I'm aware that I can pass a context with a timeout to work around this, and indeed that's the only thing that reliably works. However, I then have to deal with the possibility of a message getting published multiple times (when I retry).
Things that have not worked:
- Creating a separate connection for publishers and consumers
- Using `Channel.NotifyPublish()` in addition to `DeferredConfirmation.Wait()`
Things that do work:
- Limiting myself to a single channel for publishing
- Avoiding `DeferredConfirmation.Wait()` and only using `Channel.NotifyPublish()`
- Giving up the psychological need for confirmation and assuming it works
I have a program I've used to demonstrate the issue. The default settings usually cause at least one indefinite hang on `DeferredConfirmation.Wait()`, or at least they have done so on the 2 servers I've tried (one Docker container, one installed via package manager).
Code:
package main
import (
"context"
"flag"
"log"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/rabbitmq/amqp091-go"
)
// quitOn aborts the program when err is non-nil: it logs the error's dynamic
// type together with its message and exits with status 1. A nil error is a
// no-op. Because it exits via os.Exit, deferred calls in the caller do not run.
func quitOn(err error) {
	if err == nil {
		return
	}
	log.Printf("%T: %v", err, err)
	os.Exit(1)
}
// main is a stress-test reproducer for publisher confirms. It opens one
// connection for publishing and one for consuming, starts a single auto-ack
// consumer on a server-named queue bound to the target exchange, and then
// starts publisherCount goroutines, each with its own confirm-mode channel on
// the shared publishing connection. messageCount integers are fanned out to
// the publishers through a buffered Go channel.
//
// Unless -skip-wait is given, every publisher blocks on
// DeferredConfirmation.Wait() after each publish, so a confirmation that never
// arrives shows up as a program that never exits. With -notify-publish each
// publisher additionally counts confirmations received via
// Channel.NotifyPublish().
func main() {
	var serverURL string
	var messageCount int
	var exchangeName string
	var routingKey string
	var publisherCount int
	var bufferSize int
	var useNotifyPublish bool
	var skipWait bool

	// Load Variables
	flag.StringVar(&serverURL, "url", "amqp://guest:guest@localhost/", "AMQP Server URL")
	flag.IntVar(&messageCount, "n", 10000, "Number of messages")
	flag.StringVar(&exchangeName, "exchange", "amq.direct", "Exchange for publishing")
	flag.StringVar(&routingKey, "routing-key", "foo.bar.baz", "Routing Key for publishing")
	flag.IntVar(&publisherCount, "publishers", 10, "Number of publishers")
	flag.IntVar(&bufferSize, "buffer-size", 100, "Internal pre-publisher chan buffer size")
	flag.BoolVar(&useNotifyPublish, "notify-publish", false, "Use Channel.NotifyPublish()")
	flag.BoolVar(&skipWait, "skip-wait", false, "Skip DeferredConfirmation.Wait()")
	pHelp := flag.Bool("h", false, "Show Help")
	flag.Parse()
	if *pHelp {
		flag.Usage()
		os.Exit(0)
	}

	// Connect to the server (one for publish, one for consume)
	log.Printf("Connecting to %s\n", serverURL)
	connPub, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connPub.Close()
	connCon, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connCon.Close()

	// Chan for publishers to receive from
	toSend := make(chan int, bufferSize)

	// Create a WaitGroup for publishers/consumers to finish their work
	wg := new(sync.WaitGroup)

	// Declare and bind a Queue (default direct queue)
	consumer, err := connCon.Channel()
	quitOn(err)
	queue, err := consumer.QueueDeclare("", false, true, true, false, nil)
	quitOn(err)
	err = consumer.QueueBind(queue.Name, routingKey, exchangeName, false, nil)
	quitOn(err)

	// Consume the Queue
	wg.Add(1)
	go func() {
		defer wg.Done()
		messages, err := consumer.Consume(queue.Name, "", true, true, false, false, nil)
		quitOn(err)
		deliveryCount := 0
		// Only wait for deliveries when some are expected: with a
		// non-positive messageCount the equality check below could never
		// become true and this goroutine would block forever.
		if messageCount > 0 {
			for range messages {
				deliveryCount++
				if deliveryCount == messageCount {
					break
				}
			}
		}
		// Report what was actually received; the range loop also ends if the
		// delivery channel is closed before every message has arrived.
		log.Printf("Consumer: %d messages received\n", deliveryCount)
		if err := consumer.Close(); err != nil {
			log.Printf("Consumer: %s\n", err)
		}
	}()

	// Keep track of total confirms on NotifyPublish
	totalNotifyConfirms := new(atomic.Int32)

	// Create all the publishers
	for i := 0; i < publisherCount; i++ {
		pubNumber := i + 1
		pub, err := connPub.Channel()
		quitOn(err)
		err = pub.Confirm(false)
		quitOn(err)

		// It doesn't seem to matter whether we use this or not!
		if useNotifyPublish {
			// The listener channel is only needed (and only allocated) when
			// NotifyPublish is requested. It is unbuffered, so the goroutine
			// below must keep draining it until it is closed.
			chConfirm := make(chan amqp091.Confirmation)
			pub.NotifyPublish(chConfirm)
			wg.Add(1)
			go func() {
				defer wg.Done()
				confirmCount := 0
				for range chConfirm {
					confirmCount++
				}
				log.Printf("Publisher %d got %d confirms (NotifyConfirm)\n", pubNumber, confirmCount)
				totalNotifyConfirms.Add(int32(confirmCount))
			}()
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			okCount := 0
			errCount := 0
			// Publishers compete for values on toSend; the loop ends once
			// main has closed the channel and it has been drained.
			for val := range toSend {
				msg := amqp091.Publishing{
					ContentType:     "text/plain",
					ContentEncoding: "ascii",
					Body:            []byte(strconv.Itoa(val)),
				}
				confirm, err := pub.PublishWithDeferredConfirmWithContext(
					context.Background(),
					exchangeName,
					routingKey,
					false,
					false,
					msg,
				)
				quitOn(err)
				if skipWait {
					continue
				}
				// This is the call under investigation: it has no timeout,
				// so it blocks until the confirmation for this publish is
				// delivered.
				if confirm.Wait() {
					okCount++
				} else {
					log.Printf("Value %d was not confirmed\n", val)
					errCount++
				}
			}
			log.Printf("Publisher %d done (%d OK, %d err)\n", pubNumber, okCount, errCount)
			time.Sleep(time.Second) // Wait for confirmations a bit.
			if err := pub.Close(); err != nil {
				log.Printf("Publisher %d: %s\n", pubNumber, err)
			}
		}()
	}

	log.Printf("Pushing %d messages to %d confirming publishers\n", messageCount, publisherCount)

	// Push all the messages to the publishers
	for i := 1; i <= messageCount; i++ {
		toSend <- i
	}
	// Closing toSend lets every publisher's range loop terminate.
	close(toSend)

	log.Printf("Done sending, waiting for publishers/consumers")
	wg.Wait()
	if useNotifyPublish {
		log.Printf("Confirmations using NotifyPublish(): %d\n", totalNotifyConfirms.Load())
	}
}