Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: enable extracting tag values from MQTT topics #9995

Merged
merged 15 commits into from
Nov 23, 2021
Prev Previous commit
Next Next commit
fix: topicTag not found
  • Loading branch information
MyaLongmire committed Nov 15, 2021
commit 50aec120407a1768f981494c70c88378d7eb0ca3
119 changes: 27 additions & 92 deletions plugins/inputs/mqtt_consumer/mqtt_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
Expand All @@ -21,8 +20,7 @@ import (

var (
// 30 Seconds is the default used by paho.mqtt.golang
defaultConnectionTimeout = config.Duration(30 * time.Second)

defaultConnectionTimeout = config.Duration(30 * time.Second)
defaultMaxUndeliveredMessages = 1000
)

Expand All @@ -42,79 +40,65 @@ type Client interface {
AddRoute(topic string, callback mqtt.MessageHandler)
Disconnect(quiesce uint)
}

type ClientFactory func(o *mqtt.ClientOptions) Client

type TopicParsingConfig struct {
Topic string `toml:"topic"`
Measurement string `toml:"measurement"`
Tags string `toml:"tags"`
Fields string `toml:"fields"`
FieldTypes map[string]string `toml:"types"`
}

type MQTTConsumer struct {
Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
TopicParsing []TopicParsingConfig `toml:"topic_parsing"`

Username string `toml:"username"`
Password string `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`

parser parsers.Parser

Servers []string `toml:"servers"`
Topics []string `toml:"topics"`
TopicTag *string `toml:"topic_tag"`
TopicParsing []TopicParsingConfig `toml:"topic_parsing"`
Username string `toml:"username"`
Password string `toml:"password"`
QoS int `toml:"qos"`
ConnectionTimeout config.Duration `toml:"connection_timeout"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
parser parsers.Parser
// Legacy metric buffer support; deprecated in v0.10.3
MetricBuffer int

MetricBuffer int
PersistentSession bool
ClientID string `toml:"client_id"`
tls.ClientConfig

Log telegraf.Logger

clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messagesMutex sync.Mutex
chosenTopicTag string

ctx context.Context
cancel context.CancelFunc
Log telegraf.Logger
clientFactory ClientFactory
client Client
opts *mqtt.ClientOptions
acc telegraf.TrackingAccumulator
state ConnectionState
sem semaphore
messages map[telegraf.TrackingID]bool
messagesMutex sync.Mutex
topicTag string
ctx context.Context
cancel context.CancelFunc
}

var sampleConfig = `
## Broker URLs for the MQTT server or cluster. To connect to multiple
## clusters or standalone servers, use a separate plugin instance.
## clusters or standalone servers, use a seperate plugin instance.
MyaLongmire marked this conversation as resolved.
Show resolved Hide resolved
## example: servers = ["tcp://localhost:1883"]
## servers = ["ssl://localhost:1883"]
## servers = ["ws://localhost:1883"]
servers = ["tcp://127.0.0.1:1883"]

## Topics that will be subscribed to.
topics = [
"telegraf/host01/cpu",
"telegraf/+/mem",
"sensors/#",
]

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
# topic_tags = "_/format/client/_"
# topic_measurement = "measurement/_/_/_"
# topic_fields = "_/_/_/temperature"

## The message topic will be stored in a tag specified by this value. If set
## to the empty string no topic tag will be created.
# topic_tag = "topic"

## QoS policy for messages
## 0 = at most once
## 1 = at least once
Expand All @@ -123,10 +107,8 @@ var sampleConfig = `
## When using a QoS of 1 or 2, you should enable persistent_session to allow
## resuming unacknowledged messages.
# qos = 0

## Connection timeout for initial connection in seconds
# connection_timeout = "30s"

## Maximum messages to read from the broker that have not been written by an
## output. For best throughput set based on the number of metrics within
## each message and the size of the output's metric_batch_size.
Expand All @@ -136,34 +118,28 @@ var sampleConfig = `
## full batch is collected and the write is triggered immediately without
## waiting until the next flush_interval.
# max_undelivered_messages = 1000

## Persistent session disables clearing of the client session on connection.
## In order for this option to work you must also set client_id to identify
## the client. To receive messages that arrived while the client is offline,
## also set the qos option to 1 or 2 and don't forget to also set the QoS when
## publishing.
# persistent_session = false

## If unset, a random client ID will be generated.
# client_id = ""

## Username and password to connect MQTT server.
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"

## Enable extracting tag values from MQTT topics
## _ denotes an ignored entry in the topic path
## [[inputs.mqtt_consumer.topic_parsing]]
Expand All @@ -178,78 +154,60 @@ var sampleConfig = `
func (m *MQTTConsumer) SampleConfig() string {
return sampleConfig
}

func (m *MQTTConsumer) Description() string {
return "Read metrics from MQTT topic(s)"
}

func (m *MQTTConsumer) SetParser(parser parsers.Parser) {
m.parser = parser
}

func (m *MQTTConsumer) Init() error {
m.state = Disconnected

if m.PersistentSession && m.ClientID == "" {
return errors.New("persistent_session requires client_id")
}

if m.QoS > 2 || m.QoS < 0 {
return fmt.Errorf("qos value must be 0, 1, or 2: %d", m.QoS)
}

if time.Duration(m.ConnectionTimeout) < 1*time.Second {
return fmt.Errorf("connection_timeout must be greater than 1s: %s", time.Duration(m.ConnectionTimeout))
}

m.chosenTopicTag = "topic"
m.topicTag = "topic"
if m.TopicTag != nil {
m.chosenTopicTag = *m.TopicTag
m.topicTag = *m.TopicTag
}

opts, err := m.createOpts()
if err != nil {
return err
}

m.opts = opts
m.messages = map[telegraf.TrackingID]bool{}

return nil
}

func (m *MQTTConsumer) Start(acc telegraf.Accumulator) error {
m.state = Disconnected

m.acc = acc.WithTracking(m.MaxUndeliveredMessages)
m.sem = make(semaphore, m.MaxUndeliveredMessages)
m.ctx, m.cancel = context.WithCancel(context.Background())

m.client = m.clientFactory(m.opts)

// AddRoute sets up the function for handling messages. These need to be
// added in case we find a persistent session containing subscriptions so we
// know where to dispatch persisted and new messages to. In the alternate
// case that we need to create the subscriptions these will be replaced.
for _, topic := range m.Topics {
m.client.AddRoute(topic, m.recvMessage)
}

m.state = Connecting
return m.connect()
}

func (m *MQTTConsumer) connect() error {
token := m.client.Connect()
if token.Wait() && token.Error() != nil {
err := token.Error()
m.state = Disconnected
return err
}

m.Log.Infof("Connected %v", m.Servers)
m.state = Connected

// Persistent sessions should skip subscription if a session is present, as
// the subscriptions are stored by the server.
type sessionPresent interface {
Expand All @@ -259,28 +217,23 @@ func (m *MQTTConsumer) connect() error {
m.Log.Debugf("Session found %v", m.Servers)
return nil
}

topics := make(map[string]byte)
for _, topic := range m.Topics {
topics[topic] = byte(m.QoS)
}

subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
subscribeToken.Wait()
if subscribeToken.Error() != nil {
m.acc.AddError(fmt.Errorf("subscription error: topics: %s: %v",
strings.Join(m.Topics[:], ","), subscribeToken.Error()))
}

return nil
}

func (m *MQTTConsumer) onConnectionLost(_ mqtt.Client, err error) {
m.acc.AddError(fmt.Errorf("connection lost: %v", err))
m.Log.Debugf("Disconnected %v", m.Servers)
m.state = Disconnected
}

func (m *MQTTConsumer) recvMessage(_ mqtt.Client, msg mqtt.Message) {
for {
select {
Expand Down Expand Up @@ -345,7 +298,6 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
}
metric.SetName(m)
}

if p.Tags != "" {
tags, err := parseTags(strings.Split(p.Tags, "/"), values)
if err != nil {
Expand All @@ -355,7 +307,6 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
metric.AddTag(k, v)
}
}

if p.Fields != "" {
fields, err := parseFields(strings.Split(p.Fields, "/"), values, p.FieldTypes)
if err != nil {
Expand All @@ -368,14 +319,12 @@ func (m *MQTTConsumer) onMessage(acc telegraf.TrackingAccumulator, msg mqtt.Mess
}
}
}

id := acc.AddTrackingMetricGroup(metrics)
m.messagesMutex.Lock()
m.messages[id] = true
m.messagesMutex.Unlock()
return nil
}

func (m *MQTTConsumer) Stop() {
if m.state == Connected {
m.Log.Debugf("Disconnecting %v", m.Servers)
Expand All @@ -385,37 +334,29 @@ func (m *MQTTConsumer) Stop() {
}
m.cancel()
}

func (m *MQTTConsumer) Gather(_ telegraf.Accumulator) error {
if m.state == Disconnected {
m.state = Connecting
m.Log.Debugf("Connecting %v", m.Servers)
return m.connect()
}

return nil
}

func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
opts := mqtt.NewClientOptions()

opts.ConnectTimeout = time.Duration(m.ConnectionTimeout)

if m.ClientID == "" {
opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5))
} else {
opts.SetClientID(m.ClientID)
}

tlsCfg, err := m.ClientConfig.TLSConfig()
if err != nil {
return nil, err
}

if tlsCfg != nil {
opts.SetTLSConfig(tlsCfg)
}

user := m.Username
if user != "" {
opts.SetUsername(user)
Expand All @@ -424,11 +365,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
if password != "" {
opts.SetPassword(password)
}

if len(m.Servers) == 0 {
return opts, fmt.Errorf("could not get host informations")
}

for _, server := range m.Servers {
// Preserve support for host:port style servers; deprecated in Telegraf 1.4.4
if !strings.Contains(server, "://") {
Expand All @@ -439,14 +378,12 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
server = "ssl://" + server
}
}

opts.AddBroker(server)
}
opts.SetAutoReconnect(false)
opts.SetKeepAlive(time.Second * 60)
opts.SetCleanSession(!m.PersistentSession)
opts.SetConnectionLostHandler(m.onConnectionLost)

return opts, nil
}

Expand Down Expand Up @@ -515,7 +452,6 @@ func parseFields(keys []string, values []string, types map[string]string) (map[s
}
return results, nil
}

func New(factory ClientFactory) *MQTTConsumer {
return &MQTTConsumer{
Servers: []string{"tcp://127.0.0.1:1883"},
Expand All @@ -525,7 +461,6 @@ func New(factory ClientFactory) *MQTTConsumer {
state: Disconnected,
}
}

func init() {
inputs.Add("mqtt_consumer", func() telegraf.Input {
return New(func(o *mqtt.ClientOptions) Client {
Expand Down