From 3a29e47bd452d989846651bcec34e9b6e4a98972 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Mon, 20 Jun 2022 11:10:22 +0100 Subject: [PATCH] Feature/cache expiration (#31785) (#31979) * fixed cache expiration bug --- CHANGELOG.next.asciidoc | 1 + libbeat/common/cache.go | 4 +- .../module/kubernetes/container/container.go | 4 +- .../kubernetes/container/container_test.go | 3 +- metricbeat/module/kubernetes/kubernetes.go | 17 +++++ metricbeat/module/kubernetes/node/node.go | 2 +- metricbeat/module/kubernetes/pod/pod.go | 4 +- metricbeat/module/kubernetes/pod/pod_test.go | 3 +- .../state_container/state_container.go | 2 +- .../kubernetes/state_cronjob/state_cronjob.go | 4 +- .../state_daemonset/state_daemonset.go | 2 +- .../state_deployment/state_deployment.go | 4 +- .../module/kubernetes/state_job/state_job.go | 4 +- .../kubernetes/state_node/state_node.go | 4 +- .../state_persistentvolume.go | 2 +- .../state_persistentvolumeclaim.go | 2 +- .../module/kubernetes/state_pod/state_pod.go | 4 +- .../state_replicaset/state_replicaset.go | 4 +- .../state_resourcequota.go | 1 - .../kubernetes/state_service/state_service.go | 3 +- .../state_statefulset/state_statefulset.go | 4 +- .../state_storageclass/state_storageclass.go | 1 - .../module/kubernetes/util/kubernetes.go | 53 ++++++++++----- .../module/kubernetes/util/metrics_cache.go | 64 +++++++++++++++---- .../kubernetes/util/metrics_cache_test.go | 5 +- metricbeat/module/kubernetes/volume/volume.go | 4 +- 26 files changed, 134 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 06be216c661..c2c186bcb39 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] *Metricbeat* - make `system/filesystem` code sensitive to `hostfs` and migrate libraries to `elastic-agent-opts` {pull}31001[31001] +- Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785] - add missing HealthyHostCount and UnHealthyHostCount for application ELB. {pull}31853[31853] *Packetbeat* diff --git a/libbeat/common/cache.go b/libbeat/common/cache.go index c06aa2427b8..d75a8471c5f 100644 --- a/libbeat/common/cache.go +++ b/libbeat/common/cache.go @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) { // StopJanitor stops the goroutine created by StartJanitor. func (c *Cache) StopJanitor() { - close(c.janitorQuit) + if c.janitorQuit != nil { + close(c.janitorQuit) + } } // get returns the non-expired values from the cache. diff --git a/metricbeat/module/kubernetes/container/container.go b/metricbeat/module/kubernetes/container/container.go index 70cbd52647b..5e09d9a7f53 100644 --- a/metricbeat/module/kubernetes/container/container.go +++ b/metricbeat/module/kubernetes/container/container.go @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewContainerMetadataEnricher(base, true), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -93,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } - events, err := eventMapping(body, util.PerfMetrics, m.Logger()) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/container/container_test.go b/metricbeat/module/kubernetes/container/container_test.go index 56c602afef9..d50ca134b56 100644 --- a/metricbeat/module/kubernetes/container/container_test.go +++ b/metricbeat/module/kubernetes/container/container_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/kubernetes.go b/metricbeat/module/kubernetes/kubernetes.go index 121bb3bb0f9..dc09f5a4a34 100644 --- a/metricbeat/module/kubernetes/kubernetes.go +++ b/metricbeat/module/kubernetes/kubernetes.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/helper" p "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" ) func init() { @@ -41,6 +42,7 @@ type Module interface { mb.Module GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) GetKubeletStats(http *helper.HTTP) ([]byte, error) + GetPerfMetricsCache() *util.PerfMetricsCache } type familiesCache struct { @@ -84,6 +86,7 @@ type module struct { kubeStateMetricsCache *kubeStateMetricsCache kubeletStatsCache *kubeletStatsCache + perfMetrics *util.PerfMetricsCache cacheHash uint64 } @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) { kubeletStatsCache := &kubeletStatsCache{ cacheMap: make(map[uint64]*statsCache), } + perfMetrics := util.NewPerfMetricsCache(0) return func(base mb.BaseModule) (mb.Module, error) { hash, err := generateCacheHash(base.Config().Hosts) if err != nil { return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err) } + + // NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run + // if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2) + minCacheExpirationTime := base.Config().Period * 2 + if perfMetrics.GetTimeout() < minCacheExpirationTime { + perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime) + } + m := module{ BaseModule: base, kubeStateMetricsCache: kubeStateMetricsCache, kubeletStatsCache: kubeletStatsCache, + perfMetrics: perfMetrics, cacheHash: hash, } return &m, nil @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) { } return id, nil } + +func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache { + return m.perfMetrics +} diff --git a/metricbeat/module/kubernetes/node/node.go b/metricbeat/module/kubernetes/node/node.go index 3dd38d910b1..8564d539af4 100644 --- a/metricbeat/module/kubernetes/node/node.go +++ b/metricbeat/module/kubernetes/node/node.go @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/pod/pod.go b/metricbeat/module/kubernetes/pod/pod.go index c936b6017a8..1df68669fc1 100644 --- a/metricbeat/module/kubernetes/pod/pod.go +++ b/metricbeat/module/kubernetes/pod/pod.go @@ -77,7 +77,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, http: http, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true), mod: mod, }, nil } @@ -95,7 +95,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } - events, err := eventMapping(body, util.PerfMetrics, m.Logger()) + events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger()) if err != nil { m.Logger().Error(err) reporter.Error(err) diff --git a/metricbeat/module/kubernetes/pod/pod_test.go b/metricbeat/module/kubernetes/pod/pod_test.go index 175055b07ef..226433acf1a 100644 --- a/metricbeat/module/kubernetes/pod/pod_test.go +++ b/metricbeat/module/kubernetes/pod/pod_test.go @@ -24,6 +24,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/stretchr/testify/assert" @@ -43,7 +44,7 @@ func TestEventMapping(t *testing.T) { body, err := ioutil.ReadAll(f) assert.NoError(t, err, "cannot read test file "+testFile) - cache := util.NewPerfMetricsCache() + cache := util.NewPerfMetricsCache(120 * time.Second) cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2) cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200) cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720) diff --git a/metricbeat/module/kubernetes/state_container/state_container.go b/metricbeat/module/kubernetes/state_container/state_container.go index a33dce02f85..cf7b5eb9538 100644 --- a/metricbeat/module/kubernetes/state_container/state_container.go +++ b/metricbeat/module/kubernetes/state_container/state_container.go @@ -118,7 +118,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewContainerMetadataEnricher(base, false), + enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go index f8e83e5ed22..2e69ce47316 100644 --- a/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go +++ b/metricbeat/module/kubernetes/state_cronjob/state_cronjob.go @@ -63,7 +63,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, prometheus: prometheus, mod: mod, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, mod.GetPerfMetricsCache(), false), mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_cronjob_info": p.InfoMetric(), @@ -122,8 +122,6 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go index 541a09ca540..de0956994f8 100644 --- a/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go +++ b/metricbeat/module/kubernetes/state_daemonset/state_daemonset.go @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } diff --git a/metricbeat/module/kubernetes/state_deployment/state_deployment.go b/metricbeat/module/kubernetes/state_deployment/state_deployment.go index dde5f25525d..87bb67320e3 100644 --- a/metricbeat/module/kubernetes/state_deployment/state_deployment.go +++ b/metricbeat/module/kubernetes/state_deployment/state_deployment.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_job/state_job.go b/metricbeat/module/kubernetes/state_job/state_job.go index 282bfa670d0..f49f75bf22a 100644 --- a/metricbeat/module/kubernetes/state_job/state_job.go +++ b/metricbeat/module/kubernetes/state_job/state_job.go @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_node/state_node.go b/metricbeat/module/kubernetes/state_node/state_node.go index c6bdd890e91..be48d50b1e9 100644 --- a/metricbeat/module/kubernetes/state_node/state_node.go +++ b/metricbeat/module/kubernetes/state_node/state_node.go @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go index eee1ee54ed2..dc5e0db7ee2 100644 --- a/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go +++ b/metricbeat/module/kubernetes/state_persistentvolume/state_persistentvolume.go @@ -58,7 +58,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { BaseMetricSet: base, prometheus: prometheus, mod: mod, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, mod.GetPerfMetricsCache(), false), mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ "kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"), diff --git a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go index 99d784879c0..e257828233e 100644 --- a/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go +++ b/metricbeat/module/kubernetes/state_persistentvolumeclaim/state_persistentvolumeclaim.go @@ -58,7 +58,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err BaseMetricSet: base, prometheus: prometheus, mod: mod, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, mod.GetPerfMetricsCache(), false), mapping: &p.MetricsMapping{ Metrics: map[string]p.MetricMap{ diff --git a/metricbeat/module/kubernetes/state_pod/state_pod.go b/metricbeat/module/kubernetes/state_pod/state_pod.go index 17ed2f5a3fc..f1981e7207e 100644 --- a/metricbeat/module/kubernetes/state_pod/state_pod.go +++ b/metricbeat/module/kubernetes/state_pod/state_pod.go @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -130,8 +130,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go index 9f3e7bc38bc..1b730c54df4 100644 --- a/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go +++ b/metricbeat/module/kubernetes/state_replicaset/state_replicaset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go index eb8bc6ddf35..f4a95a6d25b 100644 --- a/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go +++ b/metricbeat/module/kubernetes/state_resourcequota/state_resourcequota.go @@ -97,5 +97,4 @@ func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/state_service/state_service.go b/metricbeat/module/kubernetes/state_service/state_service.go index 7f996613860..f079f55ed08 100644 --- a/metricbeat/module/kubernetes/state_service/state_service.go +++ b/metricbeat/module/kubernetes/state_service/state_service.go @@ -86,7 +86,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { "hostname": p.Label("ingress_hostname"), }, }, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, mod.GetPerfMetricsCache(), false), }, nil } @@ -122,7 +122,6 @@ func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go index f0eeab506d9..7ad669c0830 100644 --- a/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go +++ b/metricbeat/module/kubernetes/state_statefulset/state_statefulset.go @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, prometheus: prometheus, - enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false), + enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, mod.GetPerfMetricsCache(), false), mod: mod, }, nil } @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) { return } } - - return } // Close stops this metricset diff --git a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go index 820a052a05a..5a0ce3d0109 100644 --- a/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go +++ b/metricbeat/module/kubernetes/state_storageclass/state_storageclass.go @@ -99,5 +99,4 @@ func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) { return } } - return } diff --git a/metricbeat/module/kubernetes/util/kubernetes.go b/metricbeat/module/kubernetes/util/kubernetes.go index 8afdbdbbd70..5e95d395d7d 100644 --- a/metricbeat/module/kubernetes/util/kubernetes.go +++ b/metricbeat/module/kubernetes/util/kubernetes.go @@ -18,6 +18,7 @@ package util import ( + "errors" "fmt" "strings" "sync" @@ -80,10 +81,11 @@ const selector = "kubernetes" func NewResourceMetadataEnricher( base mb.BaseMetricSet, res kubernetes.Resource, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { - config := ValidatedConfig(base) - if config == nil { + config, err := GetValidatedConfig(base) + if err != nil { logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -122,12 +124,12 @@ func NewResourceMetadataEnricher( name := r.GetObjectMeta().GetName() if cpu, ok := r.Status.Capacity["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) + perfMetrics.NodeCoresAllocatable.Set(name, float64(q.MilliValue())/1000) } } if memory, ok := r.Status.Capacity["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) + perfMetrics.NodeMemAllocatable.Set(name, float64(q.Value())) } } @@ -181,10 +183,11 @@ func NewResourceMetadataEnricher( // NewContainerMetadataEnricher returns an Enricher configured for container events func NewContainerMetadataEnricher( base mb.BaseMetricSet, + perfMetrics *PerfMetricsCache, nodeScope bool) Enricher { - config := ValidatedConfig(base) - if config == nil { + config, err := GetValidatedConfig(base) + if err != nil { logp.Info("Kubernetes metricset enriching is disabled") return &nilEnricher{} } @@ -226,12 +229,12 @@ func NewContainerMetadataEnricher( // Report container limits to PerfMetrics cache if cpu, ok := container.Resources.Limits["cpu"]; ok { if q, err := resource.ParseQuantity(cpu.String()); err == nil { - PerfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) + perfMetrics.ContainerCoresLimit.Set(cuid, float64(q.MilliValue())/1000) } } if memory, ok := container.Resources.Limits["memory"]; ok { if q, err := resource.ParseQuantity(memory.String()); err == nil { - PerfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) + perfMetrics.ContainerMemLimit.Set(cuid, float64(q.Value())) } } @@ -329,21 +332,39 @@ func GetDefaultDisabledMetaConfig() *kubernetesConfig { } } -func ValidatedConfig(base mb.BaseMetricSet) *kubernetesConfig { - config := kubernetesConfig{ +func GetValidatedConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { + config, err := GetConfig(base) + if err != nil { + logp.Err("Error while getting config: %v", err) + return nil, err + } + + config, err = validateConfig(config) + if err != nil { + logp.Err("Error while validating config: %v", err) + return nil, err + } + return config, nil +} + +func validateConfig(config *kubernetesConfig) (*kubernetesConfig, error) { + if !config.AddMetadata { + return nil, errors.New("metadata enriching is disabled") + } + return config, nil +} + +func GetConfig(base mb.BaseMetricSet) (*kubernetesConfig, error) { + config := &kubernetesConfig{ AddMetadata: true, SyncPeriod: time.Minute * 10, AddResourceMetadata: metadata.GetDefaultResourceMetadataConfig(), } if err := base.Module().UnpackConfig(&config); err != nil { - return nil + return nil, errors.New("error unpacking configs") } - // Return nil if metadata enriching is disabled: - if !config.AddMetadata { - return nil - } - return &config + return config, nil } func getString(m mapstr.M, key string) string { diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index 3fdf59d8b16..60bde73336e 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -23,23 +23,14 @@ import ( "github.com/elastic/beats/v7/libbeat/common" ) -// PerfMetrics stores known metrics from Kubernetes nodes and containers -var PerfMetrics = NewPerfMetricsCache() - -func init() { - PerfMetrics.Start() -} - -const defaultTimeout = 120 * time.Second - // NewPerfMetricsCache initializes and returns a new PerfMetricsCache -func NewPerfMetricsCache() *PerfMetricsCache { +func NewPerfMetricsCache(timeout time.Duration) *PerfMetricsCache { return &PerfMetricsCache{ - NodeMemAllocatable: newValueMap(defaultTimeout), - NodeCoresAllocatable: newValueMap(defaultTimeout), + NodeMemAllocatable: newValueMap(timeout), + NodeCoresAllocatable: newValueMap(timeout), - ContainerMemLimit: newValueMap(defaultTimeout), - ContainerCoresLimit: newValueMap(defaultTimeout), + ContainerMemLimit: newValueMap(timeout), + ContainerCoresLimit: newValueMap(timeout), } } @@ -68,6 +59,43 @@ func (c *PerfMetricsCache) Stop() { c.ContainerCoresLimit.Stop() } +// Returns the maximum timeout of all the caches under PerfMetricsCache +func (c *PerfMetricsCache) GetTimeout() time.Duration { + var ans time.Duration = 0 + + nmATimeout := c.NodeMemAllocatable.GetTimeout() + if nmATimeout > ans { + ans = nmATimeout + } + + ncATimeout := c.NodeCoresAllocatable.GetTimeout() + if ncATimeout > ans { + ans = ncATimeout + } + + cmLTimeout := c.ContainerMemLimit.GetTimeout() + if cmLTimeout > ans { + ans = cmLTimeout + } + + ccLTimeout := c.ContainerCoresLimit.GetTimeout() + if ccLTimeout > ans { + ans = ccLTimeout + } + return ans +} + +// Set the timeout of all the caches under PerfMetricsCache, then Stop and Start all the cache janitors +func (c *PerfMetricsCache) SetOrUpdateTimeout(timeout time.Duration) { + c.NodeMemAllocatable.SetTimeout(timeout) + c.NodeCoresAllocatable.SetTimeout(timeout) + c.ContainerMemLimit.SetTimeout(timeout) + c.ContainerCoresLimit.SetTimeout(timeout) + + c.Stop() + c.Start() +} + type valueMap struct { cache *common.Cache timeout time.Duration @@ -109,6 +137,14 @@ func (m *valueMap) Stop() { m.cache.StopJanitor() } +func (m *valueMap) GetTimeout() time.Duration { + return m.timeout +} + +func (m *valueMap) SetTimeout(timeout time.Duration) { + m.timeout = timeout +} + // ContainerUID creates an unique ID for from namespace, pod name and container name func ContainerUID(namespace, pod, container string) string { return namespace + "/" + pod + "/" + container diff --git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index 649c1f5fb86..07b447c5fe7 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,12 +19,13 @@ package util import ( "testing" + "time" "github.com/stretchr/testify/assert" ) func TestValueMap(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // no value assert.Equal(t, 0.0, test.Get("foo")) @@ -35,7 +36,7 @@ func TestValueMap(t *testing.T) { } func TestGetWithDefault(t *testing.T) { - test := newValueMap(defaultTimeout) + test := newValueMap(120 * time.Second) // Empty + default assert.Equal(t, 0.0, test.Get("foo")) diff --git a/metricbeat/module/kubernetes/volume/volume.go b/metricbeat/module/kubernetes/volume/volume.go index e110c43761d..b0f54c275a8 100644 --- a/metricbeat/module/kubernetes/volume/volume.go +++ b/metricbeat/module/kubernetes/volume/volume.go @@ -85,8 +85,8 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // add ECS orchestrator fields - config := util.ValidatedConfig(base) - if config == nil { + config, err := util.GetValidatedConfig(base) + if err != nil { logp.Info("Kubernetes metricset enriching is disabled") } else { client, err := kubernetes.GetKubernetesClient(config.KubeConfig, config.KubeClientOptions)