Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor kubelet metricsets to share response from endpoint #25782

Merged
merged 8 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refactor kubelet metricsets to share response from endpoint
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed May 19, 2021
commit a87b0a087f3d9fd5131ec7964a0c4d8f28bbe70f
67 changes: 57 additions & 10 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kubernetes

import (
"github.com/elastic/beats/v7/metricbeat/helper"
"sync"
"time"

Expand All @@ -39,6 +40,7 @@ func init() {
type Module interface {
mb.Module
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetSharedKubeletStats(http *helper.HTTP) ([]byte, error)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. I am thinking now that this being shared is an implementation detail, interface and consumers don't care if the response is shared or cached.

Suggested change
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetSharedKubeletStats(http *helper.HTTP) ([]byte, error)
GetFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
GetKubeletStats(http *helper.HTTP) ([]byte, error)

}

type familiesCache struct {
Expand All @@ -61,30 +63,51 @@ func (c *kubeStateMetricsCache) getCacheMapEntry(hash uint64) *familiesCache {
return c.cacheMap[hash]
}

type statsCache struct {
sharedStats []byte
lastFetchErr error
lastFetchTimestamp time.Time
}

type kubeletStatsCache struct {
cacheMap map[uint64]*statsCache
lock sync.Mutex
}

func (c *kubeletStatsCache) getCacheMapEntry(hash uint64) *statsCache {
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.cacheMap[hash]; !ok {
c.cacheMap[hash] = &statsCache{}
}
return c.cacheMap[hash]
}

type module struct {
mb.BaseModule

kubeStateMetricsCache *kubeStateMetricsCache
familiesCache *familiesCache
kubeletStatsCache *kubeletStatsCache
cacheHash uint64
}

func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
kubeStateMetricsCache := &kubeStateMetricsCache{
cacheMap: make(map[uint64]*familiesCache),
}
kubeletStatsCache := &kubeletStatsCache{
cacheMap: make(map[uint64]*statsCache),
}
return func(base mb.BaseModule) (mb.Module, error) {
hash, err := generateCacheHash(base.Config().Hosts)
if err != nil {
return nil, errors.Wrap(err, "error generating cache hash for kubeStateMetricsCache")
}
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := kubeStateMetricsCache.getCacheMapEntry(hash)
m := module{
BaseModule: base,
kubeStateMetricsCache: kubeStateMetricsCache,
familiesCache: familiesCache,
kubeletStatsCache: kubeletStatsCache,
cacheHash: hash,
}
return &m, nil
}
Expand All @@ -96,12 +119,36 @@ func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily

now := time.Now()

if m.familiesCache.lastFetchTimestamp.IsZero() || now.Sub(m.familiesCache.lastFetchTimestamp) > m.Config().Period {
m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr = prometheus.GetFamilies()
m.familiesCache.lastFetchTimestamp = now
// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
familiesCache := m.kubeStateMetricsCache.getCacheMapEntry(m.cacheHash)

if familiesCache.lastFetchTimestamp.IsZero() || now.Sub(familiesCache.lastFetchTimestamp) > m.Config().Period {
familiesCache.sharedFamilies, familiesCache.lastFetchErr = prometheus.GetFamilies()
familiesCache.lastFetchTimestamp = now
}

return familiesCache.sharedFamilies, familiesCache.lastFetchErr
}

func (m *module) GetSharedKubeletStats(http *helper.HTTP) ([]byte, error) {
m.kubeletStatsCache.lock.Lock()
defer m.kubeletStatsCache.lock.Unlock()

now := time.Now()

// NOTE: These entries will be never removed, this can be a leak if
// metricbeat is used to monitor clusters dynamically created.
// (https://github.com/elastic/beats/pull/25640#discussion_r633395213)
statsCache := m.kubeletStatsCache.getCacheMapEntry(m.cacheHash)

if statsCache.lastFetchTimestamp.IsZero() || now.Sub(statsCache.lastFetchTimestamp) > m.Config().Period {
statsCache.sharedStats, statsCache.lastFetchErr = http.FetchContent()
statsCache.lastFetchTimestamp = now
}

return m.familiesCache.sharedFamilies, m.familiesCache.lastFetchErr
return statsCache.sharedStats, statsCache.lastFetchErr
}

func generateCacheHash(host []string) (uint64, error) {
Expand Down
11 changes: 9 additions & 2 deletions metricbeat/module/kubernetes/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package node

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -60,6 +62,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -70,11 +73,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
mod: mod,
}, nil
}

Expand All @@ -84,7 +91,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetSharedKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'node' Metricset data")

Expand Down
13 changes: 10 additions & 3 deletions metricbeat/module/kubernetes/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package pod

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -59,6 +61,7 @@ type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -69,11 +72,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, true),
mod: mod,
}, nil
}

Expand All @@ -83,9 +90,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

body, err := m.http.FetchContent()
body, err := m.mod.GetSharedKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'pod' Metricset data")
return errors.Wrap(err, "error fetching shared data for 'pod' Metricset")
}

events, err := eventMapping(body, util.PerfMetrics)
Expand Down
10 changes: 9 additions & 1 deletion metricbeat/module/kubernetes/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package system

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

const (
Expand Down Expand Up @@ -56,6 +58,7 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -66,17 +69,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
mod: mod,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
body, err := m.http.FetchContent()
body, err := m.mod.GetSharedKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'system' Metricset data")
}
Expand Down
10 changes: 9 additions & 1 deletion metricbeat/module/kubernetes/volume/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package volume

import (
"fmt"
"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/helper"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

const (
Expand Down Expand Up @@ -56,6 +58,7 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -66,17 +69,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
http: http,
mod: mod,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
body, err := m.http.FetchContent()
body, err := m.mod.GetSharedKubeletStats(m.http)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'volume' Metricset data")
}
Expand Down