diff --git a/plugins/kafka/kafka.go b/plugins/kafka/kafka.go
index faab709815d2e..a7bbbfabe126d 100644
--- a/plugins/kafka/kafka.go
+++ b/plugins/kafka/kafka.go
@@ -1,13 +1,14 @@
 package kafka
 
 import (
-	"encoding/json"
-	"github.com/wvanbergen/kafka/consumergroup"
 	"os"
 	"os/signal"
 	"time"
 
+	"github.com/influxdb/influxdb/tsdb"
 	"github.com/influxdb/telegraf/plugins"
+	"github.com/wvanbergen/kafka/consumergroup"
+	"gopkg.in/Shopify/sarama.v1"
 )
 
 type Kafka struct {
@@ -15,6 +16,7 @@ type Kafka struct {
 	Topic             string
 	ZookeeperPeers    []string
 	Consumer          *consumergroup.ConsumerGroup
+	BatchSize         int
 }
 
 var sampleConfig = `
@@ -25,7 +27,10 @@ topic = "topic_with_metrics"
 consumerGroupName = "telegraf_metrics_consumers"
 
 # an array of Zookeeper connection strings
-zookeeperPeers = ["localhost:2181"]`
+zookeeperPeers = ["localhost:2181"]
+
+# Batch size of points sent to InfluxDB
+batchSize = 10`
 
 func (k *Kafka) SampleConfig() string {
 	return sampleConfig
@@ -44,6 +49,7 @@ type Metric struct {
 
 func (k *Kafka) Gather(acc plugins.Accumulator) error {
 	var consumerErr error
+	metricQueue := make(chan []byte, 200)
 
 	if k.Consumer == nil {
 		k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
@@ -58,31 +64,83 @@ func (k *Kafka) Gather(acc plugins.Accumulator) error {
 		}
 
 		c := make(chan os.Signal, 1)
+		halt := make(chan bool, 1)
 		signal.Notify(c, os.Interrupt)
 		go func() {
 			<-c
+			halt <- true
+			emitMetrics(k, acc, metricQueue)
 			k.Consumer.Close()
 		}()
-	}
 
-	processMessages(k, acc)
+		go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
+	}
 
-	return nil
+	return emitMetrics(k, acc, metricQueue)
 }
 
-func processMessages(k *Kafka, acc plugins.Accumulator) {
+func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
 	timeout := time.After(1 * time.Second)
 
 	for {
 		select {
-		case msg := <-k.Consumer.Messages():
-			metric := &Metric{}
-			json.Unmarshal(msg.Value, metric)
+		case batch := <-metricConsumer:
+			var points []tsdb.Point
+			var err error
+			if points, err = tsdb.ParsePoints(batch); err != nil {
+				return err
+			}
+
+			for _, point := range points {
+				acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
+			}
+		case <-timeout:
+			return nil
+		}
+	}
+}
+
+const millisecond = 1000000 * time.Nanosecond
+
+type ack func(*sarama.ConsumerMessage) error
 
-			acc.AddValuesWithTime(metric.Measurement, metric.Values, metric.Tags, metric.Time)
+func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
+	batch := make([]byte, 0)
+	currentBatchSize := 0
+	timeout := time.After(500 * millisecond)
+	var msg *sarama.ConsumerMessage
 
-			k.Consumer.CommitUpto(msg)
+	for {
+		select {
+		case msg = <-kafkaMsgs:
+			if currentBatchSize != 0 {
+				batch = append(batch, '\n')
+			}
+
+			batch = append(batch, msg.Value...)
+			currentBatchSize++
+
+			if currentBatchSize == maxBatchSize {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
 		case <-timeout:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				currentBatchSize = 0
+				batch = make([]byte, 0)
+				ackMsg(msg)
+			}
+
+			timeout = time.After(500 * millisecond)
+		case <-halt:
+			if currentBatchSize != 0 {
+				metricProducer <- batch
+				ackMsg(msg)
+			}
+
 			return
 		}
 	}
 }
diff --git a/plugins/kafka/kafka_test.go b/plugins/kafka/kafka_test.go
index 4ae0e74076c80..5163e357e6e7a 100644
--- a/plugins/kafka/kafka_test.go
+++ b/plugins/kafka/kafka_test.go
@@ -1,72 +1,95 @@
 package kafka
 
 import (
-	"os"
 	"strings"
 	"testing"
 	"time"
 
-	"github.com/Shopify/sarama"
 	"github.com/influxdb/telegraf/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"gopkg.in/Shopify/sarama.v1"
 )
 
-func TestReadsMetricsFromKafka(t *testing.T) {
-	var zkPeers, brokerPeers []string
+const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
 
-	if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
-		zkPeers = []string{"localhost:2181"}
-	} else {
-		zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
+func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 10; i++ {
+		kafkaChan <- saramaMsg(testMsg)
 	}
 
-	if len(os.Getenv("KAFKA_PEERS")) == 0 {
-		brokerPeers = []string{"localhost:9092"}
-	} else {
-		brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
-	}
+	expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
+
+		halt <- true
 
-	k := &Kafka{
-		ConsumerGroupName: "telegraf_test_consumers",
-		Topic:             "telegraf_test_topic",
-		ZookeeperPeers:    zkPeers,
+		return nil
+	}, halt)
+}
+
+func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
+	halt := make(chan bool, 1)
+	metricChan := make(chan []byte, 1)
+	kafkaChan := make(chan *sarama.ConsumerMessage, 10)
+	for i := 0; i < 3; i++ {
+		kafkaChan <- saramaMsg(testMsg)
 	}
 
-	msg := `{
-    "measurement": "cpu_load",
-    "tags": {
-        "host": "server01",
-        "core": "0"
-    },
-    "time": "2009-11-10T23:00:00Z",
-    "values": {
-        "value": 0.45
-    }
-}`
-	producer, err := sarama.NewSyncProducer(brokerPeers, nil)
-	require.NoError(t, err)
-	_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
-	producer.Close()
+	expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
+	readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
+		batch := <-metricChan
+		assert.Equal(t, expectedBatch, string(batch))
 
-	var acc testutil.Accumulator
+		halt <- true
+
+		return nil
+	}, halt)
+}
 
-	// Sanity check
-	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte, 1)
+	testChan <- []byte(testMsg)
 
-	err = k.Gather(&acc)
+	err := emitMetrics(k, &acc, testChan)
 	require.NoError(t, err)
 
 	assert.Equal(t, 1, len(acc.Points), "there should be a single point")
 
 	point := acc.Points[0]
-	assert.Equal(t, "cpu_load", point.Measurement)
-	assert.Equal(t, map[string]interface{}{"value": 0.45}, point.Values)
+	assert.Equal(t, "cpu_load_short", point.Measurement)
+	assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
 	assert.Equal(t, map[string]string{
-		"host": "server01",
-		"core": "0",
+		"host":      "server01",
+		"direction": "in",
+		"region":    "us-west",
 	}, point.Tags)
-	tt, err := time.Parse(time.RFC3339, "2009-11-10T23:00:00Z")
+
+	assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
+}
+
+func TestEmitMetricsTimesOut(t *testing.T) {
+	k := &Kafka{}
+	var acc testutil.Accumulator
+	testChan := make(chan []byte)
+
+	err := emitMetrics(k, &acc, testChan)
 	require.NoError(t, err)
-	assert.Equal(t, tt, point.Time)
+
+	assert.Equal(t, 0, len(acc.Points), "there should not be any points")
+}
+
+func saramaMsg(val string) *sarama.ConsumerMessage {
+	return &sarama.ConsumerMessage{
+		Key:       nil,
+		Value:     []byte(val),
+		Offset:    0,
+		Partition: 0,
+	}
 }
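For reference, below is a minimal, dependency-free sketch of the size-or-timeout batching loop that readFromKafka implements in this patch. It is an illustration, not part of the change: the names batchOrFlush and flushEvery are invented here, and plain []byte channels stand in for the sarama message channel and the ack callback.

package main

import (
	"fmt"
	"time"
)

// batchOrFlush drains messages from in and forwards them downstream as
// newline-joined batches. A batch is emitted either when maxBatch messages
// have accumulated or when the flush timer fires with a partial batch
// pending, mirroring the select loop in readFromKafka.
func batchOrFlush(in <-chan []byte, out chan<- []byte, maxBatch int, flushEvery time.Duration, halt <-chan bool) {
	batch := make([]byte, 0)
	n := 0
	timer := time.After(flushEvery)

	for {
		select {
		case msg := <-in:
			if n != 0 {
				batch = append(batch, '\n')
			}
			batch = append(batch, msg...)
			n++
			if n == maxBatch {
				out <- batch
				batch, n = make([]byte, 0), 0
			}
		case <-timer:
			// Flush a partial batch so slow topics still produce points.
			if n != 0 {
				out <- batch
				batch, n = make([]byte, 0), 0
			}
			timer = time.After(flushEvery)
		case <-halt:
			// Flush whatever is pending before shutting down.
			if n != 0 {
				out <- batch
			}
			return
		}
	}
}

func main() {
	in := make(chan []byte, 10)
	out := make(chan []byte, 1)
	halt := make(chan bool, 1)

	go batchOrFlush(in, out, 3, 500*time.Millisecond, halt)

	for i := 0; i < 3; i++ {
		in <- []byte("cpu value=1 1422568543702900257")
	}
	fmt.Printf("%s\n", <-out) // the three messages joined by '\n'
	halt <- true
}

One design note: like the patch, this sketch re-arms the timer with time.After after each timeout rather than reusing a time.Ticker, which keeps the loop simple at the cost of allocating a new timer per interval.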