Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor state_* metricsets to share response from endpoint #25640

Merged
merged 17 commits into from
May 18, 2021
20 changes: 15 additions & 5 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type Prometheus interface {

GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error)

GetSharedProcessedMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error)

ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error
}

Expand Down Expand Up @@ -139,11 +141,7 @@ type MetricsMapping struct {
ExtraFields map[string]string
}

func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) {
families, err := p.GetFamilies()
if err != nil {
return nil, err
}
func (p *prometheus) processedMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) {
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved

eventsMap := map[string]common.MapStr{}
infoMetrics := []*infoMetricData{}
Expand Down Expand Up @@ -260,6 +258,18 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
return events, nil
}

func (p *prometheus) GetSharedProcessedMetrics(families []*dto.MetricFamily, mapping *MetricsMapping) ([]common.MapStr, error) {
return p.processedMetrics(families, mapping)
}

func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error) {
families, err := p.GetFamilies()
if err != nil {
return nil, err
}
return p.processedMetrics(families, mapping)
}

// infoMetricData keeps data about an infoMetric
type infoMetricData struct {
Labels common.MapStr
Expand Down
135 changes: 135 additions & 0 deletions metricbeat/module/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kubernetes

import (
"sync"
"time"

dto "github.com/prometheus/client_model/go"

"github.com/elastic/beats/v7/libbeat/common/atomic"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
)

func init() {
// Register the ModuleFactory function for the "kubernetes" module.
if err := mb.Registry.AddModule("kubernetes", ModuleBuilder()); err != nil {
panic(err)
}
}

type Module interface {
mb.Module
StartSharedFetcher(prometheus p.Prometheus, period time.Duration)
GetSharedFamilies() ([]*dto.MetricFamily, error)
}

type module struct {
mb.BaseModule
lock sync.Mutex

prometheus p.Prometheus

families []*dto.MetricFamily
err error
running atomic.Bool
stateMetricsPeriod time.Duration
}

func ModuleBuilder() func(base mb.BaseModule) (mb.Module, error) {
jsoriano marked this conversation as resolved.
Show resolved Hide resolved
return func(base mb.BaseModule) (mb.Module, error) {
m := module{
BaseModule: base,
}
return &m, nil
}
}

func (m *module) StartSharedFetcher(prometheus p.Prometheus, period time.Duration) {
if m.prometheus == nil {
m.prometheus = prometheus
}
go m.runStateMetricsFetcher(period)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race condition here. If this function is called by two metricsets at the same time you could end up with several fetchers running.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what if different metricsets have a different period?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what if different metricsets have a different period?

Period is defined at the module level, so all metricsets will have the same period.

Copy link
Member Author

@ChrsMark ChrsMark May 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both cases should be handled by https://github.com/elastic/beats/pull/25640/files#diff-9933f42aec81125910c9923f83fa89c9a76918aac6e3bd5a68c430aeeab91084R98-R106.

Taking into consideration that period is set on Module's level then I think that different period's check is redundant. Metricsets will not share the input if they are configured in different Modules like:

- module: kubernetes
  metricsets:
    - state_pod
- module: kubernetes
  metricsets:
    - state_container

So the period adjustment at https://github.com/elastic/beats/pull/25640/files#diff-9933f42aec81125910c9923f83fa89c9a76918aac6e3bd5a68c430aeeab91084R100 could be removed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

- module: kubernetes
  metricsets:
    - state_pod
- module: kubernetes
  metricsets:
    - state_container

Take into account that this configuration will instantiate two different modules, and they won't reuse the shared families. If we want/need to supports configs like these ones, this approach won't work. To reuse the families we would need one of:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you investigate what Agent is doing with a config like that? I think doing this at the module level is good enough if the Agent handles it correctly, and removes the complexity of indexing by endpoint, metrics path and other parameters (including the security part).

ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *module) SetSharedError(err error) {
m.lock.Lock()
m.err = err
m.lock.Unlock()
}

func (m *module) SetSharedFamilies(families []*dto.MetricFamily) {
m.lock.Lock()
m.families = families
m.err = nil
m.lock.Unlock()
}

func (m *module) GetSharedFamilies() ([]*dto.MetricFamily, error) {
m.lock.Lock()
defer m.lock.Unlock()
if m.err != nil {
return nil, m.err
}
return m.families, nil
}

// run ensures that the module is running with the passed subscription
func (m *module) runStateMetricsFetcher(period time.Duration) {
var ticker *time.Ticker
quit := make(chan bool)
if !m.running.CAS(false, true) {
// Module is already running, just check if there is a smaller period to adjust.
if period < m.stateMetricsPeriod {
m.stateMetricsPeriod = period
ticker.Stop()
ticker = time.NewTicker(period)
}
return
}
ticker = time.NewTicker(period)

defer func() { m.running.Store(false) }()

families, err := m.prometheus.GetFamilies()
if err != nil {
// communicate the error to subscribed metricsets
m.SetSharedError(err)
} else {
m.SetSharedFamilies(families)
}

for {
select {
case <-ticker.C:
families, err := m.prometheus.GetFamilies()
if err != nil {
// communicate the error to subscribed metricsets
m.SetSharedError(err)
} else {
m.SetSharedFamilies(families)
}
case <-quit:
ticker.Stop()
return
// quit properly
}
}
}
24 changes: 21 additions & 3 deletions metricbeat/module/kubernetes/state_container/state_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package state_container

import (
"fmt"
"strings"

"github.com/pkg/errors"
Expand All @@ -26,6 +27,7 @@ import (
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -87,8 +89,10 @@ func init() {
// multiple fetch calls.
type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
sharedFetcherStarted bool
}

// New create a new instance of the MetricSet
Expand All @@ -99,10 +103,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}
mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewContainerMetadataEnricher(base, false),
mod: mod,
}, nil
}

