Skip to content

DeferredConfirmation.Wait() can hang indefinitely #182

Closed
@ghost

Description

I've noticed that DeferredConfirmation.Wait() can hang indefinitely under some circumstances. Some experimentation has revealed that RabbitMQ is receiving the published message (and delivering it to any queues/consumers), but the publishing channel does not always get a confirmation.

I have not yet been able to determine whether RabbitMQ is actually not sending the confirmation, or whether I have found an issue in this library that only shows up under particular circumstances. I have a packet capture that would likely answer this, but I don't know enough about the protocol to analyze it usefully.

This behaviour can be reproduced by creating multiple channels on a single connection, putting each of them in confirm mode, and publishing rapidly on all of them concurrently. I'm publishing with Channel.PublishWithDeferredConfirmWithContext() and then calling Wait() on the returned DeferredConfirmation.

I'm aware that I can work around this by waiting with a context that has a timeout, and indeed that's the only thing that reliably works. However, I then have to deal with the possibility of a message being published multiple times: a message whose confirmation timed out may in fact have reached the broker before I retry it.

Things that have not worked:

  • Creating a separate connection for publishers and consumers
  • Using Channel.NotifyPublish in addition to DeferredConfirmation.Wait()

Things that do work:

  • Limiting myself to a single channel for publishing
  • Avoiding DeferredConfirmation.Wait() and only using Channel.NotifyPublish()
  • Giving up the psychological need for confirmation and assuming it works

I have a program that demonstrates the issue. With the default settings it usually hangs indefinitely in DeferredConfirmation.Wait() at least once. This has happened on both servers I've tried (one running in a Docker container, one installed via a package manager).

Code:
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rabbitmq/amqp091-go"
)

// quitOn is a fail-fast helper for this demo program. When err is non-nil it
// logs the error's dynamic type and message, then exits with status 1
// (deferred calls do not run). A nil err returns immediately.
func quitOn(err error) {
	if err == nil {
		return
	}
	log.Printf("%T: %v", err, err)
	os.Exit(1)
}

