Skip to content

Commit

Permalink
Add topic_tag option to mqtt_consumer (influxdata#6266)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and idohalevi committed Sep 23, 2020
1 parent bd4b58d commit 73e56f8
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 7 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ and creates metrics using one of the supported [input data formats][].
"sensors/#",
]

## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"

## QoS policy for messages
## 0 = at most once
## 1 = at least once
Expand Down
27 changes: 20 additions & 7 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type Client interface {
type ClientFactory func(o *mqtt.ClientOptions) Client

type MQTTConsumer struct {
Servers []string
Topics []string
Username string
Password string
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
Username string `toml:"username"`
Password string `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout internal.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Expand All @@ -67,6 +68,7 @@ type MQTTConsumer struct {
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
topicTag string

ctx context.Context
cancel context.CancelFunc
Expand All @@ -84,6 +86,10 @@ var sampleConfig = `
"sensors/#",
]
## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"
## QoS policy for messages
## 0 = at most once
## 1 = at least once
Expand Down Expand Up @@ -161,6 +167,11 @@ func (m *MQTTConsumer) Init() error {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", m.ConnectionTimeout.Duration)
}

m.topicTag = "topic"
if m.TopicTag != nil {
m.topicTag = *m.TopicTag
}

opts, err := m.createOpts()
if err != nil {
return err
Expand Down Expand Up @@ -267,9 +278,11 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
return err
}

topic := msg.Topic()
for _, metric := range metrics {
metric.AddTag("topic", topic)
if m.topicTag != "" {
topic := msg.Topic()
for _, metric := range metrics {
metric.AddTag(m.topicTag, topic)
}
}

id := acc.AddTrackingMetricGroup(metrics)
Expand Down
136 changes: 136 additions & 0 deletions plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,142 @@ func TestPersistentClientIDFail(t *testing.T) {
require.Error(t, err)
}

type Message struct {
}

func (m *Message) Duplicate() bool {
panic("not implemented")
}

func (m *Message) Qos() byte {
panic("not implemented")
}

func (m *Message) Retained() bool {
panic("not implemented")
}

func (m *Message) Topic() string {
return "telegraf"
}

func (m *Message) MessageID() uint16 {
panic("not implemented")
}

func (m *Message) Payload() []byte {
return []byte("cpu time_idle=42i")
}

func (m *Message) Ack() {
panic("not implemented")
}

func TestTopicTag(t *testing.T) {
tests := []struct {
name string
topicTag func() *string
expected []telegraf.Metric
}{
{
name: "default topic when topic tag is unset for backwards compatibility",
topicTag: func() *string {
return nil
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic": "telegraf",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "use topic tag when set",
topicTag: func() *string {
tag := "topic_tag"
return &tag
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{
"topic_tag": "telegraf",
},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
{
name: "no topic tag is added when topic tag is set to the empty string",
topicTag: func() *string {
tag := ""
return &tag
},
expected: []telegraf.Metric{
testutil.MustMetric(
"cpu",
map[string]string{},
map[string]interface{}{
"time_idle": 42,
},
time.Unix(0, 0),
),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var handler mqtt.MessageHandler
client := &FakeClient{
ConnectF: func() mqtt.Token {
return &FakeToken{}
},
AddRouteF: func(topic string, callback mqtt.MessageHandler) {
handler = callback
},
SubscribeMultipleF: func(filters map[string]byte, callback mqtt.MessageHandler) mqtt.Token {
return &FakeToken{}
},
DisconnectF: func(quiesce uint) {
},
}

plugin := New(func(o *mqtt.ClientOptions) Client {
return client
})
plugin.Topics = []string{"telegraf"}
plugin.TopicTag = tt.topicTag()

parser, err := parsers.NewInfluxParser()
require.NoError(t, err)
plugin.SetParser(parser)

err = plugin.Init()
require.NoError(t, err)

var acc testutil.Accumulator
err = plugin.Start(&acc)
require.NoError(t, err)

handler(nil, &Message{})

plugin.Stop()

testutil.RequireMetricsEqual(t, tt.expected, acc.GetTelegrafMetrics(),
testutil.IgnoreTime())
})
}
}

func TestAddRouteCalledForEachTopic(t *testing.T) {
client := &FakeClient{
ConnectF: func() mqtt.Token {
Expand Down

0 comments on commit 73e56f8

Please sign in to comment.