Skip to content

Commit

Permalink
Add support for exchanges to RabbitMQ input (#3619)
Browse files Browse the repository at this point in the history
  • Loading branch information
kerams authored and danielnelson committed Jan 4, 2018
1 parent 57138e0 commit bde4d00
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 3 deletions.
18 changes: 18 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
## A list of queues to gather as the rabbitmq_queue measurement. If not
## specified, metrics for all queues are gathered.
# queues = ["telegraf"]

## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
```

### Measurements & Fields:
Expand Down Expand Up @@ -95,6 +99,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)

- rabbitmq_exchange
- messages_publish_in (int, count)
- messages_publish_out (int, count)

### Tags:

- All measurements have the following tags:
Expand All @@ -114,6 +122,15 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- durable
- auto_delete

- rabbitmq_exchange
- url
- exchange
- type
- vhost
- internal
- durable
- auto_delete

### Sample Queries:

Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:
Expand All @@ -129,4 +146,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
```
71 changes: 68 additions & 3 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type RabbitMQ struct {
ResponseHeaderTimeout internal.Duration `toml:"header_timeout"`
ClientTimeout internal.Duration `toml:"client_timeout"`

Nodes []string
Queues []string
Nodes []string
Queues []string
Exchanges []string

Client *http.Client
}
Expand Down Expand Up @@ -78,6 +79,8 @@ type MessageStats struct {
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
}

// ObjectTotals ...
Expand Down Expand Up @@ -133,10 +136,20 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"`
}

type Exchange struct {
Name string
MessageStats `json:"message_stats"`
Type string
Internal bool
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}

var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
Expand Down Expand Up @@ -171,6 +184,10 @@ var sampleConfig = `
## A list of queues to gather as the rabbitmq_queue measurement. If not
## specified, metrics for all queues are gathered.
# queues = ["telegraf"]
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
`

// SampleConfig ...
Expand Down Expand Up @@ -374,6 +391,40 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
}
}

func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
// Gather information about exchanges
exchanges := make([]Exchange, 0)
err := r.requestJSON("/api/exchanges", &exchanges)
if err != nil {
acc.AddError(err)
return
}

for _, exchange := range exchanges {
if !r.shouldGatherExchange(exchange) {
continue
}
tags := map[string]string{
"url": r.URL,
"exchange": exchange.Name,
"type": exchange.Type,
"vhost": exchange.Vhost,
"internal": strconv.FormatBool(exchange.Internal),
"durable": strconv.FormatBool(exchange.Durable),
"auto_delete": strconv.FormatBool(exchange.AutoDelete),
}

acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
},
tags,
)
}
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 {
return true
Expand Down Expand Up @@ -402,6 +453,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
return false
}

func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
if len(r.Exchanges) == 0 {
return true
}

for _, name := range r.Exchanges {
if name == exchange.Name {
return true
}
}

return false
}

func init() {
inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{
Expand Down
107 changes: 107 additions & 0 deletions plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,102 @@ const sampleQueuesResponse = `
]
`

const sampleExchangesResponse = `
[
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": ""
},
{
"message_stats": {
"publish_in_details": {
"rate": 0
},
"publish_in": 2,
"publish_out_details": {
"rate": 0
},
"publish_out": 1
},
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "telegraf"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": "amq.direct"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "amq.fanout"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.headers"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.match"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.log"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.trace"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.topic"
}
]
`

func TestRabbitMQGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
Expand All @@ -385,6 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
rsp = sampleNodesResponse
case "/api/queues":
rsp = sampleQueuesResponse
case "/api/exchanges":
rsp = sampleExchangesResponse
default:
panic("Cannot handle request")
}
Expand Down Expand Up @@ -441,4 +539,13 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
}

assert.True(t, acc.HasMeasurement("rabbitmq_queue"))

exchangeIntMetrics := []string{
"messages_publish_in",
"messages_publish_out",
}

for _, metric := range exchangeIntMetrics {
assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric))
}
}

0 comments on commit bde4d00

Please sign in to comment.