Feature/remove k8s cache #32539

Merged · 41 commits · Aug 12, 2022
Commits
3fed429
coresLimit = nodeCores only if nodeCores > 0 & coresLimit > nodeCores
gsantoro Jul 26, 2022
67fa04d
tmp cache expiration to 1000h
gsantoro Jul 26, 2022
1b36038
tmp implementation
gsantoro Jul 27, 2022
4d2c1d2
metricsstorage with some tests
gsantoro Jul 27, 2022
c2ce5b4
use containerMetrics along perMetrics
gsantoro Jul 27, 2022
418a95b
completely replaced perfMetrics with metricsStorage. tried running li…
gsantoro Jul 27, 2022
bd2fa44
synchronise access to metrics at node/container level
gsantoro Jul 27, 2022
91aa68d
namespace metrics by name, remove metrics at delete node/container, e…
gsantoro Jul 28, 2022
dfbe4ba
logs with fmt.printf
gsantoro Jul 28, 2022
553b668
fixed test + linter
gsantoro Jul 28, 2022
8a6b143
minor fix
gsantoro Jul 28, 2022
818a5eb
fix from PR
gsantoro Jul 29, 2022
bebf135
fix from PR
gsantoro Jul 29, 2022
977b47a
minor renaming
gsantoro Jul 29, 2022
ef9636d
refactoring from PR
gsantoro Jul 29, 2022
612a33a
fixed formatting
gsantoro Jul 29, 2022
58e8215
more tests, keys() function in metrics_repo
gsantoro Aug 1, 2022
6a892a6
cleanup an unnecessary comment
gsantoro Aug 1, 2022
1a28534
nesting node/pod/container without mutex
gsantoro Aug 2, 2022
a8f4f09
tests passing with locks at root level
gsantoro Aug 2, 2022
dab6b23
new type Float64Metric to be a wrapper of float64 to handle nil values
gsantoro Aug 2, 2022
86636b7
refactoring + extra tests for podnames and nodeNames
gsantoro Aug 2, 2022
3b834a2
fine grane locks and refactoring
gsantoro Aug 2, 2022
9652034
fixed linting
gsantoro Aug 2, 2022
4713211
renaming functions in metrics_repo.go
gsantoro Aug 3, 2022
64b0aeb
renaming clone to Clone, removed todo comments
gsantoro Aug 3, 2022
3fac2ae
added docstring, moved rwmutex from containerMetrics to containerStor…
gsantoro Aug 3, 2022
74ccb5a
fixed linter issue
gsantoro Aug 3, 2022
9860ab9
extra manifest for debug.dev.multi
gsantoro Aug 11, 2022
21adb60
merge conflicts
gsantoro Aug 11, 2022
77cfb7a
more tests to check what happen to pod metrics when you have more tha…
gsantoro Aug 4, 2022
2baaa73
minor refactoring
gsantoro Aug 8, 2022
7835ec0
changed tiltfile since it doesn't run properly in debug mode on a mul…
gsantoro Aug 9, 2022
adddffa
rollback unneeded change in PR
gsantoro Aug 10, 2022
4611cca
more tests, minor refactoring, more comments
gsantoro Aug 10, 2022
4c4dd3b
new line just to kick the build process
gsantoro Aug 11, 2022
b08e14f
fix format
gsantoro Aug 11, 2022
12827a3
fix from PR
gsantoro Aug 11, 2022
7addddd
two more tests
gsantoro Aug 11, 2022
e755f62
fix linter
gsantoro Aug 11, 2022
ece02a9
added changelog
gsantoro Aug 11, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -73,6 +73,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix to ARN parsing for Cloudwatch resource names with leading slashes {pull}32358[32358]
- Fix an infinite loop in AWS billing metricset. {pull}32626[32626]
- Add missing metrics in AWS Transit Gateway module {pull}32617[32617]
- Replace internal expiring cache used by the Kubernetes module with in-memory dictionary {pull}32539[32539]

*Packetbeat*

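For context on the changelog entry above: the old `PerfMetricsCache` (an expiring key/value cache) is replaced by `MetricsRepo`, a nested in-memory dictionary keyed by node, pod, and container. The sketch below assembles the write/read flow from the API calls visible later in this diff; the import path, the node and pod names, and the wrapper function are illustrative assumptions, not code from the PR.

```go
package example

// Assumed import path for the util package touched by this PR.
import "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"

func sketch() {
	metricsRepo := util.NewMetricsRepo()

	// Write side (populated by the metadata enrichers, judging by the enricher
	// constructors in this diff): store node allocatable resources.
	nodeStore, _ := metricsRepo.AddNodeStore("node-a") // hypothetical node name
	nodeMetrics := util.NewNodeMetrics()
	nodeMetrics.CoresAllocatable = util.NewFloat64Metric(2)
	nodeMetrics.MemoryAllocatable = util.NewFloat64Metric(146227200)
	nodeStore.SetNodeMetrics(nodeMetrics)

	// Write side: store a container's memory limit under node -> pod -> container.
	podStore, _ := nodeStore.AddPodStore(util.NewPodId("default", "nginx-pod")) // hypothetical pod
	containerStore, _ := podStore.AddContainerStore("nginx")
	containerMetrics := util.NewContainerMetrics()
	containerMetrics.MemoryLimit = util.NewFloat64Metric(14622720)
	containerStore.SetContainerMetrics(containerMetrics)

	// Read side (as in eventMapping below): missing values are nil pointers
	// rather than expired cache entries, so callers fall back explicitly.
	readBack := metricsRepo.GetNodeStore("node-a").GetNodeMetrics()
	nodeCores := 0.0
	if readBack.CoresAllocatable != nil {
		nodeCores = readBack.CoresAllocatable.Value
	}
	_ = nodeCores
}
```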
@@ -0,0 +1,163 @@
{
"node": {
"nodeName": "gke-beats-default-pool-a5b33e2e-hdww",
"systemContainers": [
{
"name": "kubelet",
"startTime": "2017-04-18T12:53:49Z",
"cpu": {
"time": "2017-04-20T08:06:46Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 2357800908948
},
"memory": {
"time": "2017-04-20T08:06:46Z",
"usageBytes": 36683776,
"workingSetBytes": 36495360,
"rssBytes": 35512320,
"pageFaults": 100835242,
"majorPageFaults": 0
},
"userDefinedMetrics": null
}
],
"startTime": "2017-04-18T12:53:49Z",
"cpu": {
"time": "2017-04-20T08:06:41Z",
"usageNanoCores": 18691146,
"usageCoreNanoSeconds": 4189523881380
},
"memory": {
"time": "2017-04-20T08:06:41Z",
"availableBytes": 1768316928,
"usageBytes": 2764943360,
"workingSetBytes": 2111090688,
"rssBytes": 2150400,
"pageFaults": 131567,
"majorPageFaults": 103
},
"network": {
"time": "2017-04-20T08:06:41Z",
"rxBytes": 1115133198,
"rxErrors": 0,
"txBytes": 812729002,
"txErrors": 0
},
"fs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 2514276352,
"inodesFree": 18446744073709551615,
"inodes": 6258720,
"inodesUsed": 138624
},
"runtime": {
"imageFs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 860204379,
"inodesFree": 18446744073709551615,
"inodes": 6258720,
"inodesUsed": 138624
}
}
},
"pods": [
{
"podRef": {
"name": "nginx-deployment-2303442956-pcqfc",
"namespace": "default",
"uid": "beabc196-2456-11e7-a3ad-42010a840235"
},
"startTime": "2017-04-18T16:47:44Z",
"containers": [
{
"name": "nginx",
"startTime": "2017-04-18T16:47:44Z",
"cpu": {
"time": "2017-04-20T08:06:34Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 43959424
},
"memory": {
"time": "2017-04-20T08:06:34Z",
"usageBytes": 1462272,
"workingSetBytes": 1454080,
"rssBytes": 1409024,
"pageFaults": 841,
"majorPageFaults": 0
},
"rootfs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 61440,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 21
},
"logs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 28672,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 138624
},
"userDefinedMetrics": null
},
{
"name": "sidecar",
"startTime": "2017-04-18T16:47:44Z",
"cpu": {
"time": "2017-04-20T08:06:34Z",
"usageNanoCores": 11263994,
"usageCoreNanoSeconds": 43959424
},
"memory": {
"time": "2017-04-20T08:06:34Z",
"usageBytes": 1462272,
"workingSetBytes": 1454080,
"rssBytes": 1409024,
"pageFaults": 841,
"majorPageFaults": 0
},
"rootfs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 61440,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 21
},
"logs": {
"availableBytes": 98727014400,
"capacityBytes": 101258067968,
"usedBytes": 28672,
"inodesFree": 6120096,
"inodes": 6258720,
"inodesUsed": 138624
},
"userDefinedMetrics": null
}
],
"network": {
"time": "2017-04-20T08:06:41Z",
"rxBytes": 107056,
"rxErrors": 0,
"txBytes": 72447,
"txErrors": 0
},
"volume": [
{
"availableBytes": 1939689472,
"capacityBytes": 1939701760,
"usedBytes": 12288,
"inodesFree": 473551,
"inodes": 473560,
"inodesUsed": 9,
"name": "default-token-sg8x5"
}
]
}
]
}
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/container/container.go
@@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetMetricsRepo(), true),
mod: mod,
}, nil
}
@@ -93,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger())
events, err := eventMapping(body, m.mod.GetMetricsRepo(), m.Logger())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
36 changes: 30 additions & 6 deletions metricbeat/module/kubernetes/container/container_test.go
@@ -24,7 +24,6 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

@@ -44,12 +43,25 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
metricsRepo := util.NewMetricsRepo()

events, err := eventMapping(body, cache, logger)
nodeName := "gke-beats-default-pool-a5b33e2e-hdww"

nodeMetrics := util.NewNodeMetrics()
nodeMetrics.CoresAllocatable = util.NewFloat64Metric(2)
nodeMetrics.MemoryAllocatable = util.NewFloat64Metric(146227200)
addNodeMetric(metricsRepo, nodeName, nodeMetrics)

namespace := "default"
podName := "nginx-deployment-2303442956-pcqfc"
podId := util.NewPodId(namespace, podName)
containerName := "nginx"

containerMetrics := util.NewContainerMetrics()
containerMetrics.MemoryLimit = util.NewFloat64Metric(14622720)
addContainerMetric(metricsRepo, nodeName, podId, containerName, containerMetrics)

events, err := eventMapping(body, metricsRepo, logger)
assert.NoError(t, err, "error mapping "+testFile)

assert.Len(t, events, 1, "got wrong number of events")
@@ -107,3 +119,15 @@ func testValue(t *testing.T, event mapstr.M, field string, value interface{}) {
assert.NoError(t, err, "Could not read field "+field)
assert.EqualValues(t, data, value, "Wrong value for field "+field)
}

func addContainerMetric(metricsRepo *util.MetricsRepo, nodeName string, podId util.PodId, containerName string, metrics *util.ContainerMetrics) {
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
podStore, _ := nodeStore.AddPodStore(podId)
containerStore, _ := podStore.AddContainerStore(containerName)
containerStore.SetContainerMetrics(metrics)
}

func addNodeMetric(metricsRepo *util.MetricsRepo, nodeName string, nodeMetrics *util.NodeMetrics) {
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(nodeMetrics)
}
50 changes: 39 additions & 11 deletions metricbeat/module/kubernetes/container/data.go
@@ -29,7 +29,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
)

func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *logp.Logger) ([]mapstr.M, error) {
func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Logger) ([]mapstr.M, error) {
events := []mapstr.M{}
var summary kubernetes.Summary

@@ -39,9 +39,23 @@ func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *lo
}

node := summary.Node
nodeCores := perfMetrics.NodeCoresAllocatable.Get(node.NodeName)
nodeMem := perfMetrics.NodeMemAllocatable.Get(node.NodeName)

nodeCores := 0.0
nodeMem := 0.0

nodeStore := metricsRepo.GetNodeStore(node.NodeName)
nodeMetrics := nodeStore.GetNodeMetrics()
if nodeMetrics.CoresAllocatable != nil {
nodeCores = nodeMetrics.CoresAllocatable.Value
}
if nodeMetrics.MemoryAllocatable != nil {
nodeMem = nodeMetrics.MemoryAllocatable.Value
}

for _, pod := range summary.Pods {
podId := util.NewPodId(pod.PodRef.Namespace, pod.PodRef.Name)
podStore := nodeStore.GetPodStore(podId)

for _, container := range pod.Containers {
containerEvent := mapstr.M{
mb.ModuleDataKey: mapstr.M{
@@ -127,17 +141,31 @@ func eventMapping(content []byte, perfMetrics *util.PerfMetricsCache, logger *lo
kubernetes2.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
}

cuid := util.ContainerUID(pod.PodRef.Namespace, pod.PodRef.Name, container.Name)
coresLimit := perfMetrics.ContainerCoresLimit.GetWithDefault(cuid, nodeCores)
memLimit := perfMetrics.ContainerMemLimit.GetWithDefault(cuid, nodeMem)
containerStore := podStore.GetContainerStore(container.Name)
containerMetrics := containerStore.GetContainerMetrics()

containerCoresLimit := nodeCores
if containerMetrics.CoresLimit != nil {
containerCoresLimit = containerMetrics.CoresLimit.Value
}

containerMemLimit := nodeMem
if containerMetrics.MemoryLimit != nil {
containerMemLimit = containerMetrics.MemoryLimit.Value
}

// NOTE:
// we don't currently check if `containerMemLimit` > `nodeMem` as we do in `kubernetes/pod/data.go`.
// There we do check, since if a container doesn't have a limit set, it will inherit the node limits and the sum of all
// the container limits can be greater than the node limits. We assume here the user can set correct limits on containers.

if coresLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/coresLimit, logger)
if containerCoresLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
}

if memLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/memLimit, logger)
kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/memLimit, logger)
if containerMemLimit > 0 {
kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
}

events = append(events, containerEvent)
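The data.go hunk above repeats the same fallback twice: use the container's own limit when it is set, otherwise fall back to the node's allocatable value. A hypothetical helper, not part of the PR, that captures this nil-check pattern on `util.Float64Metric`:

```go
// valueOrDefault is a hypothetical helper (not in this PR): return the metric's
// value when present, otherwise the supplied fallback, e.g. the node allocatable.
func valueOrDefault(metric *util.Float64Metric, fallback float64) float64 {
	if metric != nil {
		return metric.Value
	}
	return fallback
}

// Usage mirroring the diff:
//   containerCoresLimit := valueOrDefault(containerMetrics.CoresLimit, nodeCores)
//   containerMemLimit := valueOrDefault(containerMetrics.MemoryLimit, nodeMem)
```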
19 changes: 6 additions & 13 deletions metricbeat/module/kubernetes/kubernetes.go
@@ -42,7 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
GetMetricsRepo() *util.MetricsRepo
}

type familiesCache struct {
@@ -86,7 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
metricsRepo *util.MetricsRepo
cacheHash uint64
}

@@ -97,25 +97,18 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
metricsRepo := util.NewMetricsRepo()
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err)
}

// NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run
// if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2)
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
metricsRepo: metricsRepo,
cacheHash: hash,
}
return &m, nil
@@ -167,6 +160,6 @@ func generateCacheHash(host []string) (uint64, error) {
return id, nil
}

func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
func (m *module) GetMetricsRepo() *util.MetricsRepo {
return m.metricsRepo
}
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/node/node.go
@@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetMetricsRepo(), false),
mod: mod,
}, nil
}
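The node.go and container.go hunks only show the call to `mod.GetMetricsRepo()`; the surrounding retrieval pattern is not visible in this diff. A sketch of how a kubernetes metricset would reach the shared repo, assuming the usual metricbeat boilerplate (the type assertion, error text, and import paths are assumptions, not lines from the PR):

```go
import (
	"fmt"

	// Assumed import paths for the packages referenced in this diff.
	"github.com/elastic/beats/v7/metricbeat/mb"
	"github.com/elastic/beats/v7/metricbeat/module/kubernetes"
	"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

// getSharedRepo sketches how a metricset obtains the module-wide MetricsRepo.
func getSharedRepo(base mb.BaseMetricSet) (*util.MetricsRepo, error) {
	mod, ok := base.Module().(kubernetes.Module)
	if !ok {
		return nil, fmt.Errorf("must be child of kubernetes module")
	}
	// The repo is created once in ModuleBuilder's closure, so every metricset
	// instance of the module reads and writes the same in-memory dictionary.
	return mod.GetMetricsRepo(), nil
}
```

Since the repo never expires entries, cleanup relies on delete handling instead of cache timeouts; the commit "namespace metrics by name, remove metrics at delete node/container" in the list above suggests metrics are dropped when the corresponding resources are deleted.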