Skip to content

Commit

Permalink
[8.1](backport elastic#31785) Feature/cache expiration (elastic#31976)
Browse files Browse the repository at this point in the history
* Feature/cache expiration (elastic#31785)
  • Loading branch information
mergify[bot] authored Jun 20, 2022
1 parent e2e5fde commit 924001d
Show file tree
Hide file tree
Showing 25 changed files with 147 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v8.1.3\...main[Check the HEAD diff]
*Metricbeat*

- Improve handling of disabled commands in Zookeeper Metricbeat module. {pull}31013[#31013]
- Fix kubernetes module's internal cache expiration issue. This avoid metrics like `kubernetes.container.cpu.usage.limit.pct` from not being populated. {pull}31785[31785]

*Packetbeat*

Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) {

// StopJanitor stops the goroutine created by StartJanitor.
func (c *Cache) StopJanitor() {
close(c.janitorQuit)
if c.janitorQuit != nil {
close(c.janitorQuit)
}
}

// get returns the non-expired values from the cache.
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "statefulset"
case *DaemonSet:
ss := client.AppsV1().DaemonSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "daemonset"
case *Service:
svc := client.CoreV1().Services(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ type ReplicaSet = appsv1.ReplicaSet
// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// DaemonSet data
type DaemonSet = appsv1.DaemonSet

// Service data
type Service = v1.Service

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -95,7 +95,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
17 changes: 17 additions & 0 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

func init() {
Expand All @@ -41,6 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
}

type familiesCache struct {
Expand Down Expand Up @@ -84,6 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
cacheHash uint64
}

Expand All @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
}

// NOTE: `Period * 2` is an arbitrary value to make the cache NEVER to expire before the next scraping run
// if different metricsets have different periods, we will effectively set (timeout = max(Period) * 2)
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
cacheHash: hash,
}
return &m, nil
Expand Down Expand Up @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) {
}
return id, nil
}

func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
}
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -96,7 +96,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/pod/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
}
if config.AddMetadata {
ms.enricher = util.NewResourceMetadataEnricher(
base, &kubernetes.CronJob{}, false)
base, &kubernetes.CronJob{}, mod.GetPerfMetricsCache(), false)
}
return &ms, nil
}
Expand Down Expand Up @@ -129,8 +129,6 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_job/state_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -130,8 +130,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,4 @@ func (m *ResourceQuotaMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}
3 changes: 1 addition & 2 deletions metricbeat/module/kubernetes/state_service/state_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func NewServiceMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
"hostname": p.Label("ingress_hostname"),
},
},
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Service{}, mod.GetPerfMetricsCache(), false),
}, nil
}

Expand Down Expand Up @@ -122,7 +122,6 @@ func (m *ServiceMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.StatefulSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -127,8 +127,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,4 @@ func (m *StorageClassMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}
return
}
Loading

0 comments on commit 924001d

Please sign in to comment.