Skip to content

Fix for influxdb container labels and network stats #2184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
16 changes: 5 additions & 11 deletions manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,20 +623,14 @@ func (c *containerData) updateStats() error {
nvidiaStatsErr = c.nvidiaCollector.UpdateStats(stats)
}

ref, err := c.handler.ContainerReference()
if err != nil {
// Ignore errors if the container is dead.
if !c.handler.Exists() {
return nil
}
return err
}

c.lock.Lock()
cInfo := info.ContainerInfo{
ContainerReference: ref,
ContainerReference: c.info.ContainerReference,
Spec: c.info.Spec,
}
c.lock.Unlock()

err = c.memoryCache.AddStats(&cInfo, stats)
err := c.memoryCache.AddStats(&cInfo, stats)
if err != nil {
return err
}
Expand Down
88 changes: 67 additions & 21 deletions storage/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"os"
"strings"
"sync"
"time"

Expand All @@ -36,15 +37,16 @@ func init() {
var argDbRetentionPolicy = flag.String("storage_driver_influxdb_retention_policy", "", "retention policy")

type influxdbStorage struct {
client *influxdb.Client
machineName string
database string
retentionPolicy string
bufferDuration time.Duration
lastWrite time.Time
points []*influxdb.Point
lock sync.Mutex
readyToFlush func() bool
client *influxdb.Client
machineName string
database string
retentionPolicy string
bufferDuration time.Duration
whitelistedLabels map[string]struct{}
lastWrite time.Time
points []*influxdb.Point
lock sync.Mutex
readyToFlush func() bool
}

// Series names
Expand Down Expand Up @@ -105,6 +107,7 @@ const (
// Tag keys attached to every point written by this storage driver.
const (
// tagMachineName keys the tag whose value is the storage's machineName.
tagMachineName string = "machine"
// tagContainerName keys the tag whose value is the container name.
tagContainerName string = "container_name"
// tagContainerId keys the tag whose value is the container reference's Id.
tagContainerId string = "container_id"
)

func (self *influxdbStorage) containerFilesystemStatsToPoints(
Expand Down Expand Up @@ -162,11 +165,15 @@ func (self *influxdbStorage) tagPoints(cInfo *info.ContainerInfo, stats *info.Co
commonTags := map[string]string{
tagMachineName: self.machineName,
tagContainerName: containerName,
tagContainerId: cInfo.ContainerReference.Id,
}

containerLabels := containerLabels(&cInfo.Spec, self.whitelistedLabels)

for i := 0; i < len(points); i++ {
// merge with existing tags if any
addTagsToPoint(points[i], commonTags)
addTagsToPoint(points[i], cInfo.Spec.Labels)
addTagsToPoint(points[i], containerLabels)
points[i].Time = stats.Timestamp
}
}
Expand Down Expand Up @@ -203,10 +210,23 @@ func (self *influxdbStorage) containerStatsToPoints(
points = append(points, makePoint(serMemoryWorkingSet, stats.Memory.WorkingSet))

// Network Stats
points = append(points, makePoint(serRxBytes, stats.Network.RxBytes))
points = append(points, makePoint(serRxErrors, stats.Network.RxErrors))
points = append(points, makePoint(serTxBytes, stats.Network.TxBytes))
points = append(points, makePoint(serTxErrors, stats.Network.TxErrors))
for _, networkInterface := range stats.Network.Interfaces {
pointRxBytes := makePoint(serRxBytes, networkInterface.RxBytes)
pointRxErrors := makePoint(serRxErrors, networkInterface.RxErrors)
pointTxBytes := makePoint(serTxBytes, networkInterface.TxBytes)
pointTxErrors := makePoint(serTxErrors, networkInterface.TxErrors)

tags := map[string]string{"interface": networkInterface.Name}
addTagsToPoint(pointRxBytes, tags)
addTagsToPoint(pointRxErrors, tags)
addTagsToPoint(pointTxBytes, tags)
addTagsToPoint(pointTxErrors, tags)

points = append(points, pointRxBytes)
points = append(points, pointRxErrors)
points = append(points, pointTxBytes)
points = append(points, pointTxErrors)
}

self.tagPoints(cInfo, stats, points)

Expand Down Expand Up @@ -299,14 +319,26 @@ func newStorage(
return nil, err
}

//Reuse cadvisor flag for whitelisted container labels
var argWhitelistedLabels = flag.Lookup("whitelisted_container_labels").Value.(flag.Getter).Get().(string)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather explicitly plumb this through, and do all of the flag parsing when cAdvisor starts if possible.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be done, but I'm not so familiar with the patterns and preferences here yet.

Are you suggesting to pass it down on the storage constructor or maybe add it to the StorageDriver.AddStats interface?

I was trying to avoid modifying anything on other storage drivers as I have no context on them, but it makes sense that they also respect the labels when storing if possible.


whitelistMap := make(map[string]struct{}, len(argWhitelistedLabels))
if argWhitelistedLabels != "" {
whitelistLabels := strings.Split(argWhitelistedLabels, ",")
for _, k := range whitelistLabels {
whitelistMap[k] = struct{}{}
}
}

ret := &influxdbStorage{
client: client,
machineName: machineName,
database: database,
retentionPolicy: retentionPolicy,
bufferDuration: bufferDuration,
lastWrite: time.Now(),
points: make([]*influxdb.Point, 0),
client: client,
machineName: machineName,
database: database,
retentionPolicy: retentionPolicy,
whitelistedLabels: whitelistMap,
bufferDuration: bufferDuration,
lastWrite: time.Now(),
points: make([]*influxdb.Point, 0),
}
ret.readyToFlush = ret.defaultReadyToFlush
return ret, nil
Expand Down Expand Up @@ -375,3 +407,17 @@ func toSignedIfUnsigned(value interface{}) interface{} {
}
return value
}

// containerLabels returns the labels of cSpec filtered by whiteListMap.
//
// When whiteListMap is empty no filtering is applied and cSpec.Labels is
// returned as-is: the same map, not a copy, and nil if the spec has no
// labels. Otherwise a newly allocated map is returned that holds only the
// labels whose key is present in whiteListMap.
func containerLabels(cSpec *info.ContainerSpec, whiteListMap map[string]struct{}) map[string]string {
	// No whitelist configured: pass every label through without allocating.
	if len(whiteListMap) == 0 {
		return cSpec.Labels
	}

	filtered := make(map[string]string)
	for name, value := range cSpec.Labels {
		if _, ok := whiteListMap[name]; ok {
			filtered[name] = value
		}
	}
	return filtered
}
33 changes: 29 additions & 4 deletions storage/influxdb/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ func TestContainerStatsToPoints(t *testing.T) {
require.Nil(t, err)
require.NotNil(t, storage)

ref, stats := createTestStats()
cInfo, stats := createTestStats()
require.Nil(t, err)
require.NotNil(t, stats)

// When
points := storage.containerStatsToPoints(*ref, stats)
points := storage.containerStatsToPoints(*cInfo, stats)

// Then
assert.NotEmpty(t, points)
Expand All @@ -240,6 +240,10 @@ func TestContainerStatsToPoints(t *testing.T) {
for _, cpu_usage := range stats.Cpu.Usage.PerCpu {
assertContainsPointWithValue(t, points, serCpuUsagePerCpu, cpu_usage)
}

for _, point := range points {
assertPointWithLabel(t, point, "testLabel")
}
}

func assertContainsPointWithValue(t *testing.T, points []*influxdb.Point, name string, value interface{}) bool {
Expand All @@ -253,6 +257,17 @@ func assertContainsPointWithValue(t *testing.T, points []*influxdb.Point, name s
return assert.True(t, found, "no point found with name='%v' and value=%v", name, value)
}

// assertPointWithLabel asserts that point carries a tag named label and
// returns the result of that assertion.
//
// The tag maps built in this package go from tag name to tag value, so the
// check must be a key lookup. Ranging with `for _, tag := range point.Tags`
// would compare label against tag values instead and never match a label
// whose value differs from its name. The tag's value is not inspected.
func assertPointWithLabel(t *testing.T, point *influxdb.Point, label string) bool {
	_, found := point.Tags[label]
	return assert.True(t, found, "no label found with name='%v'", label)
}

func createTestStorage() (*influxdbStorage, error) {
machineName := "testMachine"
table := "cadvisor_table"
Expand All @@ -274,12 +289,17 @@ func createTestStorage() (*influxdbStorage, error) {
return storage, err
}

func createTestStats() (*info.ContainerReference, *info.ContainerStats) {
func createTestStats() (*info.ContainerInfo, *info.ContainerStats) {
ref := &info.ContainerReference{
Id: "abcd",
Name: "testContainername",
Aliases: []string{"testContainerAlias1", "testContainerAlias2"},
}

spec := &info.ContainerSpec{
Labels: map[string]string{"testLabel": "testLabelValue"},
}

cpuUsage := info.CpuUsage{
Total: uint64(rand.Intn(10000)),
PerCpu: []uint64{uint64(rand.Intn(1000)), uint64(rand.Intn(1000)), uint64(rand.Intn(1000))},
Expand All @@ -294,5 +314,10 @@ func createTestStats() (*info.ContainerReference, *info.ContainerStats) {
LoadAverage: int32(rand.Intn(1000)),
},
}
return ref, stats

cinfo := &info.ContainerInfo{
ContainerReference: ref,
Spec: spec,
}
return cinfo, stats
}