Skip to content

Update statsd storage - issue #724 #798

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 16 additions & 25 deletions storage/statsd/client.go → storage/statsd/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package statsd
package client

import (
"fmt"
Expand All @@ -22,8 +22,9 @@ import (
)

type Client struct {
HostPort string
conn net.Conn
HostPort string
Namespace string
conn net.Conn
}

func (self *Client) Open() error {
Expand All @@ -36,38 +37,28 @@ func (self *Client) Open() error {
return nil
}

func (self *Client) Close() {
func (self *Client) Close() error {
self.conn.Close()
}

func (self *Client) UpdateGauge(name, value string) error {
stats := make(map[string]string)
val := fmt.Sprintf("%s|g", value)
stats[name] = val
if err := self.send(stats); err != nil {
return err
}
self.conn = nil
return nil
}

// Simple send to statsd daemon without sampling.
func (self *Client) send(data map[string]string) error {
for k, v := range data {
formatted := fmt.Sprintf("%s:%s", k, v)
_, err := fmt.Fprintf(self.conn, formatted)
if err != nil {
glog.V(3).Infof("failed to send data %q: %v", formatted, err)
// return on first error.
return err
}
func (self *Client) Send(namespace, containerName, key string, value uint64) error {
// only send counter value
formatted := fmt.Sprintf("%s.%s.%s:%d|g", namespace, containerName, key, value)
_, err := fmt.Fprintf(self.conn, formatted)
if err != nil {
glog.V(3).Infof("failed to send data %q: %v", formatted, err)
return err
}
return nil
}

func New(hostPort string) (*Client, error) {
client := Client{HostPort: hostPort}
if err := client.Open(); err != nil {
Client := Client{HostPort: hostPort}
if err := Client.Open(); err != nil {
return nil, err
}
return &client, nil
return &Client, nil
}
127 changes: 127 additions & 0 deletions storage/statsd/statsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statsd

import (
info "github.com/google/cadvisor/info/v1"
client "github.com/google/cadvisor/storage/statsd/client"
)

type statsdStorage struct {
client *client.Client
Namespace string
}

const (
colCpuCumulativeUsage string = "cpu_cumulative_usage"
// Memory Usage
colMemoryUsage string = "memory_usage"
// Working set size
colMemoryWorkingSet string = "memory_working_set"
// Cumulative count of bytes received.
colRxBytes string = "rx_bytes"
// Cumulative count of receive errors encountered.
colRxErrors string = "rx_errors"
// Cumulative count of bytes transmitted.
colTxBytes string = "tx_bytes"
// Cumulative count of transmit errors encountered.
colTxErrors string = "tx_errors"
// Filesystem summary
colFsSummary = "fs_summary"
// Filesystem limit.
colFsLimit = "fs_limit"
// Filesystem usage.
colFsUsage = "fs_usage"
)

func (self *statsdStorage) containerStatsToValues(
stats *info.ContainerStats,
) (series map[string]uint64) {
series = make(map[string]uint64)

// Cumulative Cpu Usage
series[colCpuCumulativeUsage] = stats.Cpu.Usage.Total

// Memory Usage
series[colMemoryUsage] = stats.Memory.Usage

// Working set size
series[colMemoryWorkingSet] = stats.Memory.WorkingSet

// Network stats.
series[colRxBytes] = stats.Network.RxBytes
series[colRxErrors] = stats.Network.RxErrors
series[colTxBytes] = stats.Network.TxBytes
series[colTxErrors] = stats.Network.TxErrors

return series
}

func (self *statsdStorage) containerFsStatsToValues(
series *map[string]uint64,
stats *info.ContainerStats,
) {
for _, fsStat := range stats.Filesystem {
// Summary stats.
(*series)[colFsSummary+"."+colFsLimit] += fsStat.Limit
(*series)[colFsSummary+"."+colFsUsage] += fsStat.Usage

// Per device stats.
(*series)[fsStat.Device+"."+colFsLimit] = fsStat.Limit
(*series)[fsStat.Device+"."+colFsUsage] = fsStat.Usage
}
}

//Push the data into redis
func (self *statsdStorage) AddStats(ref info.ContainerReference, stats *info.ContainerStats) error {
if stats == nil {
return nil
}

var containerName string
if len(ref.Aliases) > 0 {
containerName = ref.Aliases[0]
} else {
containerName = ref.Name
}

series := self.containerStatsToValues(stats)
self.containerFsStatsToValues(&series, stats)
for key, value := range series {
err := self.client.Send(self.Namespace, containerName, key, value)
if err != nil {
return err
}
}
return nil
}

func (self *statsdStorage) Close() error {
self.client.Close()
self.client = nil
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move open, close, and Send to a separate client file? We are trying to change the metrics format and merge some of the container stats to values methods. It will make the conversion a bit easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, i will update it soon, maybe today.

func New(namespace, hostPort string) (*statsdStorage, error) {
statsdClient, err := client.New(hostPort)
if err != nil {
return nil, err
}
statsdStorage := &statsdStorage{
client: statsdClient,
Namespace: namespace,
}
return statsdStorage, nil
}
6 changes: 6 additions & 0 deletions storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/google/cadvisor/storage/bigquery"
"github.com/google/cadvisor/storage/influxdb"
"github.com/google/cadvisor/storage/redis"
"github.com/google/cadvisor/storage/statsd"
)

var argDbUsername = flag.String("storage_driver_user", "root", "database username")
Expand Down Expand Up @@ -88,6 +89,11 @@ func NewMemoryStorage(backendStorageName string) (*memory.InMemoryCache, error)
*argDbHost,
*argDbBufferDuration,
)
case "statsd":
backendStorage, err = statsd.New(
*argDbName,
*argDbHost,
)
default:
err = fmt.Errorf("unknown backend storage driver: %v", *argDbDriver)
}
Expand Down