Skip to content

Commit

Permalink
Add ability to tag metrics with topic in kafka_consumer (influxdata#5038
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sledigabel authored and otherpirate committed Mar 15, 2019
1 parent dacea80 commit aeeb15a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 1 deletion.
2 changes: 2 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and use the old zookeeper connection method.
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Add topic as tag if topic_tag is not empty
# topic_tag = ""

## Optional Client id
# client_id = "Telegraf"
Expand Down
10 changes: 9 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Kafka struct {
Offset string `toml:"offset"`
SASLUsername string `toml:"sasl_username"`
SASLPassword string `toml:"sasl_password"`
TopicTag string `toml:"topic_tag"`

tls.ClientConfig

cluster Consumer
Expand All @@ -60,6 +62,8 @@ var sampleConfig = `
brokers = ["localhost:9092"]
## topic(s) to consume
topics = ["telegraf"]
## Add topic as tag if topic_tag is not empty
# topic_tag = ""
## Optional Client id
# client_id = "Telegraf"
Expand Down Expand Up @@ -256,7 +260,11 @@ func (k *Kafka) onMessage(acc telegraf.TrackingAccumulator, msg *sarama.Consumer
if err != nil {
return err
}

if len(k.TopicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(k.TopicTag, msg.Topic)
}
}
id := acc.AddTrackingMetricGroup(metrics)
k.messages[id] = msg

Expand Down
45 changes: 45 additions & 0 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func newTestKafka() (*Kafka, *TestConsumer) {
return &k, consumer
}

func newTestKafkaWithTopicTag() (*Kafka, *TestConsumer) {
consumer := &TestConsumer{
errors: make(chan error),
messages: make(chan *sarama.ConsumerMessage, 1000),
}
k := Kafka{
cluster: consumer,
ConsumerGroup: "test",
Topics: []string{"telegraf"},
Brokers: []string{"localhost:9092"},
Offset: "oldest",
MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
doNotCommitMsgs: true,
messages: make(map[telegraf.TrackingID]*sarama.ConsumerMessage),
TopicTag: "topic",
}
return &k, consumer
}

// Test that the parser parses kafka messages into points
func TestRunParser(t *testing.T) {
k, consumer := newTestKafka()
Expand All @@ -75,6 +94,22 @@ func TestRunParser(t *testing.T) {
assert.Equal(t, acc.NFields(), 1)
}

// Test that the parser parses kafka messages into points
// and adds the topic tag
func TestRunParserWithTopic(t *testing.T) {
k, consumer := newTestKafkaWithTopicTag()
acc := testutil.Accumulator{}
ctx := context.Background()

k.parser, _ = parsers.NewInfluxParser()
go k.receiver(ctx, &acc)
consumer.Inject(saramaMsgWithTopic(testMsg, "test_topic"))
acc.Wait(1)

assert.Equal(t, acc.NFields(), 1)
assert.True(t, acc.HasTag("cpu_load_short", "topic"))
}

// Test that the parser ignores invalid messages
func TestRunParserInvalidMsg(t *testing.T) {
k, consumer := newTestKafka()
Expand Down Expand Up @@ -173,3 +208,13 @@ func saramaMsg(val string) *sarama.ConsumerMessage {
Partition: 0,
}
}

func saramaMsgWithTopic(val string, topic string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
Topic: topic,
}
}

0 comments on commit aeeb15a

Please sign in to comment.