Expand All @@ -112,7 +121,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
if !m.sharedFetcherStarted {
m.mod.StartSharedFetcher(m.prometheus, m.Module().Config().Period)
m.sharedFetcherStarted = true
}

families, err := m.mod.GetSharedFamilies()
if err != nil {
return errors.Wrap(err, "error getting families")
}
events, err := m.prometheus.GetSharedProcessedMetrics(families, mapping)
if err != nil {
return errors.Wrap(err, "error getting event")
}
Expand Down
28 changes: 24 additions & 4 deletions metricbeat/module/kubernetes/state_pod/state_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package state_pod

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/kubernetes"
p "github.com/elastic/beats/v7/metricbeat/helper/prometheus"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes"
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util"
)

Expand Down Expand Up @@ -70,8 +73,10 @@ func init() {
// multiple fetch calls.
type MetricSet struct {
mb.BaseMetricSet
prometheus p.Prometheus
enricher util.Enricher
prometheus p.Prometheus
enricher util.Enricher
mod k8smod.Module
sharedFetcherStarted bool
}

// New create a new instance of the MetricSet
Expand All @@ -82,11 +87,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
if err != nil {
return nil, err
}

mod, ok := base.Module().(k8smod.Module)
if !ok {
return nil, fmt.Errorf("must be child of kubernetes module")
}
return &MetricSet{
BaseMetricSet: base,
prometheus: prometheus,
enricher: util.NewResourceMetadataEnricher(base, &kubernetes.Pod{}, false),
mod: mod,
}, nil
}

Expand All @@ -96,7 +105,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
m.enricher.Start()

events, err := m.prometheus.GetProcessedMetrics(mapping)
if !m.sharedFetcherStarted {
m.mod.StartSharedFetcher(m.prometheus, m.Module().Config().Period)
m.sharedFetcherStarted = true
}

families, err := m.mod.GetSharedFamilies()
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
return
}
events, err := m.prometheus.GetSharedProcessedMetrics(families, mapping)
if err != nil {
m.Logger().Error(err)
reporter.Error(err)
Expand Down