From 79b1d1f899a0e217aaa07aed07824693438495cd Mon Sep 17 00:00:00 2001 From: Sebastien Le Digabel Date: Mon, 26 Nov 2018 15:39:03 +0000 Subject: [PATCH] Adding Kafka Origin Topic tag. The Kafka consumer input plugin already has the ability to read from multiple topics; however, it does not retain the topic from which the data was read. That data is very useful to introduce some smart routing down the line. The change adds the topic as a tag. The tag name is configurable. Added some basic tests, and tested on multiple Kafka clusters. --- plugins/inputs/kafka_consumer/README.md | 2 + .../inputs/kafka_consumer/kafka_consumer.go | 10 ++++- .../kafka_consumer/kafka_consumer_test.go | 45 +++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 8922f50713e49..56fc59245ad0e 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -14,6 +14,8 @@ and use the old zookeeper connection method. 
brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] + ## Add topic as tag if topic_tag is not empty + # topic_tag = "" ## Optional Client id # client_id = "Telegraf" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 31159def3cc88..0814d8e14466e 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -40,6 +40,8 @@ type Kafka struct { Offset string `toml:"offset"` SASLUsername string `toml:"sasl_username"` SASLPassword string `toml:"sasl_password"` + TopicTag string `toml:"topic_tag"` + tls.ClientConfig cluster Consumer @@ -60,6 +62,8 @@ var sampleConfig = ` brokers = ["localhost:9092"] ## topic(s) to consume topics = ["telegraf"] + ## Add topic as tag if topic_tag is not empty + # topic_tag = "" ## Optional Client id # client_id = "Telegraf" @@ -256,7 +260,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.Consumer if err != nil { return err } - + if len(k.TopicTag) > 0 { + for _, metric := range metrics { + metric.AddTag(k.TopicTag, msg.Topic) + } + } id := acc.AddTrackingMetricGroup(metrics) k.messages[id] = msg diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 5bb7740a518ec..a4d06efe6fba8 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -61,6 +61,25 @@ func newTestKafka() (*Kafka, *TestConsumer) { return &k, consumer } +func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) { + consumer := &TestConsumer{ + errors: make(chan error), + messages: make(chan *sarama.ConsumerMessage, 1000), + } + k := Kafka{ + cluster: consumer, + ConsumerGroup: "test", + Topics: []string{"telegraf"}, + Brokers: []string{"localhost:9092"}, + Offset: "oldest", + MaxUndeliveredMessages: defaultMaxUndeliveredMessages, + doNotCommitMsgs: true, + messages: 
make(map[telegraf.TrackingID]*sarama.ConsumerMessage), + TopicTag: "topic", + } + return &k, consumer +} + // Test that the parser parses kafka messages into points func TestRunParser(t *testing.T) { k, consumer := newTestKafka() @@ -75,6 +94,22 @@ func TestRunParser(t *testing.T) { assert.Equal(t, acc.NFields(), 1) } +// Test that the parser parses kafka messages into points +// and adds the topic tag +func TestRunParserWithTopic(t *testing.T) { + k, consumer := newTestKafkaWithTopicTag() + acc := testutil.Accumulator{} + ctx := context.Background() + + k.parser, _ = parsers.NewInfluxParser() + go k.receiver(ctx, &acc) + consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic")) + acc.Wait(1) + + assert.Equal(t, acc.NFields(), 1) + assert.True(t, acc.HasTag("cpu_load_short", "topic")) +} + // Test that the parser ignores invalid messages func TestRunParserInvalidMsg(t *testing.T) { k, consumer := newTestKafka() @@ -173,3 +208,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage { Partition: 0, } } + +func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Key: nil, + Value: []byte(val), + Offset: 0, + Partition: 0, + Topic: topic, + } +}