Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable extracting tag values from MQTT topics #9995

Merged
merged 15 commits into from
Nov 23, 2021
Prev Previous commit
Next Next commit
feat: added unit tests and documentation
  • Loading branch information
MyaLongmire committed Nov 15, 2021
commit 0dbf0858a0f97de75d7b2013539a3ec9a4b5e118
64 changes: 56 additions & 8 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
The [MQTT][mqtt] consumer plugin reads from the specified MQTT topics
and creates metrics using one of the supported [input data formats][].

### Configuration
## Configuration

```toml
[[inputs.mqtt_consumer]]
Expand Down Expand Up @@ -76,20 +76,68 @@ and creates metrics using one of the supported [input data formats][].

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## [[inputs.mqtt_consumer.topic_parsing]]
## topic = ""
## measurement = ""
## tags = ""
## fields = ""
# [[inputs.mqtt_consumer.topic_parsing]]
# topic = ""
# measurement = ""
# tags = ""
# fields = ""
## Value supported is int, float, unit
## [[inputs.mqtt_consumer.topic.types]]
## key = type
# [[inputs.mqtt_consumer.topic.types]]
# key = type
```

## About Topic Pasring
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved

The MQTT topic as a whole is stored as a tag, but this can be far too coarse
too be easily used when utilising the data further down the line. This
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
change allows tag values to be extracted from the MQTT topic letting you
store the information provided in the topic in a meaningful way. An _ denotes an
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
ignored entry in the topic path. Please see the following example.

## Example Configuration for topic parsing

```toml
[[inputs.mqtt_consumer]]
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/+/cpu/23",
]

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "value"
data_type = "float"

[[inputs.mqtt_consumer.topic_parsing]]
topic = "telegraf/one/cpu/23"
srebhan marked this conversation as resolved.
Show resolved Hide resolved
measurement = "_/_/measurement/_"
tags = "tag/_/_/_"
fields = "_/_/_/test"
[inputs.mqtt_consumer.topic_parsing.types]
test = "int"
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
```

Result:

```shell
cpu,host=pop-os,tag=telegraf,topic=telegraf/one/cpu/23 value=45,test=23i 1637014942460689291
```

### Metrics

- All measurements are tagged with the incoming topic, ie
`topic=telegraf/host01/cpu`

- example when [[inputs.mqtt_consumer.topic_parsing]] is set

[mqtt]: https://mqtt.org
[input data formats]: /docs/DATA_FORMATS_INPUT.md
29 changes: 24 additions & 5 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ var sampleConfig = `
## measurement = ""
## tags = ""
## fields = ""
## [inputs.mqtt_consumer.topic_parsing.types]
##
`

func (m *MQTTConsumer) SampleConfig() string {
Expand Down Expand Up @@ -303,22 +305,36 @@ func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
}
}

// compareTopics is used to support the mqtt wild card `+` which allows for one topic of any value
func compareTopics(expected string, incoming string) bool {
expectedSplit := strings.Split(expected, "/")
incomingSplit := strings.Split(incoming, "/")
if len(expectedSplit) != len(incomingSplit) {
return false
}

for i := range expectedSplit {
if incomingSplit[i] != expectedSplit[i] && expectedSplit[i] != "+" {
return false
}
}
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved

return true
}

