Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state_* metricsets to share response from endpoint #25640

Merged
merged 17 commits into from
May 18, 2021
Prev Previous commit
Next Next commit
Tune all state_* metrics
Signed-off-by: chrismark <chrismarkou92@gmail.com>
  • Loading branch information
ChrsMark committed May 17, 2021
commit 7656de5d78b96000029b09e16868a034db0e3401
4 changes: 2 additions & 2 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {

type Module interface {
mb.Module
GetSharedFamilies(prometheus p.Prometheus, ms string) ([]*dto.MetricFamily, error)
GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error)
}

type familiesCache struct {
Expand Down Expand Up @@ -71,7 +71,7 @@ func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
}
}

func (m *module) GetSharedFamilies(prometheus p.Prometheus, ms string) ([]*dto.MetricFamily, error) {
func (m *module) GetSharedFamilies(prometheus p.Prometheus) ([]*dto.MetricFamily, error) {
now := time.Now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be done after getting the lock? If not, if a call to GetFamilies takes more than the period, all waiting metricsets will request the families again instead of reusing the ones just received, and the waiting metricset that end up requesting the families again will store an "old" timestamp.


hash := generateCacheHash(m.Config().Hosts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus, "state_container")
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting families")
}
Expand Down
16 changes: 15 additions & 1 deletion metricbeat/module/kubernetes/state_cronjob/state_cronjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_cronjob

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

func init() {
Expand All @@ -40,6 +43,7 @@ type CronJobMetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
}

// NewCronJobMetricSet returns a prometheus based metricset for CronJobs
Expand All @@ -49,9 +53,15 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}

return &CronJobMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_cronjob_info": p.InfoMetric(),
Expand All @@ -77,7 +87,11 @@ func NewCronJobMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
//
// Copied from other kube state metrics.
func (m *CronJobMetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheus.GetProcessedMetrics(m.mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error getting family metrics")
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return errors.Wrap(err, "error getting metrics")
}
Expand Down
17 changes: 16 additions & 1 deletion metricbeat/module/kubernetes/state_daemonset/state_daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_daemonset

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -69,6 +72,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -79,10 +83,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.ReplicaSet{}, false),
mod: mod,
}, nil
}

Expand All @@ -92,7 +101,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_deployment

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -70,6 +73,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -80,10 +84,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Deployment{}, false),
mod: mod,
}, nil
}

Expand All @@ -93,7 +102,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
16 changes: 14 additions & 2 deletions metricbeat/module/kubernetes/state_node/state_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package state_node

import (
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -81,6 +84,7 @@ type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
}

// New create a new instance of the MetricSet
Expand All @@ -91,11 +95,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Node{}, false),
mod: mod,
}, nil
}

Expand All @@ -105,7 +113,11 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data")
}
events, err := m.prometheus.ProcessMetrics(families, mapping)
if err != nil {
return errors.Wrap(err, "error doing HTTP request to fetch 'state_node' Metricset data")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package state_persistentvolume

import (
"fmt"

p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

func init() {
Expand All @@ -34,6 +37,7 @@ type PersistentVolumeMetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
}

// NewPersistentVolumeMetricSet returns a prometheus based metricset for Persistent Volumes
Expand All @@ -42,10 +46,14 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &PersistentVolumeMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{
"kube_persistentvolume_capacity_bytes": p.Metric("capacity.bytes"),
Expand All @@ -69,7 +77,13 @@ func NewPersistentVolumeMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *PersistentVolumeMetricSet) Fetch(reporter mb.ReporterV2) {
events, err := m.prometheus.GetProcessedMetrics(m.mapping)
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package state_persistentvolumeclaim

import (
"fmt"

p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
)

func init() {
Expand All @@ -34,6 +37,7 @@ type persistentvolumeclaimMetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
mapping *p.MetricsMapping
mod k8smod.Module
}

// NewpersistentvolumeclaimMetricSet returns a prometheus based metricset for Persistent Volumes
Expand All @@ -42,10 +46,14 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &persistentvolumeclaimMetricSet{
BaseMetricSet: base,
prometheus: prometheus,
mod: mod,
mapping: &p.MetricsMapping{
Metrics: map[string]p.MetricMap{

Expand Down Expand Up @@ -73,7 +81,12 @@ func NewpersistentvolumeclaimMetricSet(base mb.BaseMetricSet) (mb.MetricSet, err
// Fetch prometheus metrics and treats those prefixed by mb.ModuleDataKey as
// module rooted fields at the event that gets reported
func (m *persistentvolumeclaimMetricSet) Fetch(reporter mb.ReporterV2) error {
events, err := m.prometheus.GetProcessedMetrics(m.mapping)

families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
return err
}
events, err := m.prometheus.ProcessMetrics(families, m.mapping)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

families, err := m.mod.GetSharedFamilies(m.prometheus, "state_pod")
families, err := m.mod.GetSharedFamilies(m.prometheus)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down
Loading