Skip to content

Commit

Permalink
add optional TLS client auth to Kafka storage
Browse files Browse the repository at this point in the history
  • Loading branch information
devx committed May 6, 2016
1 parent 05fb225 commit 58b26c9
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
4 changes: 2 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion docs/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ cAdvisor supports exporting stats to various storage driver plugins. To enable a
- [BigQuery](https://cloud.google.com/bigquery/). See the [documentation](../../storage/bigquery/README.md) for usage.
- [ElasticSearch](https://www.elastic.co/). See the [documentation](elasticsearch.md) for usage and examples.
- [InfluxDB](https://influxdb.com/). See the [documentation](influxdb.md) for usage and examples.
- [Kafka](http://kafka.apache.org/)
- [Kafka](http://kafka.apache.org/). See the [documentation](kafka.md) for usage.
- [Prometheus](http://prometheus.io). See the [documentation](prometheus.md) for usage and examples.
- [Redis](http://redis.io/)
- [StatsD](https://github.com/etsy/statsd)
Expand Down
43 changes: 43 additions & 0 deletions docs/storage/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Exporting cAdvisor Stats to Kafka

cAdvisor supports exporting stats to [Kafka](http://kafka.apache.org/). To use Kafka, you need to provide the additional flags to cAdvisor:

Set the storage driver as Kafka:

```
-storage_driver=kafka
```

If no broker are provided it will default to a broker listening at localhost:9092, with 'stats' as the default topic.


Specify a Kafka broker address:

```
-storage_driver_kafka_broker_list=localhost:9092
```

Specify a Kafka topic:

```
-storage_driver_kafka_topic=myTopic
```

As of version 9.0. Kafka supports TLS client auth:

```
# To enable TLS client auth support you need to provide the following:
# Location to Certificate Authority certificate
-storage_driver_kafka_ssl_ca=/path/to/ca.pem
# Location to client certificate certificate
-storage_driver_kafka_ssl_cert=/path/to/client_cert.pem
# Location to client certificate key
-storage_driver_kafka_ssl_key=/path/to/client_key.pem
# Verify SSL certificate chain (default: true)
-storage_driver_kafka_ssl_verify=false
```
46 changes: 44 additions & 2 deletions storage/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"io/ioutil"
"os"
"strings"
"time"
Expand All @@ -34,8 +37,12 @@ func init() {
}

var (
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
certFile = flag.String("storage_driver_kafka_ssl_cert", "", "optional certificate file for TLS client authentication")
keyFile = flag.String("storage_driver_kafka_ssl_key", "", "optional key file for TLS client authentication")
caFile = flag.String("storage_driver_kafka_ssl_ca", "", "optional certificate authority file for TLS client authentication")
verifySSL = flag.Bool("storage_driver_kafka_ssl_verify", true, "verify ssl certificate chain")
)

type kafkaStorage struct {
Expand Down Expand Up @@ -94,8 +101,43 @@ func new() (storage.StorageDriver, error) {
return newStorage(machineName)
}

func generateTLSConfig() (*tls.Config, error) {
if *certFile != "" && *keyFile != "" && *caFile != "" {
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
if err != nil {
return nil, err
}

caCert, err := ioutil.ReadFile(*caFile)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: *verifySSL,
}, nil
}

return nil, nil
}

func newStorage(machineName string) (storage.StorageDriver, error) {
config := kafka.NewConfig()

tlsConfig, err := generateTLSConfig()
if err != nil {
return nil, err
}

if tlsConfig != nil {
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

config.Producer.RequiredAcks = kafka.WaitForAll

brokerList := strings.Split(*brokers, ",")
Expand Down

0 comments on commit 58b26c9

Please sign in to comment.