Skip to content

Commit

Permalink
Create a new dedicated call to get count of openfiles for a set of PI…
Browse files Browse the repository at this point in the history
…Ds (#11435)
  • Loading branch information
vboulineau authored Apr 5, 2022
1 parent 416a923 commit d097a57
Show file tree
Hide file tree
Showing 23 changed files with 185 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (cext *containerdCustomMetricsExtension) PreProcess(sender generic.SenderFu

func (cext *containerdCustomMetricsExtension) Process(tags []string, container *workloadmeta.Container, collector provider.Collector, cacheValidity time.Duration) {
// Duplicate call with generic.Processor, but cache should allow for a fast response.
containerStats, err := collector.GetContainerStats(container.ID, cacheValidity)
containerStats, err := collector.GetContainerStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Gathering container metrics for container: %v failed, metrics may be missing, err: %v", container, err)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (dn *dockerCustomMetricsExtension) PreProcess(sender generic.SenderFunc, ag
func (dn *dockerCustomMetricsExtension) Process(tags []string, container *workloadmeta.Container, collector provider.Collector, cacheValidity time.Duration) {
// Duplicate call with generic.Processor, but cache should allow for a fast response.
// We only need it for PIDs
containerStats, err := collector.GetContainerStats(container.ID, cacheValidity)
containerStats, err := collector.GetContainerStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Gathering container metrics for container: %v failed, metrics may be missing, err: %v", container, err)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/collector/corechecks/containers/docker/check_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ func (dn *dockerNetworkExtension) PreProcess(sender generic.SenderFunc, aggSende
func (dn *dockerNetworkExtension) Process(tags []string, container *workloadmeta.Container, collector provider.Collector, cacheValidity time.Duration) {
// Duplicate call with generic.Processor, but cache should allow for a fast response.
// We only need it for PIDs
containerStats, err := collector.GetContainerStats(container.ID, cacheValidity)
containerStats, err := collector.GetContainerStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Gathering container metrics for container: %v failed, metrics may be missing, err: %v", container, err)
return
}

containerNetworkStats, err := collector.GetContainerNetworkStats(container.ID, cacheValidity)
containerNetworkStats, err := collector.GetContainerNetworkStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Gathering network metrics for container: %v failed, metrics may be missing, err: %v", container, err)
return
Expand Down
10 changes: 8 additions & 2 deletions pkg/collector/corechecks/containers/generic/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (p *Processor) Run(sender aggregator.Sender, cacheValidity time.Duration) e
continue
}

containerStats, err := collector.GetContainerStats(container.ID, cacheValidity)
containerStats, err := collector.GetContainerStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Container stats for: %v not available through collector %q, err: %v", container, collector.ID(), err)
continue
Expand All @@ -113,6 +113,13 @@ func (p *Processor) Run(sender aggregator.Sender, cacheValidity time.Duration) e
continue
}

openFiles, err := collector.GetContainerOpenFilesCount(container.Namespace, container.ID, cacheValidity)
if err == nil {
p.sendMetric(sender.Gauge, "container.pid.open_files", pointer.UIntPtrToFloatPtr(openFiles), tags)
} else {
log.Debugf("OpenFiles count for: %v not available through collector %q, err: %v", container, collector.ID(), err)
}

// TODO: Implement container stats. We currently don't have enough information from Metadata service to do it.

// Extensions: Process hook
Expand Down Expand Up @@ -181,7 +188,6 @@ func (p *Processor) processContainer(sender aggregator.Sender, tags []string, co
if containerStats.PID != nil {
p.sendMetric(sender.Gauge, "container.pid.thread_count", containerStats.PID.ThreadCount, tags)
p.sendMetric(sender.Gauge, "container.pid.thread_limit", containerStats.PID.ThreadLimit, tags)
p.sendMetric(sender.Gauge, "container.pid.open_files", containerStats.PID.OpenFiles, tags)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (pn *ProcessorNetwork) PreProcess(sender SenderFunc, aggSender aggregator.S

// Process stores each container in relevant network group
func (pn *ProcessorNetwork) Process(tags []string, container *workloadmeta.Container, collector provider.Collector, cacheValidity time.Duration) {
containerNetworkStats, err := collector.GetContainerNetworkStats(container.ID, cacheValidity)
containerNetworkStats, err := collector.GetContainerNetworkStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Gathering network metrics for container: %v failed, metrics may be missing, err: %v", container, err)
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/process/util/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (p *containerProvider) GetContainers(cacheValidity time.Duration, previousC
continue
}

containerStats, err := collector.GetContainerStats(container.ID, cacheValidity)
containerStats, err := collector.GetContainerStats(container.Namespace, container.ID, cacheValidity)
if err != nil || containerStats == nil {
log.Debugf("Container stats for: %+v not available through collector %q, err: %v", container, collector.ID(), err)
// If main container stats are missing, we skip the container
Expand All @@ -166,7 +166,7 @@ func (p *containerProvider) GetContainers(cacheValidity time.Duration, previousC
}
}

containerNetworkStats, err := collector.GetContainerNetworkStats(container.ID, cacheValidity)
containerNetworkStats, err := collector.GetContainerNetworkStats(container.Namespace, container.ID, cacheValidity)
if err != nil {
log.Debugf("Container network stats for: %+v not available through collector %q, err: %v", container, collector.ID(), err)
}
Expand Down
34 changes: 12 additions & 22 deletions pkg/util/containers/v2/metrics/containerd/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,9 @@ func (c *containerdCollector) ID() string {
}

// GetContainerStats returns stats by container ID.
func (c *containerdCollector) GetContainerStats(containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
namespace, err := c.containerNamespace(containerID)
if err != nil {
return nil, err
}
c.client.SetCurrentNamespace(namespace)

func (c *containerdCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
// TODO: Relying on `SetCurrentNamespace` is not correct as the collector is supposed to allow for concurrent calls
c.client.SetCurrentNamespace(containerNS)
metrics, err := c.getContainerdMetrics(containerID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,13 +120,16 @@ func (c *containerdCollector) GetContainerStats(containerID string, cacheValidit
return containerStats, nil
}

// GetContainerOpenFilesCount returns open files count by container ID.
func (c *containerdCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) {
// Not available
return nil, nil
}

// GetContainerNetworkStats returns network stats by container ID.
func (c *containerdCollector) GetContainerNetworkStats(containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
namespace, err := c.containerNamespace(containerID)
if err != nil {
return nil, err
}
c.client.SetCurrentNamespace(namespace)
func (c *containerdCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
// TODO: Relying on `SetCurrentNamespace` is not correct as the collector is supposed to allow for concurrent calls
c.client.SetCurrentNamespace(containerNS)

metrics, err := c.getContainerdMetrics(containerID)
if err != nil {
Expand Down Expand Up @@ -221,12 +220,3 @@ func (c *containerdCollector) refreshPIDCache(currentTime time.Time, cacheValidi
c.pidCache.Store(currentTime, pidCacheFullRefreshKey, struct{}{}, nil)
return nil
}

func (c *containerdCollector) containerNamespace(containerID string) (string, error) {
container, err := c.workloadmetaStore.GetContainer(containerID)
if err != nil {
return "", err
}

return container.Namespace, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func TestGetContainerStats_Containerd(t *testing.T) {
}

// ID and cache TTL not relevant for these tests
result, err := collector.GetContainerStats(containerID, 10*time.Second)
result, err := collector.GetContainerStats("", containerID, 10*time.Second)
assert.NoError(t, err)

result.CPU.Limit = nil // Don't check this field. It's complex to calculate. Needs separate tests.
Expand Down Expand Up @@ -363,7 +363,7 @@ func TestGetContainerNetworkStats_Containerd(t *testing.T) {
}

// ID and cache TTL not relevant for these tests
result, err := collector.GetContainerNetworkStats(containerID, 10*time.Second)
result, err := collector.GetContainerNetworkStats("", containerID, 10*time.Second)
result.Timestamp = time.Time{} // We have no control over it, so set it to avoid checking it.

assert.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestGetContainerStats_Containerd(t *testing.T) {
}

// ID and cache TTL not relevant for these tests
result, err := collector.GetContainerStats(containerID, 10*time.Second)
result, err := collector.GetContainerStats("test-namespace", containerID, 10*time.Second)
assert.NoError(t, err)

result.CPU.Limit = nil // Don't check this field. It's complex to calculate. Needs separate tests.
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestGetContainerNetworkStats_Containerd(t *testing.T) {
}

// ID and cache TTL not relevant for these tests
result, err := collector.GetContainerNetworkStats(containerID, 10*time.Second)
result, err := collector.GetContainerNetworkStats("test-namespace", containerID, 10*time.Second)

assert.NoError(t, err)
assert.Empty(t, cmp.Diff(test.expectedNetworkStats, result))
Expand Down
10 changes: 8 additions & 2 deletions pkg/util/containers/v2/metrics/cri/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (collector *criCollector) ID() string {
}

// GetContainerStats returns stats by container ID.
func (collector *criCollector) GetContainerStats(containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
func (collector *criCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
stats, err := collector.getCriContainerStats(containerID)
if err != nil {
return nil, err
Expand All @@ -75,8 +75,14 @@ func (collector *criCollector) GetContainerStats(containerID string, cacheValidi
}, nil
}

// GetContainerOpenFilesCount returns open files count by container ID.
func (collector *criCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) {
// Not available
return nil, nil
}

// GetContainerNetworkStats returns network stats by container ID.
func (collector *criCollector) GetContainerNetworkStats(containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
func (collector *criCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
// Not available
return nil, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/containers/v2/metrics/cri/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGetContainerStats(t *testing.T) {
client: mockedCriClient,
}

stats, err := collector.GetContainerStats(containerID, 10*time.Second)
stats, err := collector.GetContainerStats("", containerID, 10*time.Second)
assert.NoError(t, err)

assert.Equal(t, pointer.UIntToFloatPtr(1000), stats.CPU.Total)
Expand All @@ -55,7 +55,7 @@ func TestGetContainerStats(t *testing.T) {

func TestGetContainerNetworkStats(t *testing.T) {
collector := criCollector{}
stats, err := collector.GetContainerNetworkStats("123", time.Second)
stats, err := collector.GetContainerNetworkStats("", "123", time.Second)
assert.NoError(t, err)
assert.Nil(t, stats) // The CRI collector does not return any network data
}
10 changes: 8 additions & 2 deletions pkg/util/containers/v2/metrics/docker/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (d *dockerCollector) ID() string {
}

// GetContainerStats returns stats by container ID.
func (d *dockerCollector) GetContainerStats(containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
func (d *dockerCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
stats, err := d.stats(containerID)
if err != nil {
return nil, err
Expand All @@ -91,8 +91,14 @@ func (d *dockerCollector) GetContainerStats(containerID string, cacheValidity ti
return outStats, nil
}

// GetContainerOpenFilesCount returns open files count by container ID.
func (d *dockerCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) {
// Not available
return nil, nil
}

// GetContainerNetworkStats returns network stats by container ID.
func (d *dockerCollector) GetContainerNetworkStats(containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
func (d *dockerCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
stats, err := d.stats(containerID)
if err != nil {
return nil, err
Expand Down
16 changes: 14 additions & 2 deletions pkg/util/containers/v2/metrics/ecsfargate/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func newEcsFargateCollector() (*ecsFargateCollector, error) {
func (e *ecsFargateCollector) ID() string { return ecsFargateCollectorID }

// GetContainerStats returns stats by container ID.
func (e *ecsFargateCollector) GetContainerStats(containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
func (e *ecsFargateCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
stats, err := e.stats(containerID)
if err != nil {
return nil, err
Expand All @@ -86,8 +86,20 @@ func (e *ecsFargateCollector) GetContainerStats(containerID string, cacheValidit
return containerStats, nil
}

// GetContainerPIDStats returns pid stats by container ID.
func (e *ecsFargateCollector) GetContainerPIDStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerPIDStats, error) {
// Not available
return nil, nil
}

// GetContainerOpenFilesCount returns open files count by container ID.
func (e *ecsFargateCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) {
// Not available
return nil, nil
}

// GetContainerNetworkStats returns network stats by container ID.
func (e *ecsFargateCollector) GetContainerNetworkStats(containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
func (e *ecsFargateCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
stats, err := e.stats(containerID)
if err != nil {
return nil, err
Expand Down
16 changes: 14 additions & 2 deletions pkg/util/containers/v2/metrics/kubelet/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (kc *kubeletCollector) GetSelfContainerID() (string, error) {
}

// GetContainerStats returns stats by container ID.
func (kc *kubeletCollector) GetContainerStats(containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
func (kc *kubeletCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) {
currentTime := time.Now()

containerStats, found, err := kc.statsCache.Get(currentTime, contStatsCachePrefix+containerID, cacheValidity)
Expand All @@ -118,8 +118,20 @@ func (kc *kubeletCollector) GetContainerStats(containerID string, cacheValidity
return nil, nil
}

// GetContainerPIDStats returns pid stats by container ID.
func (kc *kubeletCollector) GetContainerPIDStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerPIDStats, error) {
// Not available
return nil, nil
}

// GetContainerOpenFilesCount returns open files count by container ID.
func (kc *kubeletCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) {
// Not available
return nil, nil
}

// GetContainerNetworkStats returns network stats by container ID.
func (kc *kubeletCollector) GetContainerNetworkStats(containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
func (kc *kubeletCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) {
currentTime := time.Now()

containerNetworkStats, found, err := kc.statsCache.Get(currentTime, contNetStatsCachePrefix+containerID, cacheValidity)
Expand Down
20 changes: 10 additions & 10 deletions pkg/util/containers/v2/metrics/kubelet/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func TestKubeletCollectorLinux(t *testing.T) {
metadataStore: metadataStore,
}

// On first `GetContainerStats`, the full data is read and cache is filled
// On first `GetCoreContainerStats`, the full data is read and cache is filled
expectedTime, _ := time.Parse(time.RFC3339, "2019-11-20T13:13:13Z")
expectedTime = expectedTime.Local()
cID1Stats, err := kubeletCollector.GetContainerStats("cID1", time.Minute)
cID1Stats, err := kubeletCollector.GetContainerStats("", "cID1", time.Minute)
// Removing content from kubeletMock to make sure anything we hit is from cache
clearFakeStatsSummary(kubeletMock)

Expand All @@ -79,7 +79,7 @@ func TestKubeletCollectorLinux(t *testing.T) {

expectedTime, _ = time.Parse(time.RFC3339, "2019-11-20T13:13:09Z")
expectedTime = expectedTime.Local()
cID2Stats, err := kubeletCollector.GetContainerStats("cID2", time.Minute)
cID2Stats, err := kubeletCollector.GetContainerStats("", "cID2", time.Minute)
assert.NoError(t, err)
assert.Equal(t, &provider.ContainerStats{
Timestamp: expectedTime,
Expand All @@ -94,7 +94,7 @@ func TestKubeletCollectorLinux(t *testing.T) {

expectedTime, _ = time.Parse(time.RFC3339, "2019-11-20T13:13:16Z")
expectedTime = expectedTime.Local()
cID3Stats, err := kubeletCollector.GetContainerStats("cID3", time.Minute)
cID3Stats, err := kubeletCollector.GetContainerStats("", "cID3", time.Minute)
assert.NoError(t, err)
assert.Equal(t, &provider.ContainerStats{
Timestamp: expectedTime,
Expand All @@ -121,21 +121,21 @@ func TestKubeletCollectorLinux(t *testing.T) {
NetworkIsolationGroupID: pointer.UInt64Ptr(17659160645723176180),
}

cID3NetworkStats, err := kubeletCollector.GetContainerNetworkStats("cID3", time.Minute)
cID3NetworkStats, err := kubeletCollector.GetContainerNetworkStats("", "cID3", time.Minute)
assert.NoError(t, err)
assert.Equal(t, expectedPodNetworkStats, cID3NetworkStats)

cID2NetworkStats, err := kubeletCollector.GetContainerNetworkStats("cID2", time.Minute)
cID2NetworkStats, err := kubeletCollector.GetContainerNetworkStats("", "cID2", time.Minute)
assert.NoError(t, err)
assert.Equal(t, expectedPodNetworkStats, cID2NetworkStats)

// Test getting stats for an unknown container, should answer without data but without error (no API call triggered)
cID4Stats, err := kubeletCollector.GetContainerStats("cID4", time.Minute)
cID4Stats, err := kubeletCollector.GetContainerStats("", "cID4", time.Minute)
assert.NoError(t, err)
assert.Nil(t, cID4Stats)

// Forcing a refresh, will trigger a Kubelet call (which will answer with 404 Not found)
cID1Stats, err = kubeletCollector.GetContainerStats("cID1", 0)
cID1Stats, err = kubeletCollector.GetContainerStats("", "cID1", 0)
assert.Equal(t, err.Error(), "Unable to fetch stats summary from Kubelet, rc: 404")
assert.Nil(t, cID1Stats)
}
Expand Down Expand Up @@ -169,10 +169,10 @@ func TestKubeletCollectorWindows(t *testing.T) {
metadataStore: metadataStore,
}

// On first `GetContainerStats`, the full data is read and cache is filled
// On first `GetCoreContainerStats`, the full data is read and cache is filled
expectedTime, _ := time.Parse(time.RFC3339, "2020-04-24T15:54:14Z")
expectedTime = expectedTime.Local()
cID1Stats, err := kubeletCollector.GetContainerStats("cID1", time.Minute)
cID1Stats, err := kubeletCollector.GetContainerStats("", "cID1", time.Minute)
assert.NoError(t, err)
assert.Equal(t, &provider.ContainerStats{
Timestamp: expectedTime,
Expand Down
Loading

0 comments on commit d097a57

Please sign in to comment.