func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Message) error {
println("onMessgae")

metrics, err := m.parser.Parse(msg.Payload())
println(err)
if err != nil {
return err
}

for _, metric := range metrics {
println("in for loop")
if m.topicTag != "" {
metric.AddTag(m.topicTag, msg.Topic())
}
for _, p := range m.TopicParsing {
fmt.Printf("parsing topics %s \n", p)
if p.Topic == msg.Topic() {
if compareTopics(p.Topic, msg.Topic()) {
values := strings.Split(msg.Topic(), "/")

if p.Measurement != "" {
Expand Down Expand Up @@ -433,6 +449,7 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
return opts, nil
}

// parseMeasurement gets a single value from the incoming topic based on the user configuration (TopicParsing.Measurement)
func parseMeasurement(keys []string, values []string) (string, error) {
for i, k := range keys {
if k != "_" {
Expand All @@ -442,6 +459,7 @@ func parseMeasurement(keys []string, values []string) (string, error) {
return "", fmt.Errorf("no measurements found")
}

// parseTags gets multiple tags from the topic based on the user configuration (TopicParsing.Tags)
func parseTags(keys []string, values []string) (map[string]string, error) {
results := make(map[string]string)
for i, k := range keys {
Expand All @@ -455,6 +473,7 @@ func parseTags(keys []string, values []string) (map[string]string, error) {
return results, nil
}

// parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields)
func parseFields(keys []string, values []string, types map[string]string) (map[string]interface{}, error) {
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
results := make(map[string]interface{})
for i, k := range keys {
Expand Down
93 changes: 84 additions & 9 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func TestPersistentClientIDFail(t *testing.T) {
}

type Message struct {
topic string
}

func (m *Message) Duplicate() bool {
Expand All @@ -168,7 +169,7 @@ func (m *Message) Retained() bool {
}

func (m *Message) Topic() string {
return "telegraf"
return m.topic
}

func (m *Message) MessageID() uint16 {
Expand All @@ -185,12 +186,15 @@ func (m *Message) Ack() {

func TestTopicTag(t *testing.T) {
tests := []struct {
name string
topicTag func() *string
expected []telegraf.Metric
name string
topic string
topicTag func() *string
topicParsing []TopicParsingConfig
expected []telegraf.Metric
}{
{
name: "default topic when topic tag is unset for backwards compatibility",
name: "default topic when topic tag is unset for backwards compatibility",
topic: "telegraf",
topicTag: func() *string {
return nil
},
Expand All @@ -208,7 +212,8 @@ func TestTopicTag(t *testing.T) {
},
},
{
name: "use topic tag when set",
name: "use topic tag when set",
topic: "telegraf",
topicTag: func() *string {
tag := "topic_tag"
return &tag
Expand All @@ -227,7 +232,8 @@ func TestTopicTag(t *testing.T) {
},
},
{
name: "no topic tag is added when topic tag is set to the empty string",
name: "no topic tag is added when topic tag is set to the empty string",
topic: "telegraf",
topicTag: func() *string {
tag := ""
return &tag
Expand All @@ -243,6 +249,71 @@ func TestTopicTag(t *testing.T) {
),
},
},
{
name: "topic parsing configured",
topic: "telegraf/123/test",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/123/test",
Measurement: "_/_/measurement",
Tags: "testTag/_/_",
Fields: "_/testNumber/_",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
},
map[string]interface{}{
"testNumber": 123,
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "topic parsing configured with a mqtt wild card `+`",
topic: "telegraf/123/test/hello",
topicTag: func() *string {
tag := ""
return &tag
},
topicParsing: []TopicParsingConfig{
{
Topic: "telegraf/+/test/hello",
Measurement: "_/_/measurement/_",
Tags: "testTag/_/_/_",
Fields: "_/testNumber/_/testString",
FieldTypes: map[string]string{
"testNumber": "int",
},
},
},
expected: []telegraf.Metric{
testutil.MustMetric(
"test",
map[string]string{
"testTag": "telegraf",
},
map[string]interface{}{
"testNumber": 123,
"testString": "hello",
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -265,8 +336,9 @@ func TestTopicTag(t *testing.T) {
return client
})
plugin.Log = testutil.Logger{}
plugin.Topics = []string{"telegraf"}
plugin.Topics = []string{tt.topic}
plugin.TopicTag = tt.topicTag()
plugin.TopicParsing = tt.topicParsing

parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
Expand All @@ -279,7 +351,10 @@ func TestTopicTag(t *testing.T) {
err = plugin.Start(&acc)
require.NoError(t, err)

handler(nil, &Message{})
var m Message
m.topic = tt.topic

handler(nil, &m)

plugin.Stop()

Expand Down