From aeeb15a39168321e097ba707d291839c6fe57baf Mon Sep 17 00:00:00 2001 From: Sebastien Le Digabel Date: Thu, 29 Nov 2018 00:29:26 +0000 Subject: [PATCH] Add ability to tag metrics with topic in kafka_consumer (#5038) --- plugins/inputs/kafka_consumer/README.md | 2 + .../inputs/kafka_consumer/kafka_consumer.go | 10 ++++- .../kafka_consumer/kafka_consumer_test.go | 45 +++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 8922f50713e49..56fc59245ad0e 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -14,6 +14,8 @@ and use the old zookeeper connection method. brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] + ## Add topic as tag if topic_tag is not empty + # topic_tag = "" ## Optional Client id # client_id = "Telegraf" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 31159def3cc88..0814d8e14466e 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -40,6 +40,8 @@ type Kafka struct { Offset string `toml:"offset"` SASLUsername string `toml:"sasl_username"` SASLPassword string `toml:"sasl_password"` + TopicTag string `toml:"topic_tag"` + tls.ClientConfig cluster Consumer @@ -60,6 +62,8 @@ var sampleConfig = ` brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] + ## Add topic as tag if topic_tag is not empty + # topic_tag = "" ## Optional Client id # client_id = "Telegraf" @@ -256,7 +260,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.Consumer if err != nil { return err } - + if len(k.TopicTag) > 0 { + for _, metric := range metrics { + metric.AddTag(k.TopicTag, msg.Topic) + } + } id := acc.AddTrackingMetricGroup(metrics) k.messages[id] = msg diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 5bb7740a518ec..a4d06efe6fba8 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -61,6 +61,25 @@ func newTestKafka() (*Kafka, *TestConsumer) { return &k, consumer } +func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) { + consumer := &TestConsumer{ + errors: make(chan error), + messages: make(chan *sarama.ConsumerMessage, 1000), + } + k := Kafka{ + cluster: consumer, + ConsumerGroup: "test", + Topics: []string{"telegraf"}, + Brokers: []string{"localhost:9092"}, + Offset: "oldest", + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + doNotCommitMsgs: true, + messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage), + TopicTag: "topic", + } + return &k, consumer +} + // Test that the parser parses kafka messages into points func TestRunParser(t *testing.T) { k, consumer := newTestKafka() @@ -75,6 +94,22 @@ func TestRunParser(t *testing.T) { assert.Equal(t, acc.NFields(), 1) } +// Test that the parser parses kafka messages into points +// and adds the topic tag +func TestRunParserWithTopic(t *testing.T) { + k, consumer := newTestKafkaWithTopicTag() + acc := testutil.Accumulator{} + ctx := context.Background() + + k.parser, _ = parsers.NewInfluxParser() + go k.receiver(ctx, &acc) + consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic")) + acc.Wait(1) + + assert.Equal(t, acc.NFields(), 1) + assert.True(t, acc.HasTag("cpu_load_short", "topic")) +} + // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { k, consumer := newTestKafka() @@ -173,3 +208,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage { Partition: 0, } } + +func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Key: nil, + Value: []byte(val), + Offset: 0, + Partition: 0, + Topic: topic, + } +}