diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index bfd9c04b80..b58b50274f 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -22,8 +22,8 @@ }, { "ImportPath": "github.com/Shopify/sarama", - "Comment": "v1.7.0", - "Rev": "87ec8d76e89cc002ff5673ee7598ae1db2f32424" + "Comment": "v1.8.0", + "Rev": "4ba9bba6adb6697bcec3841e1ecdfecf5227c3b9" }, { "ImportPath": "github.com/Sirupsen/logrus", diff --git a/docs/storage/README.md b/docs/storage/README.md index f3c05737b9..fa2c8f519c 100644 --- a/docs/storage/README.md +++ b/docs/storage/README.md @@ -7,7 +7,7 @@ cAdvisor supports exporting stats to various storage driver plugins. To enable a - [BigQuery](https://cloud.google.com/bigquery/). See the [documentation](../../storage/bigquery/README.md) for usage. - [ElasticSearch](https://www.elastic.co/). See the [documentation](elasticsearch.md) for usage and examples. - [InfluxDB](https://influxdb.com/). See the [documentation](influxdb.md) for usage and examples. -- [Kafka](http://kafka.apache.org/) +- [Kafka](http://kafka.apache.org/). See the [documentation](kafka.md) for usage. - [Prometheus](http://prometheus.io). See the [documentation](prometheus.md) for usage and examples. - [Redis](http://redis.io/) - [StatsD](https://github.com/etsy/statsd) diff --git a/docs/storage/kafka.md b/docs/storage/kafka.md new file mode 100644 index 0000000000..7dee651292 --- /dev/null +++ b/docs/storage/kafka.md @@ -0,0 +1,43 @@ +# Exporting cAdvisor Stats to Kafka + +cAdvisor supports exporting stats to [Kafka](http://kafka.apache.org/). To use Kafka, you need to provide the additional flags to cAdvisor: + +Set the storage driver as Kafka: + +``` + -storage_driver=kafka +``` + +If no broker are provided it will default to a broker listening at localhost:9092, with 'stats' as the default topic. + + +Specify a Kafka broker address: + +``` +-storage_driver_kafka_broker_list=localhost:9092 + +``` + +Specify a Kafka topic: + +``` +-storage_driver_kafka_topic=myTopic +``` + +As of version 9.0. Kafka supports TLS client auth: + +``` + # To enable TLS client auth support you need to provide the following: + + # Location to Certificate Authority certificate + -storage_driver_kafka_ssl_ca=/path/to/ca.pem + + # Location to client certificate certificate + -storage_driver_kafka_ssl_cert=/path/to/client_cert.pem + + # Location to client certificate key + -storage_driver_kafka_ssl_key=/path/to/client_key.pem + + # Verify SSL certificate chain (default: true) + -storage_driver_kafka_ssl_verify=false +``` diff --git a/storage/kafka/kafka.go b/storage/kafka/kafka.go index e43a374c81..16de668aec 100644 --- a/storage/kafka/kafka.go +++ b/storage/kafka/kafka.go @@ -15,8 +15,11 @@ package kafka import ( + "crypto/tls" + "crypto/x509" "encoding/json" "flag" + "io/ioutil" "os" "strings" "time" @@ -34,8 +37,12 @@ func init() { } var ( - brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv") - topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic") + brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv") + topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic") + certFile = flag.String("storage_driver_kafka_ssl_cert", "", "optional certificate file for TLS client authentication") + keyFile = flag.String("storage_driver_kafka_ssl_key", "", "optional key file for TLS client authentication") + caFile = flag.String("storage_driver_kafka_ssl_ca", "", "optional certificate authority file for TLS client authentication") + verifySSL = flag.Bool("storage_driver_kafka_ssl_verify", true, "verify ssl certificate chain") ) type kafkaStorage struct { @@ -94,8 +101,43 @@ func new() (storage.StorageDriver, error) { return newStorage(machineName) } +func generateTLSConfig() (*tls.Config, error) { + if *certFile != "" && *keyFile != "" && *caFile != "" { + cert, err := tls.LoadX509KeyPair(*certFile, *keyFile) + if err != nil { + return nil, err + } + + caCert, err := ioutil.ReadFile(*caFile) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: *verifySSL, + }, nil + } + + return nil, nil +} + func newStorage(machineName string) (storage.StorageDriver, error) { config := kafka.NewConfig() + + tlsConfig, err := generateTLSConfig() + if err != nil { + return nil, err + } + + if tlsConfig != nil { + config.Net.TLS.Enable = true + config.Net.TLS.Config = tlsConfig + } + config.Producer.RequiredAcks = kafka.WaitForAll brokerList := strings.Split(*brokers, ",")