Skip to content

Commit

Permalink
[8.2](backport elastic#31785) Feature/cache expiration (elastic#31977)
Browse files Browse the repository at this point in the history
* Feature/cache expiration (elastic#31785)
  • Loading branch information
mergify[bot] authored Jun 20, 2022
1 parent ffe1cba commit 580b653
Show file tree
Hide file tree
Showing 25 changed files with 193 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]

*Metricbeat*

- Fix kubernetes module's internal cache expiration issue. This prevents metrics like `kubernetes.container.cpu.usage.limit.pct` from being left unpopulated. {pull}31785[31785]

*Packetbeat*

Expand Down
4 changes: 3 additions & 1 deletion libbeat/common/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ func (c *Cache) StartJanitor(interval time.Duration) {

// StopJanitor stops the goroutine created by StartJanitor.
func (c *Cache) StopJanitor() {
close(c.janitorQuit)
if c.janitorQuit != nil {
close(c.janitorQuit)
}
}

// get returns the non-expired values from the cache.
Expand Down
12 changes: 12 additions & 0 deletions libbeat/common/kubernetes/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ func NewInformer(client kubernetes.Interface, resource Resource, opts WatchOptio
}

objType = "statefulset"
case *DaemonSet:
ss := client.AppsV1().DaemonSets(opts.Namespace)
listwatch = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return ss.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return ss.Watch(ctx, options)
},
}

objType = "daemonset"
case *Service:
svc := client.CoreV1().Services(opts.Namespace)
listwatch = &cache.ListWatch{
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/kubernetes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type ReplicaSet = appsv1.ReplicaSet
// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// DaemonSet data
type DaemonSet = appsv1.DaemonSet

// Service data
type Service = v1.Service

Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewContainerMetadataEnricher(base, true),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -93,7 +93,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
21 changes: 19 additions & 2 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
package kubernetes

import (
"fmt"
"sync"
"time"

"github.com/mitchellh/hashstructure"
"github.com/pkg/errors"
dto "github.com/prometheus/client_model/go"

"github.com/elastic/beats/v7/metricbeat/helper"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

func init() {
Expand All @@ -41,6 +42,7 @@ type Module interface {
mb.Module
GetStateMetricsFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)
GetPerfMetricsCache() *util.PerfMetricsCache
}

type familiesCache struct {
Expand Down Expand Up @@ -84,6 +86,7 @@ type module struct {

kubeStateMetricsCache *kubeStateMetricsCache
kubeletStatsCache *kubeletStatsCache
perfMetrics *util.PerfMetricsCache
cacheHash uint64
}

Expand All @@ -94,15 +97,25 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
perfMetrics := util.NewPerfMetricsCache(0)
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
return nil, fmt.Errorf("error generating cache hash for kubeStateMetricsCache: %w", err)
}

// NOTE: `Period * 2` is an arbitrary value chosen so that the cache never expires before the next scraping run.
// If different metricsets have different periods, the effective timeout becomes max(Period) * 2.
minCacheExpirationTime := base.Config().Period * 2
if perfMetrics.GetTimeout() < minCacheExpirationTime {
perfMetrics.SetOrUpdateTimeout(minCacheExpirationTime)
}

m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
kubeletStatsCache: kubeletStatsCache,
perfMetrics: perfMetrics,
cacheHash: hash,
}
return &m, nil
Expand Down Expand Up @@ -153,3 +166,7 @@ func generateCacheHash(host []string) (uint64, error) {
}
return id, nil
}

// GetPerfMetricsCache returns the performance metrics cache held by this
// module. The returned pointer is the module's own cache, not a copy.
func (m *module) GetPerfMetricsCache() *util.PerfMetricsCache {
return m.perfMetrics
}
7 changes: 1 addition & 6 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -40,8 +39,6 @@ var (
DefaultScheme: defaultScheme,
DefaultPath: defaultPath,
}.Build()

logger = logp.NewLogger("kubernetes.node")
)

// init registers the MetricSet with the central registry.
Expand Down Expand Up @@ -79,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -115,8 +112,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.Logger().Debug("error trying to emit event")
return
}

return
}

// Close stops this metricset
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, mod.GetPerfMetricsCache(), true),
mod: mod,
}, nil
}
Expand All @@ -94,7 +94,7 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}

events, err := eventMapping(body, util.PerfMetrics)
events, err := eventMapping(body, m.mod.GetPerfMetricsCache())
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
3 changes: 2 additions & 1 deletion metricbeat/module/kubernetes/pod/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

Expand All @@ -40,7 +41,7 @@ func TestEventMapping(t *testing.T) {
body, err := ioutil.ReadAll(f)
assert.NoError(t, err, "cannot read test file "+testFile)

cache := util.NewPerfMetricsCache()
cache := util.NewPerfMetricsCache(120 * time.Second)
cache.NodeCoresAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 2)
cache.NodeMemAllocatable.Set("gke-beats-default-pool-a5b33e2e-hdww", 146227200)
cache.ContainerMemLimit.Set(util.ContainerUID("default", "nginx-deployment-2303442956-pcqfc", "nginx"), 14622720)
Expand Down
29 changes: 16 additions & 13 deletions metricbeat/module/kubernetes/state_container/state_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"strings"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -34,8 +32,6 @@ import (
const (
defaultScheme = "http"
defaultPath = "/metrics"
// Nanocores conversion 10^9
nanocores = 1000000000
)

var (
Expand Down Expand Up @@ -122,7 +118,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
enricher: util.NewContainerMetadataEnricher(base, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand All @@ -135,11 +131,11 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {

families, err := m.mod.GetStateMetricsFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
return fmt.Errorf("error getting families: %w", err)
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
return errors.Wrap(err, "error getting event")
return fmt.Errorf("error getting event: %w", err)
}

m.enricher.Enrich(events)
Expand All @@ -151,18 +147,25 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
if containerID, ok := event["id"]; ok {
// we don't expect errors here, but if any we would obtain an
// empty string
cID := (containerID).(string)
cID, ok := (containerID).(string)
if !ok {
m.Logger().Debugf("Error while casting containerID: %s", ok)
}
split := strings.Index(cID, "://")
if split != -1 {
containerFields.Put("runtime", cID[:split])
containerFields.Put("id", cID[split+3:])
util.ShouldPut(containerFields, "runtime", cID[:split], m.Logger())
util.ShouldPut(containerFields, "id", cID[split+3:], m.Logger())
}
}
if containerImage, ok := event["image"]; ok {
cImage := (containerImage).(string)
containerFields.Put("image.name", cImage)
cImage, ok := (containerImage).(string)
if !ok {
m.Logger().Debugf("Error while casting containerImage: %s", ok)
}

util.ShouldPut(containerFields, "image.name", cImage, m.Logger())
// remove kubernetes.container.image field as value is the same as ECS container.image.name field
event.Delete("image")
util.ShouldDelete(event, "image", m.Logger())
}

e, err := util.CreateEvent(event, "kubernetes.container")
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.CronJob{}, mod.GetPerfMetricsCache(), false),
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_cronjob_info": p.InfoMetric(),
Expand Down Expand Up @@ -122,8 +122,6 @@ func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.DaemonSet{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -126,8 +126,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -128,8 +128,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_job/state_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Job{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -145,8 +145,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
4 changes: 1 addition & 3 deletions metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, mod.GetPerfMetricsCache(), false),
mod: mod,
}, nil
}
Expand Down Expand Up @@ -151,8 +151,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
return
}
}

return
}

// Close stops this metricset
Expand Down
Loading

0 comments on commit 580b653

Please sign in to comment.