Skip to content

Commit

Permalink
Add the support for kafka in cAdvisor's storage, including output of …
Browse files Browse the repository at this point in the history
…container id and labels

Addressing PR Feedback

Addressing PR Feedback
  • Loading branch information
arichardet committed Jan 28, 2016
1 parent bef8522 commit 80ba7e7
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cadvisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var argIp = flag.String("listen_ip", "", "IP to listen on, defaults to all IPs")
var argPort = flag.Int("port", 8080, "port to listen")
var maxProcs = flag.Int("max_procs", 0, "max number of CPUs that can be used simultaneously. Less than 1 for default (number of cores).")

var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Data is always cached shortly in memory, this controls where data is pushed besides the local cache. Empty means none. Options are: <empty> (default), bigquery, and influxdb")
var argDbDriver = flag.String("storage_driver", "", "storage driver to use. Data is always cached shortly in memory, this controls where data is pushed besides the local cache. Empty means none. Options are: <empty> (default), bigquery, influxdb, and kafka")
var versionFlag = flag.Bool("version", false, "print cAdvisor version and exit")

var httpAuthFile = flag.String("http_auth_file", "", "HTTP auth file for the web UI")
Expand Down
2 changes: 2 additions & 0 deletions container/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ func (self *dockerContainerHandler) Cleanup() {

func (self *dockerContainerHandler) ContainerReference() (info.ContainerReference, error) {
return info.ContainerReference{
Id: self.id,
Name: self.name,
Aliases: self.aliases,
Namespace: DockerNamespace,
Labels: self.labels,
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions info/v1/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type ContainerSpec struct {

// Container reference contains enough information to uniquely identify a container
type ContainerReference struct {
// The container id
Id string `json:"id,omitempty"`

// The absolute name of the container. This is unique on the machine.
Name string `json:"name"`

Expand All @@ -80,6 +83,8 @@ type ContainerReference struct {
// Namespace under which the aliases of a container are unique.
// An example of a namespace is "docker" for Docker containers.
Namespace string `json:"namespace,omitempty"`

Labels map[string]string `json:"labels,omitempty"`
}

// Sorts by container name.
Expand Down
114 changes: 114 additions & 0 deletions storage/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"encoding/json"
"flag"
"os"
"strings"
"time"

info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/storage"
"github.com/google/cadvisor/utils/container"

kafka "github.com/Shopify/sarama"
"github.com/golang/glog"
)

func init() {
storage.RegisterStorageDriver("kafka", new)
}

var (
brokers = flag.String("storage_driver_kafka_broker_list", "localhost:9092", "kafka broker(s) csv")
topic = flag.String("storage_driver_kafka_topic", "stats", "kafka topic")
)

type kafkaStorage struct {
producer kafka.AsyncProducer
topic string
machineName string
}

type detailSpec struct {
Timestamp time.Time `json:"timestamp"`
MachineName string `json:"machine_name,omitempty"`
ContainerName string `json:"container_Name,omitempty"`
ContainerID string `json:"container_Id,omitempty"`
ContainerLabels map[string]string `json:"container_labels,omitempty"`
ContainerStats *info.ContainerStats `json:"container_stats,omitempty"`
}

func (driver *kafkaStorage) infoToDetailSpec(ref info.ContainerReference, stats *info.ContainerStats) *detailSpec {
timestamp := time.Now()
containerID := ref.Id
containerLabels := ref.Labels
containerName := container.GetPreferredName(ref)

detail := &detailSpec{
Timestamp: timestamp,
MachineName: driver.machineName,
ContainerName: containerName,
ContainerID: containerID,
ContainerLabels: containerLabels,
ContainerStats: stats,
}
return detail
}

func (driver *kafkaStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
detail := driver.infoToDetailSpec(ref, stats)
b, err := json.Marshal(detail)

driver.producer.Input() <- &kafka.ProducerMessage{
Topic: driver.topic,
Value: kafka.StringEncoder(b),
}

return err
}

func (self *kafkaStorage) Close() error {
return self.producer.Close()
}

func new() (storage.StorageDriver, error) {
machineName, err := os.Hostname()
if err != nil {
return nil, err
}
return newStorage(machineName)
}

func newStorage(machineName string) (storage.StorageDriver, error) {
config := kafka.NewConfig()
config.Producer.RequiredAcks = kafka.WaitForAll

brokerList := strings.Split(*brokers, ",")
glog.V(4).Infof("Kafka brokers:%q", brokers)

producer, err := kafka.NewAsyncProducer(brokerList, config)
if err != nil {
return nil, err
}
ret := &kafkaStorage{
producer: producer,
topic: *topic,
machineName: machineName,
}
return ret, nil
}
1 change: 1 addition & 0 deletions storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
_ "github.com/google/cadvisor/storage/bigquery"
_ "github.com/google/cadvisor/storage/elasticsearch"
_ "github.com/google/cadvisor/storage/influxdb"
_ "github.com/google/cadvisor/storage/kafka"
_ "github.com/google/cadvisor/storage/redis"
_ "github.com/google/cadvisor/storage/statsd"
_ "github.com/google/cadvisor/storage/stdout"
Expand Down
31 changes: 31 additions & 0 deletions utils/container/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package container

import (
info "github.com/google/cadvisor/info/v1"
)

// Returns the alias a container is known by within a certain namespace,
// if available. Otherwise returns the absolute name of the container.
func GetPreferredName(ref info.ContainerReference) string {
var containerName string
if len(ref.Aliases) > 0 {
containerName = ref.Aliases[0]
} else {
containerName = ref.Name
}
return containerName
}

0 comments on commit 80ba7e7

Please sign in to comment.