Adding Kafka Origin Topic tag.
The Kafka consumer input plugin already has the ability to
read from multiple topics; however, it does not retain the
topic the data was read from. That information is very useful
for smart routing further down the pipeline.

The change adds the topic as a tag on each metric. The tag name is configurable.

Added some basic tests, and tested on multiple Kafka clusters.
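For illustration, a minimal Telegraf configuration sketch that enables the new behaviour (the option names and defaults are taken from the README change below; the broker, topic list, and data_format values are placeholders):

    [[inputs.kafka_consumer]]
      brokers = ["localhost:9092"]
      topics = ["telegraf"]
      ## Tag each metric with the topic it was read from
      add_topic_tag = true
      topic_tag_name = "topic"
      data_format = "influx"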
Sebastien Le Digabel committed Nov 26, 2018
1 parent 581772a commit 0d6f335
Showing 3 changed files with 60 additions and 1 deletion.
3 changes: 3 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
@@ -14,6 +14,9 @@ and use the old zookeeper connection method.
   brokers = ["localhost:9092"]
   ## topic(s) to consume
   topics = ["telegraf"]
+  ## Add topic as tag
+  # add_topic_tag = false
+  # topic_tag_name = "topic"

   ## Optional Client id
   # client_id = "Telegraf"
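The "smart routing" the commit message mentions can be sketched with Telegraf's standard tagpass metric filtering, keyed on the new tag (the output plugin and file path here are purely illustrative, and assume add_topic_tag is enabled as in the sketch above):

    [[outputs.file]]
      files = ["/tmp/telegraf_topic.out"]
      ## Only metrics consumed from the "telegraf" topic reach this output
      [outputs.file.tagpass]
        topic = ["telegraf"]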
12 changes: 11 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -40,6 +40,9 @@ type Kafka struct {
 	Offset       string `toml:"offset"`
 	SASLUsername string `toml:"sasl_username"`
 	SASLPassword string `toml:"sasl_password"`
+	AddTopicTag  bool   `toml:"add_topic_tag"`
+	TopicTagName string `toml:"topic_tag_name"`

 	tls.ClientConfig

 	cluster Consumer
@@ -60,6 +63,9 @@ var sampleConfig = `
   brokers = ["localhost:9092"]
   ## topic(s) to consume
   topics = ["telegraf"]
+  ## Add topic as tag
+  # add_topic_tag = false
+  # topic_tag_name = "topic"
   ## Optional Client id
   # client_id = "Telegraf"
@@ -256,7 +262,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.ConsumerMessage) error {
 	if err != nil {
 		return err
 	}
-
+	if k.AddTopicTag {
+		for _, metric := range metrics {
+			metric.AddTag(k.TopicTagName, msg.Topic)
+		}
+	}
 	id := acc.AddTrackingMetricGroup(metrics)
 	k.messages[id] = msg

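The new block in onMessage is a simple fan-out: once the payload is parsed into metrics, every metric in the group is tagged with the topic the message came from before the group is handed to the tracking accumulator. A self-contained sketch of the same pattern (a toy metric type standing in for telegraf.Metric; all names and values are illustrative):

    package main

    import "fmt"

    // metric is a toy stand-in for telegraf.Metric; only the tag map matters here.
    type metric struct {
        name string
        tags map[string]string
    }

    func (m *metric) AddTag(key, value string) { m.tags[key] = value }

    func main() {
        // Pretend the parser produced these from one Kafka message.
        metrics := []*metric{{name: "cpu_load_short", tags: map[string]string{}}}
        addTopicTag, topicTagName, msgTopic := true, "topic", "test_topic"

        // Mirrors the added block in onMessage: tag every parsed metric
        // with the origin topic.
        if addTopicTag {
            for _, m := range metrics {
                m.AddTag(topicTagName, msgTopic)
            }
        }
        fmt.Println(metrics[0].tags) // map[topic:test_topic]
    }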
46 changes: 46 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -61,6 +61,26 @@ func newTestKafka() (*Kafka, *TestConsumer) {
 	return &k, consumer
 }

+func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) {
+	consumer := &TestConsumer{
+		errors:   make(chan error),
+		messages: make(chan *sarama.ConsumerMessage, 1000),
+	}
+	k := Kafka{
+		cluster:                consumer,
+		ConsumerGroup:          "test",
+		Topics:                 []string{"telegraf"},
+		Brokers:                []string{"localhost:9092"},
+		Offset:                 "oldest",
+		MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
+		doNotCommitMsgs:        true,
+		messages:               make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
+		AddTopicTag:            true,
+		TopicTagName:           "topic",
+	}
+	return &k, consumer
+}

 // Test that the parser parses kafka messages into points
 func TestRunParser(t *testing.T) {
 	k, consumer := newTestKafka()
@@ -75,6 +95,22 @@ func TestRunParser(t *testing.T) {
 	assert.Equal(t, acc.NFields(), 1)
 }

+// Test that the parser parses kafka messages into points
+// and adds the topic tag
+func TestRunParserWithTopic(t *testing.T) {
+	k, consumer := newTestKafkaWithTopicTag()
+	acc := testutil.Accumulator{}
+	ctx := context.Background()
+
+	k.parser, _ = parsers.NewInfluxParser()
+	go k.receiver(ctx, &acc)
+	consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic"))
+	acc.Wait(1)
+
+	assert.Equal(t, acc.NFields(), 1)
+	assert.True(t, acc.HasTag("cpu_load_short", "topic"))
+}

 // Test that the parser ignores invalid messages
 func TestRunParserInvalidMsg(t *testing.T) {
 	k, consumer := newTestKafka()
@@ -173,3 +209,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage {
 		Partition: 0,
 	}
 }

+func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage {
+	return &sarama.ConsumerMessage{
+		Key:       nil,
+		Value:     []byte(val),
+		Offset:    0,
+		Partition: 0,
+		Topic:     topic,
+	}
+}
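As a usage note, the new tests should run with the standard Go toolchain from the repository root, e.g. go test ./plugins/inputs/kafka_consumer/ (path taken from the changed files above).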
