Skip to content

Commit

Permalink
Feature/cache expiration (#31785) (#31979)
Browse files Browse the repository at this point in the history
* fixed cache expiration bug
  • Loading branch information
mergify[bot] authored Jun 20, 2022
1 parent ab2b28e commit 3a29e47
Show file tree
Hide file tree
Showing 26 changed files with 134 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Metricbeat*

- make `system/filesystem` code sensitive to `hostfs` and migrate libraries to `elastic-agent-opts` {pull}31001[31001]
- Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785]
- add missing HealthyHostCount and UnHealthyHostCount for application ELB. {pull}31853[31853]

*Packetbeat*
Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) {

// StopJanitor stops the goroutine created by StartJanitor.
func (c *Cache) StopJanitor() {
close(c.janitorQuit)
if c.janitorQuit != nil {
close(c.janitorQuit)
}
}

// get returns the non-expired values from the cache.
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -93,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics, m.Logger())
events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -43,7 +44,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
17 changes: 17 additions & 0 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

func init() {
Expand All @@ -41,6 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
}

type familiesCache struct {
Expand Down Expand Up @@ -84,6 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
cacheHash uint64
}

Expand All @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err)
}

// NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run
// if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2)
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
cacheHash: hash,
}
return &m, nil
Expand Down Expand Up @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) {
}
return id, nil
}

func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
}
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -95,7 +95,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics, m.Logger())
events, err := eventMapping(body, m.mod.GetPerfMetricsCache(), m.Logger())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/pod/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -43,7 +44,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, mod.GetPerfMetricsCache(), false),
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_cronjob_info": p.InfoMetric(),
Expand Down Expand Up @@ -122,8 +122,6 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_job/state_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolume{}, mod.GetPerfMetricsCache(), false),
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.PersistentVolumeClaim{}, mod.GetPerfMetricsCache(), false),
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{

Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -130,8 +130,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,4 @@ func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}
3 changes: 1 addition & 2 deletions metricbeat/module/kubernetes/state_service/state_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
"hostname": p.Label("ingress_hostname"),
},
},
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, mod.GetPerfMetricsCache(), false),
}, nil
}

Expand Down Expand Up @@ -122,7 +122,6 @@ func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,4 @@ func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}
Loading

0 comments on commit 3a29e47

Please sign in to comment.