// main reproduces the report that DeferredConfirmation.Wait() can block
// forever. It dials one connection for publishing and one for consuming, and
// declares a temporary queue bound to the chosen exchange and routing key. It
// then starts one consumer plus publisherCount confirm-mode publisher channels.
// The integers 1..n are fed to the publishers through a shared Go channel and
// published as message bodies.
//
// With -wait-timeout=0 (the default) every publisher blocks in Wait() exactly
// as in the original report. A positive duration bounds each wait, so a
// missing confirmation is logged and counted instead of hanging the program.
func main() {
	var serverURL string
	var messageCount int
	var exchangeName string
	var routingKey string
	var publisherCount int
	var bufferSize int
	var useNotifyPublish bool
	var skipWait bool
	var waitTimeout time.Duration

	// Load Variables
	flag.StringVar(&serverURL, "url", "amqp://guest:guest@localhost/", "AMQP Server URL")
	flag.IntVar(&messageCount, "n", 10000, "Number of messages")
	flag.StringVar(&exchangeName, "exchange", "amq.direct", "Exchange for publishing")
	flag.StringVar(&routingKey, "routing-key", "foo.bar.baz", "Routing Key for publishing")
	flag.IntVar(&publisherCount, "publishers", 10, "Number of publishers")
	flag.IntVar(&bufferSize, "buffer-size", 100, "Internal pre-publisher chan buffer size")
	flag.BoolVar(&useNotifyPublish, "notify-publish", false, "Use Channel.NotifyPublish()")
	flag.BoolVar(&skipWait, "skip-wait", false, "Skip DeferredConfirmation.Wait()")
	flag.DurationVar(&waitTimeout, "wait-timeout", 0, "Max time to wait for each confirmation (0 waits forever)")
	pHelp := flag.Bool("h", false, "Show Help")
	flag.Parse()
	if *pHelp {
		flag.Usage()
		os.Exit(0)
	}

	// Reject settings that would hang or panic before the test even starts:
	//   - with n < 1 the consumer never reaches its stop condition;
	//   - with no publishers nothing drains toSend;
	//   - make() panics on a negative buffer size.
	if messageCount < 1 || publisherCount < 1 || bufferSize < 0 {
		log.Printf("invalid flags: -n and -publishers must be >= 1, -buffer-size must be >= 0")
		flag.Usage()
		os.Exit(2)
	}

	// Connect to the server (one for publish, one for consume)
	log.Printf("Connecting to %s\n", serverURL)
	connPub, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connPub.Close()
	connCon, err := amqp091.Dial(serverURL)
	quitOn(err)
	defer connCon.Close()

	// Chan for publishers to receive from
	toSend := make(chan int, bufferSize)

	// Create a WaitGroup for publishers/consumers to finish their work
	wg := new(sync.WaitGroup)

	// Declare and bind a Queue (default direct queue)
	consumer, err := connCon.Channel()
	quitOn(err)
	queue, err := consumer.QueueDeclare("", false, true, true, false, nil)
	quitOn(err)
	err = consumer.QueueBind(queue.Name, routingKey, exchangeName, false, nil)
	quitOn(err)

	// Consume the Queue
	wg.Add(1)
	go func() {
		defer wg.Done()
		messages, err := consumer.Consume(queue.Name, "", true, true, false, false, nil)
		quitOn(err)
		deliveryCount := 0
		for range messages {
			deliveryCount++
			if deliveryCount == messageCount {
				break
			}
		}
		// Report what actually arrived: the loop can also end early if the
		// deliveries channel is closed before messageCount is reached.
		log.Printf("Consumer: %d messages received\n", deliveryCount)
		if err := consumer.Close(); err != nil {
			log.Printf("Consumer: %s\n", err)
		}
	}()

	// Keep track of total confirms on NotifyPublish
	totalNotifyConfirms := new(atomic.Int32)

	// Create all the publishers
	for i := 0; i < publisherCount; i++ {
		pubNumber := i + 1
		pub, err := connPub.Channel()
		quitOn(err)
		err = pub.Confirm(false)
		quitOn(err)
		chConfirm := make(chan amqp091.Confirmation)
		// It doesn't seem to matter whether we use this or not!
		if useNotifyPublish {
			pub.NotifyPublish(chConfirm)
			wg.Add(1)
			go func() {
				defer wg.Done()
				confirmCount := 0
				// chConfirm is drained continuously, so the unbuffered
				// channel does not stall confirm delivery. The loop ends
				// when the publisher channel is closed.
				for range chConfirm {
					confirmCount++
				}
				log.Printf("Publisher %d got %d confirms (NotifyConfirm)\n", pubNumber, confirmCount)
				totalNotifyConfirms.Add(int32(confirmCount))
			}()
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			okCount := 0
			errCount := 0
			timeoutCount := 0
			for val := range toSend {
				msg := amqp091.Publishing{
					ContentType:     "text/plain",
					ContentEncoding: "ascii",
					Body:            []byte(strconv.Itoa(val)),
				}
				confirm, err := pub.PublishWithDeferredConfirmWithContext(
					context.Background(),
					exchangeName,
					routingKey,
					false,
					false,
					msg,
				)
				quitOn(err)
				if skipWait {
					continue
				}
				wasOK, waitErr := awaitConfirm(confirm, waitTimeout)
				switch {
				case waitErr != nil:
					log.Printf("Value %d: no confirmation within %s\n", val, waitTimeout)
					timeoutCount++
				case !wasOK:
					log.Printf("Value %d was not confirmed\n", val)
					errCount++
				default:
					okCount++
				}
			}
			log.Printf("Publisher %d done (%d OK, %d err, %d timed out)\n", pubNumber, okCount, errCount, timeoutCount)
			time.Sleep(time.Second) // Wait for confirmations a bit.
			if err := pub.Close(); err != nil {
				log.Printf("Publisher %d: %s\n", pubNumber, err)
			}
		}()
	}

	log.Printf("Pushing %d messages to %d confirming publishers\n", messageCount, publisherCount)
	// Push all the messages to the publishers
	for i := 1; i <= messageCount; i++ {
		toSend <- i
	}
	close(toSend)

	log.Printf("Done sending, waiting for publishers/consumers")
	wg.Wait()
	if useNotifyPublish {
		log.Printf("Confirmations using NotifyPublish(): %d\n", totalNotifyConfirms.Load())
	}
}

// awaitConfirm blocks until confirm is resolved and reports whether it was an
// ack. With timeout <= 0 it waits indefinitely via Wait(), as the original
// program did. Otherwise it stops waiting after timeout and returns the
// context's error. The confirmation itself stays pending in that case.
func awaitConfirm(confirm *amqp091.DeferredConfirmation, timeout time.Duration) (bool, error) {
	if timeout <= 0 {
		return confirm.Wait(), nil
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return confirm.WaitContext(ctx)
}

Metadata

Metadata

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions