Skip to content

Commit

Permalink
fix(inputs.amqp_consumer): NACKing messages on non-delivery related e…
Browse files Browse the repository at this point in the history
…rrors (influxdata#15796)
  • Loading branch information
srebhan authored and asaharn committed Oct 16, 2024
1 parent 5cac43b commit 60de025
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 10 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
<!-- markdownlint-disable MD024 -->
# Changelog

## Unreleased

### Important Changes

- PR [#15796](https://github.com/influxdata/telegraf/pull/15796) changes the
delivery state update of un-parseable messages from `ACK` to `NACK` without
requeueing. This way, those messages are not lost and can optionally be
handled using a dead-letter exchange by other means.

## v1.32.0 [2024-09-09]

### Important Changes
Expand Down
29 changes: 24 additions & 5 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/).
Metrics are read from a topic exchange using the configured queue and
binding_key.

Message payload should be formatted in one of the [Telegraf Data
Formats](../../../docs/DATA_FORMATS_INPUT.md).
Message payload should be formatted in one of the
[Telegraf Data Formats](../../../docs/DATA_FORMATS_INPUT.md).

For an introduction to AMQP see:

Expand Down Expand Up @@ -153,10 +153,29 @@ to use them.
data_format = "influx"
```

## Message acknowledgement behavior

This plugin tracks metrics to report the delivery state to the broker.

Messages are **acknowledged** (ACK) in the broker if they were successfully
parsed and delivered to all corresponding output sinks.

Messages are **not acknowledged** (NACK) if parsing of the messages fails and no
metrics were created. In this case requeueing is disabled so messages will not
be sent out to any other queue. The message will then be discarded or sent to a
dead-letter exchange depending on the server configuration. See
[RabitMQ documentation][rabbitmq_doc] for more details.

Messages are **rejected** (REJECT) if the messages were parsed correctly but
could not be delivered e.g. due to output-service outages. Requeueing is
disabled in this case and messages will be discarded by the server. See
[RabitMQ documentation][rabbitmq_doc] for more details.

[rabbitmq_doc]: https://www.rabbitmq.com/docs/confirms

## Metrics

TODO
The format of metrics produced by this plugin depends on the content and
data format of received messages.

## Example Output

TODO
8 changes: 3 additions & 5 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,9 @@ func (a *AMQPConsumer) process(ctx context.Context, msgs <-chan amqp.Delivery, a

func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delivery) error {
onError := func() {
// Discard the message from the queue; will never be able to process
// this message.
rejErr := d.Ack(false)
if rejErr != nil {
a.Log.Errorf("Unable to reject message: %d: %v", d.DeliveryTag, rejErr)
// Discard the message from the queue; will never be able to process it
if err := d.Nack(false, false); err != nil {
a.Log.Errorf("Unable to NACK message: %d: %v", d.DeliveryTag, err)
a.conn.Close()
}
}
Expand Down

0 comments on commit 60de025

Please sign in to comment.