Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/metricbeat/module/gcp: CrossSeriesReducer Support #40614

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add AWS OwningAccount support for cross account monitoring {issue}40570[40570] {pull}40691[40691]
- Use namespace for GetListMetrics when exists in AWS {pull}41022[41022]
- Fix http server helper SSL config. {pull}39405[39405]
- Add `reducer` field to the GCP configuration to aggregate data points from multiple time series into a single time series. {pull}40614[40614]

*Osquerybeat*

Expand Down
5 changes: 5 additions & 0 deletions metricbeat/docs/modules/gcp.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@ metricbeat.modules:
period: 1m
metrics:
- aligner: ALIGN_NONE
reducer: REDUCE_NONE
groupBy:
- "resource.labels.instance_id"
- "resource.labels.zone"
- "metric.labels.instance_name"
service: compute
metric_types:
- "instance/cpu/reserved_cores"
Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,11 @@ metricbeat.modules:
period: 1m
metrics:
- aligner: ALIGN_NONE
reducer: REDUCE_NONE
groupBy:
- "resource.labels.instance_id"
- "resource.labels.zone"
- "metric.labels.instance_name"
service: compute
metric_types:
- "instance/cpu/reserved_cores"
Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/module/gcp/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
period: 1m
metrics:
- aligner: ALIGN_NONE
reducer: REDUCE_NONE
groupBy:
- "resource.labels.instance_id"
- "resource.labels.zone"
- "metric.labels.instance_name"
service: compute
metric_types:
- "instance/cpu/reserved_cores"
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/module/gcp/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var AlignersMapToGCP = map[string]monitoringpb.Aggregation_Aligner{

const (
DefaultAligner = "ALIGN_NONE"
DefaultReducer = "REDUCE_NONE"
)

var AlignersMapToSuffix = map[string]string{
Expand Down
12 changes: 12 additions & 0 deletions x-pack/metricbeat/module/gcp/metrics/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ services under "Google Cloud metrics", but does not work for other services
(`kubernetes` aka GKE for example).
This option allow to override the default and specify an arbitrary metric prefix.

* *reducer*: A Reducer operation describes how to aggregate data points from multiple
time series into a single time series, where the value of each data point in the resulting series
is a function of all the already aligned values in the input time series. Default value is REDUCE_NONE but
other values supported are REDUCE_MEAN, REDUCE_MIN, REDUCE_MAX etc.
Please check https://cloud.google.com/monitoring/api/ref_v3/rpc/google.monitoring.v3#reducer[Reducers] for the full list.

* *groupBy*: A groupBy operation is used to group time series data based on one or more labels.
The time series data are grouped according to their values for the selected labels.
When using a `reducer` that aggregates data points from multiple time series into a single time series,
the `groupBy` operation will group the result based on the selected labels and maintain those labels in the resulting data.
It is essential to use `groupBy` with `reducer` operation to retrieve the time series results along with the selected label values.

[float]
=== Example Configuration
* `metrics` metricset is enabled to collect metrics from all zones under
Expand Down
25 changes: 16 additions & 9 deletions x-pack/metricbeat/module/gcp/metrics/metrics_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type timeSeriesWithAligner struct {
aligner string
}

func (r *metricsRequester) Metric(ctx context.Context, serviceName, metricType string, timeInterval *monitoringpb.TimeInterval, aligner string) timeSeriesWithAligner {
func (r *metricsRequester) Metric(ctx context.Context, serviceName, metricType string, timeInterval *monitoringpb.TimeInterval, aligner string, reducer string, groupBy []string) timeSeriesWithAligner {
timeSeries := make([]*monitoringpb.TimeSeries, 0)

req := &monitoringpb.ListTimeSeriesRequest{
Expand All @@ -44,8 +44,10 @@ func (r *metricsRequester) Metric(ctx context.Context, serviceName, metricType s
View: monitoringpb.ListTimeSeriesRequest_FULL,
Filter: r.getFilterForMetric(serviceName, metricType),
Aggregation: &monitoringpb.Aggregation{
PerSeriesAligner: gcp.AlignersMapToGCP[aligner],
AlignmentPeriod: r.config.period,
PerSeriesAligner: gcp.AlignersMapToGCP[aligner],
AlignmentPeriod: r.config.period,
CrossSeriesReducer: monitoringpb.Aggregation_Reducer(monitoringpb.Aggregation_Reducer_value[reducer]),
GroupByFields: groupBy,
},
}

Expand Down Expand Up @@ -73,7 +75,7 @@ func (r *metricsRequester) Metric(ctx context.Context, serviceName, metricType s
return out
}

func (r *metricsRequester) Metrics(ctx context.Context, serviceName string, aligner string, metricsToCollect map[string]metricMeta) ([]timeSeriesWithAligner, error) {
func (r *metricsRequester) Metrics(ctx context.Context, serviceName string, aligner string, reducer string, groupBy []string, metricsToCollect map[string]metricMeta) ([]timeSeriesWithAligner, error) {
var lock sync.Mutex
var wg sync.WaitGroup
results := make([]timeSeriesWithAligner, 0)
Expand Down Expand Up @@ -136,9 +138,9 @@ func (r *metricsRequester) Metrics(ctx context.Context, serviceName string, alig
go func(mt string) {
defer wg.Done()

r.logger.Debugf("For metricType %s, metricMeta = %d, aligner = %s", mt, metricMeta, aligner)
interval, aligner := getTimeIntervalAligner(largestDelay, metricMeta.samplePeriod, r.config.period, aligner)
ts := r.Metric(ctx, serviceName, mt, interval, aligner)
r.logger.Debugf("For metricType %s, metricMeta = %d, aligner = %s, reducer=%s,groupBy= %s", mt, metricMeta, aligner, reducer, groupBy)
interval, aligner, reducer := getTimeIntervalAligner(largestDelay, metricMeta.samplePeriod, r.config.period, aligner, reducer)
ts := r.Metric(ctx, serviceName, mt, interval, aligner, reducer, groupBy)
lock.Lock()
defer lock.Unlock()
results = append(results, ts)
Expand Down Expand Up @@ -260,7 +262,7 @@ func (r *metricsRequester) getFilterForMetric(serviceName, m string) string {
}

// Returns a GCP TimeInterval based on the ingestDelay and samplePeriod from ListMetricDescriptor
func getTimeIntervalAligner(ingestDelay time.Duration, samplePeriod time.Duration, collectionPeriod *durationpb.Duration, inputAligner string) (*monitoringpb.TimeInterval, string) {
func getTimeIntervalAligner(ingestDelay time.Duration, samplePeriod time.Duration, collectionPeriod *durationpb.Duration, inputAligner string, inputReducer string) (*monitoringpb.TimeInterval, string, string) {
var startTime, endTime, currentTime time.Time
var needsAggregation bool
currentTime = time.Now().UTC()
Expand Down Expand Up @@ -297,6 +299,11 @@ func getTimeIntervalAligner(ingestDelay time.Duration, samplePeriod time.Duratio
if needsAggregation && inputAligner != "" {
updatedAligner = inputAligner
}
// Default reducer for aggregation is REDUCE_NONE if it's not given
updatedReducer := gcp.DefaultReducer
if inputAligner != "" && inputReducer != "" {
updatedReducer = inputReducer
}

return interval, updatedAligner
return interval, updatedAligner, updatedReducer
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestGetTimeIntervalAligner(t *testing.T) {
collectionPeriod *durationpb.Duration
inputAligner string
expectedAligner string
inputReducer string
}{
{
"test collectionPeriod equals to samplePeriod",
Expand All @@ -34,6 +35,7 @@ func TestGetTimeIntervalAligner(t *testing.T) {
},
"",
"ALIGN_NONE",
"REDUCE_NONE",
},
{
"test collectionPeriod larger than samplePeriod",
Expand All @@ -44,6 +46,7 @@ func TestGetTimeIntervalAligner(t *testing.T) {
},
"ALIGN_MEAN",
"ALIGN_MEAN",
"REDUCE_MEAN",
},
{
"test collectionPeriod smaller than samplePeriod",
Expand All @@ -54,6 +57,7 @@ func TestGetTimeIntervalAligner(t *testing.T) {
},
"ALIGN_MAX",
"ALIGN_NONE",
"REDUCE_MEAN",
},
{
"test collectionPeriod equals to samplePeriod with given aligner",
Expand All @@ -64,13 +68,14 @@ func TestGetTimeIntervalAligner(t *testing.T) {
},
"ALIGN_MEAN",
"ALIGN_NONE",
"REDUCE_MEAN",
},
}

for _, c := range cases {
t.Run(c.title, func(t *testing.T) {
_, aligner := getTimeIntervalAligner(c.ingestDelay, c.samplePeriod, c.collectionPeriod, c.inputAligner)
assert.Equal(t, c.expectedAligner, aligner)
_, aligner, reducer := getTimeIntervalAligner(c.ingestDelay, c.samplePeriod, c.collectionPeriod, c.inputAligner, c.inputReducer)
assert.Equal(t, c.expectedAligner, aligner, reducer)
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions x-pack/metricbeat/module/gcp/metrics/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type metricsConfig struct {
ServiceMetricPrefix string `config:"service_metric_prefix"`
MetricTypes []string `config:"metric_types" validate:"required"`
Aligner string `config:"aligner"`
Reducer string `config:"reducer"`
GroupBy []string `config:"groupBy"`
}

// prefix returns the service metric prefix, falling back to the Google Cloud
Expand Down Expand Up @@ -200,9 +202,11 @@ func (m *MetricSet) Fetch(ctx context.Context, reporter mb.ReporterV2) (err erro
for _, v := range sdc.MetricTypes {
metricsToCollect[sdc.AddPrefixTo(v)] = m.metricsMeta[sdc.AddPrefixTo(v)]
}

if sdc.Aligner == "" && sdc.Reducer != "" {
m.Logger().Warnf("Reducer value will be reset from %s to `REDUCE_NONE` because the Aligner is missing.", sdc.Reducer)
}
// Collect time series values from Google Cloud Monitoring API
timeSeries, err := m.requester.Metrics(ctx, sdc.ServiceName, sdc.Aligner, metricsToCollect)
timeSeries, err := m.requester.Metrics(ctx, sdc.ServiceName, sdc.Aligner, sdc.Reducer, sdc.GroupBy, metricsToCollect)
if err != nil {
err = fmt.Errorf("error trying to get metrics for project '%s' and zone '%s' or region '%s': %w", m.config.ProjectID, m.config.Zone, m.config.Region, err)
m.Logger().Error(err)
Expand Down
8 changes: 4 additions & 4 deletions x-pack/metricbeat/module/gcp/metrics/metricset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package metrics
import "testing"

var fakeMetricsConfig = []metricsConfig{
{"billing", "", []string{}, ""},
{"billing", "foobar/", []string{}, ""},
{"billing", "foobar", []string{}, ""},
{"billing", "", []string{}, "", "", []string{}},
{"billing", "foobar/", []string{}, "", "", []string{}},
{"billing", "foobar", []string{}, "", "", []string{}},
}

func Test_metricsConfig_AddPrefixTo(t *testing.T) {
Expand All @@ -36,7 +36,7 @@ func Test_metricsConfig_AddPrefixTo(t *testing.T) {
},
{
name: "service metric prefix override (w/ dot)",
fields: metricsConfig{"billing", "foo.bar/", []string{}, ""},
fields: metricsConfig{"billing", "foo.bar/", []string{}, "", "", []string{}},
want: "foo.bar/" + metric,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

func TestCleanMetricNameString(t *testing.T) {
computeMC := metricsConfig{"compute", "", []string{}, ""}
computeMC := metricsConfig{"compute", "", []string{}, "", "", []string{}}

cases := []struct {
title string
Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/modules.d/gcp.yml.disabled
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
period: 1m
metrics:
- aligner: ALIGN_NONE
reducer: REDUCE_NONE
groupBy:
- "resource.labels.instance_id"
- "resource.labels.zone"
- "metric.labels.instance_name"
service: compute
metric_types:
- "instance/cpu/reserved_cores"
Expand Down
Loading