diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go index 890ed9f5d1a34..d869ccc7eb102 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -96,10 +96,6 @@ var sampleConfig = ` "telegraf/+/mem", "sensors/#", ] - ## Enable extracting tag values from MQTT topics - ## _ denotes an ignored entry in the topic path - # topic_tags = "_/format/client/_" - # topic_measurement = "measurement/_/_/_" # topic_fields = "_/_/_/temperature" ## The message topic will be stored in a tag specified by this value. If set ## to the empty string no topic tag will be created. @@ -199,15 +195,15 @@ func (m *MQTTConsumer) Init() error { m.TopicParsing[i].SplitFields = strings.Split(p.Fields, "/") m.TopicParsing[i].SplitTopic = strings.Split(p.Topic, "/") - if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) { + if len(splitMeasurement) != len(m.TopicParsing[i].SplitTopic) && len(splitMeasurement) != 1 { return fmt.Errorf("config error topic parsing: measurement length does not equal topic length") } - if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) { + if len(m.TopicParsing[i].SplitFields) != len(m.TopicParsing[i].SplitTopic) && p.Fields != "" { return fmt.Errorf("config error topic parsing: fields length does not equal topic length") } - if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) { + if len(m.TopicParsing[i].SplitTags) != len(m.TopicParsing[i].SplitTopic) && p.Tags != "" { return fmt.Errorf("config error topic parsing: tags length does not equal topic length") } } @@ -408,7 +404,6 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { // parseFields gets multiple fields from the topic based on the user configuration (TopicParsing.Fields) func parseMetric(keys []string, values []string, types map[string]string, isTag bool, metric telegraf.Metric) error { - var metricFound bool for i, k := range keys { if k == "_" { continue @@ -416,19 +411,14 @@ func parseMetric(keys []string, values []string, types map[string]string, isTag if isTag { metric.AddTag(k, values[i]) - metricFound = true } else { newType, err := typeConvert(types, values[i], k) if err != nil { return err } metric.AddField(k, newType) - metricFound = true } } - if !metricFound { - return fmt.Errorf("no fields or tags found") - } return nil } diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index 7ba5609973d96..2eb7d6ffabc26 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -350,6 +350,68 @@ func TestTopicTag(t *testing.T) { ), }, }, + { + name: "topic parsing configured without fields", + topic: "telegraf/123/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "telegraf/+/test/hello", + Measurement: "_/_/measurement/_", + Tags: "testTag/_/_/_", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "test", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, + { + name: "topic parsing configured without measurement", + topic: "telegraf/123/test/hello", + topicTag: func() *string { + tag := "" + return &tag + }, + topicParsing: []TopicParsingConfig{ + { + Topic: "telegraf/+/test/hello", + Tags: "testTag/_/_/_", + Fields: "_/testNumber/_/testString", + FieldTypes: map[string]string{ + "testNumber": "int", + }, + }, + }, + expected: []telegraf.Metric{ + testutil.MustMetric( + "cpu", + map[string]string{ + "testTag": "telegraf", + }, + map[string]interface{}{ + "testNumber": 123, + "testString": "hello", + "time_idle": 42, + }, + time.Unix(0, 0), + ), + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {