Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Kafka Plugin #35

Merged
merged 1 commit into from
Jul 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package all

import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
Expand Down
153 changes: 153 additions & 0 deletions plugins/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package kafka_consumer

import (
"os"
"os/signal"
"time"

"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/telegraf/plugins"
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
)

type Kafka struct {
ConsumerGroupName string
Topic string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
BatchSize int
}

var sampleConfig = `
# topic to consume
topic = "topic_with_metrics"

# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"

# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]

# Batch size of points sent to InfluxDB
batchSize = 1000`

func (k *Kafka) SampleConfig() string {
return sampleConfig
}

func (k *Kafka) Description() string {
return "read metrics from a Kafka topic"
}

type Metric struct {
Measurement string `json:"measurement"`
Values map[string]interface{} `json:"values"`
Tags map[string]string `json:"tags"`
Time time.Time `json:"time"`
}

func (k *Kafka) Gather(acc plugins.Accumulator) error {
var consumerErr error
metricQueue := make(chan []byte, 200)

if k.Consumer == nil {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroupName,
[]string{k.Topic},
k.ZookeeperPeers,
nil,
)

if consumerErr != nil {
return consumerErr
}

c := make(chan os.Signal, 1)
halt := make(chan bool, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
halt <- true
emitMetrics(k, acc, metricQueue)
k.Consumer.Close()
}()

go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
}

return emitMetrics(k, acc, metricQueue)
}

func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
timeout := time.After(1 * time.Second)

for {
select {
case batch := <-metricConsumer:
var points []tsdb.Point
var err error
if points, err = tsdb.ParsePoints(batch); err != nil {
return err
}

for _, point := range points {
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
}
case <-timeout:
return nil
}
}
}

const millisecond = 1000000 * time.Nanosecond

type ack func(*sarama.ConsumerMessage) error

func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
batch := make([]byte, 0)
currentBatchSize := 0
timeout := time.After(500 * millisecond)
var msg *sarama.ConsumerMessage

for {
select {
case msg = <-kafkaMsgs:
if currentBatchSize != 0 {
batch = append(batch, '\n')
}

batch = append(batch, msg.Value...)
currentBatchSize++

if currentBatchSize == maxBatchSize {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}
case <-timeout:
if currentBatchSize != 0 {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}

timeout = time.After(500 * millisecond)
case <-halt:
if currentBatchSize != 0 {
metricProducer <- batch
ackMsg(msg)
}

return
}
}
}

func init() {
plugins.Add("kafka", func() plugins.Plugin {
return &Kafka{}
})
}
62 changes: 62 additions & 0 deletions plugins/kafka_consumer/kafka_consumer_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka_consumer

import (
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReadsMetricsFromKafka(t *testing.T) {
var zkPeers, brokerPeers []string

if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
zkPeers = []string{"localhost:2181"}
} else {
zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
}

if len(os.Getenv("KAFKA_PEERS")) == 0 {
brokerPeers = []string{"localhost:9092"}
} else {
brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
}

k := &Kafka{
ConsumerGroupName: "telegraf_test_consumers",
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
ZookeeperPeers: zkPeers,
}

msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
require.NoError(t, err)
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
producer.Close()

var acc testutil.Accumulator

// Sanity check
assert.Equal(t, 0, len(acc.Points), "there should not be any points")

err = k.Gather(&acc)
require.NoError(t, err)

assert.Equal(t, 1, len(acc.Points), "there should be a single point")

point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}
95 changes: 95 additions & 0 deletions plugins/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kafka_consumer

import (
"strings"
"testing"
"time"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/Shopify/sarama.v1"
)

const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"

func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 10; i++ {
kafkaChan <- saramaMsg(testMsg)
}

expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))

halt <- true

return nil
}, halt)
}

func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 3; i++ {
kafkaChan <- saramaMsg(testMsg)
}

expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))

halt <- true

return nil
}, halt)
}

func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte, 1)
testChan <- []byte(testMsg)

err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)

assert.Equal(t, 1, len(acc.Points), "there should be a single point")

point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)

assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}

func TestEmitMetricsTimesOut(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte)

err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)

assert.Equal(t, 0, len(acc.Points), "there should not be a any points")
}

func saramaMsg(val string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
}
}