Skip to content

Commit

Permalink
[awsemfexporter] Implement metric filtering and dimension setting (op…
Browse files Browse the repository at this point in the history
…en-telemetry#1503)

* Create MetricDeclaration struct

* Implement dimension dedup logic when adding rolledup dimensions (open-telemetry#6)

* Implement dimension dedup logic when adding rolledup dimensions

* Remove duplicated dimensions in metric declaration

* Create benchmark for filtering with metric declarations and use assertCwMeasurementEqual for tests

* Move helper test code to the top of file

* Update dimRollup test

* Aggregate dimensions and perform dedup

* Minor code change

* Fix test failure from merging new changes
  • Loading branch information
kohrapha authored and shaochengwang committed Nov 13, 2020
1 parent cb4270e commit 0217d55
Show file tree
Hide file tree
Showing 10 changed files with 1,718 additions and 465 deletions.
10 changes: 9 additions & 1 deletion exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ The following exporter configuration parameters are supported.
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|
| `resource_to_telemetry_conversion` | "resource_to_telemetry_conversion" is the option for converting resource attributes to telemetry attributes. It has only one config onption- `enabled`. For metrics, if `enabled=true`, all the resource attributes will be converted to metric labels by default. See `Resource Attributes to Metric Labels` section below for examples. | `enabled=false` |

| [`metric_declarations`](#metric_declaration) | List of rules for filtering exported metrics and their dimensions. | [ ] |

### <metric_declaration>
A metric_declaration section characterizes a rule to be used to set dimensions for exported metrics, filtered by the incoming metrics' metric names.
| Name | Description | Default |
| :---------------- | :--------------------------------------------------------------------- | ------- |
| `dimensions` | List of dimension sets to be exported. | [[ ]] |
| `metric_name_selectors` | List of regex strings to filter metric names by. | [ ] |


## AWS Credential Configuration

Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type Config struct {
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`
// MetricDeclarations is a list of rules to be used to set dimensions for exported metrics.
MetricDeclarations []*MetricDeclaration `mapstructure:"metric_declarations"`

// ResourceToTelemetrySettings is the option for converting resource attrihutes to telemetry attributes.
// "Enabled" - A boolean field to enable/disable this option. Default is `false`.
Expand Down
2 changes: 2 additions & 0 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestLoadConfig(t *testing.T) {
Region: "us-west-2",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: []*MetricDeclaration{},
})

r2 := cfg.Exporters["awsemf/resource_attr_to_label"].(*Config)
Expand All @@ -75,5 +76,6 @@ func TestLoadConfig(t *testing.T) {
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
ResourceToTelemetrySettings: exporterhelper.ResourceToTelemetrySettings{Enabled: true},
MetricDeclarations: []*MetricDeclaration{},
})
}
15 changes: 14 additions & 1 deletion exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ func New(
svcStructuredLog := NewCloudWatchLogsClient(logger, awsConfig, session)
collectorIdentifier, _ := uuid.NewRandom()

// Initialize metric declarations and filter out invalid ones
emfConfig := config.(*Config)
var validDeclarations []*MetricDeclaration
for _, declaration := range emfConfig.MetricDeclarations {
err := declaration.Init(logger)
if err != nil {
logger.Warn("Dropped metric declaration. Error: " + err.Error() + ".")
} else {
validDeclarations = append(validDeclarations, declaration)
}
}
emfConfig.MetricDeclarations = validDeclarations

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
Expand Down Expand Up @@ -212,7 +225,7 @@ func generateLogEventFromMetric(metric pdata.Metrics, config *Config) ([]*LogEve
cwMetricLists = append(cwMetricLists, cwm...)
}

return TranslateCWMetricToEMF(cwMetricLists), totalDroppedMetrics, namespace
return TranslateCWMetricToEMF(cwMetricLists, config.logger), totalDroppedMetrics, namespace
}

func wrapErrorIfBadRequest(err *error) error {
Expand Down
57 changes: 57 additions & 0 deletions exporter/awsemfexporter/emf_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/translator/internaldata"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
)

func init() {
Expand Down Expand Up @@ -236,6 +238,61 @@ func TestNewExporterWithoutConfig(t *testing.T) {
assert.NotNil(t, expCfg.logger)
}

func TestNewExporterWithMetricDeclarations(t *testing.T) {
factory := NewFactory()
expCfg := factory.CreateDefaultConfig().(*Config)
expCfg.Region = "us-west-2"
expCfg.MaxRetries = defaultRetryCount
expCfg.LogGroupName = "test-logGroupName"
expCfg.LogStreamName = "test-logStreamName"
mds := []*MetricDeclaration{
{
MetricNameSelectors: []string{"a", "b"},
},
{
MetricNameSelectors: []string{"c", "d"},
},
{
MetricNameSelectors: nil,
},
{
Dimensions: [][]string{
{"foo"},
{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"},
},
MetricNameSelectors: []string{"a"},
},
}
expCfg.MetricDeclarations = mds

obs, logs := observer.New(zap.WarnLevel)
logger := zap.New(obs)
exp, err := New(expCfg, component.ExporterCreateParams{Logger: logger})
assert.Nil(t, err)
assert.NotNil(t, exp)

emfExporter := exp.(*emfExporter)
config := emfExporter.config.(*Config)
// Invalid metric declaration should be filtered out
assert.Equal(t, 3, len(config.MetricDeclarations))
// Invalid dimensions (> 10 dims) should be filtered out
assert.Equal(t, 1, len(config.MetricDeclarations[2].Dimensions))

// Test output warning logs
expectedLogs := []observer.LoggedEntry{
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped metric declaration. Error: invalid metric declaration: no metric name selectors defined."},
Context: []zapcore.Field{},
},
{
Entry: zapcore.Entry{Level: zap.WarnLevel, Message: "Dropped dimension set: > 10 dimensions specified."},
Context: []zapcore.Field{zap.String("dimensions", "a,b,c,d,e,f,g,h,i,j,k")},
},
}
assert.Equal(t, 2, logs.Len())
assert.Equal(t, expectedLogs, logs.AllUntimed())
}

func TestNewExporterWithoutSession(t *testing.T) {
exp, err := New(nil, component.ExporterCreateParams{Logger: zap.NewNop()})
assert.NotNil(t, err)
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func createDefaultConfig() configmodels.Exporter {
Region: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
MetricDeclarations: make([]*MetricDeclaration, 0),
logger: nil,
}
}
Expand Down
171 changes: 171 additions & 0 deletions exporter/awsemfexporter/metric_declaration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package awsemfexporter

import (
"errors"
"regexp"
"sort"
"strings"

"go.opentelemetry.io/collector/consumer/pdata"
"go.uber.org/zap"
)

// MetricDeclaration characterizes a rule to be used to set dimensions for certain
// incoming metrics, filtered by their metric names.
type MetricDeclaration struct {
// Dimensions is a list of dimension sets (which are lists of dimension names) to be
// included in exported metrics. If the metric does not contain any of the specified
// dimensions, the metric would be dropped (will only show up in logs).
Dimensions [][]string `mapstructure:"dimensions"`
// MetricNameSelectors is a list of regex strings to be matched against metric names
// to determine which metrics should be included with this metric declaration rule.
MetricNameSelectors []string `mapstructure:"metric_name_selectors"`

// metricRegexList is a list of compiled regexes for metric name selectors.
metricRegexList []*regexp.Regexp
}

// Remove duplicated entries from dimension set.
func dedupDimensionSet(dimensions []string) (deduped []string, hasDuplicate bool) {
seen := make(map[string]bool, len(dimensions))
for _, v := range dimensions {
seen[v] = true
}
hasDuplicate = (len(seen) < len(dimensions))
if !hasDuplicate {
deduped = dimensions
return
}
deduped = make([]string, len(seen))
idx := 0
for dim := range seen {
deduped[idx] = dim
idx++
}
return
}

// Init initializes the MetricDeclaration struct. Performs validation and compiles
// regex strings. Dimensions are deduped and sorted.
func (m *MetricDeclaration) Init(logger *zap.Logger) (err error) {
// Return error if no metric name selectors are defined
if len(m.MetricNameSelectors) == 0 {
return errors.New("invalid metric declaration: no metric name selectors defined")
}

// Filter out duplicate dimension sets and those with more than 10 elements
validDims := make([][]string, 0, len(m.Dimensions))
seen := make(map[string]bool, len(m.Dimensions))
for _, dimSet := range m.Dimensions {
concatenatedDims := strings.Join(dimSet, ",")
if len(dimSet) > 10 {
logger.Warn("Dropped dimension set: > 10 dimensions specified.", zap.String("dimensions", concatenatedDims))
continue
}

// Dedup dimensions within dimension set
dedupedDims, hasDuplicate := dedupDimensionSet(dimSet)
if hasDuplicate {
logger.Warn("Removed duplicates from dimension set.", zap.String("dimensions", concatenatedDims))
}

// Sort dimensions
sort.Strings(dedupedDims)

// Dedup dimension sets
key := strings.Join(dedupedDims, ",")
if _, ok := seen[key]; ok {
logger.Warn("Dropped dimension set: duplicated dimension set.", zap.String("dimensions", concatenatedDims))
continue
}
seen[key] = true
validDims = append(validDims, dedupedDims)
}
m.Dimensions = validDims

m.metricRegexList = make([]*regexp.Regexp, len(m.MetricNameSelectors))
for i, selector := range m.MetricNameSelectors {
m.metricRegexList[i] = regexp.MustCompile(selector)
}
return
}

// Matches returns true if the given OTLP Metric's name matches any of the Metric
// Declaration's metric name selectors.
func (m *MetricDeclaration) Matches(metric *pdata.Metric) bool {
for _, regex := range m.metricRegexList {
if regex.MatchString(metric.Name()) {
return true
}
}
return false
}

// ExtractDimensions extracts dimensions within the MetricDeclaration that only
// contains labels found in `labels`. Returned order of dimensions are preserved.
func (m *MetricDeclaration) ExtractDimensions(labels map[string]string) (dimensions [][]string) {
for _, dimensionSet := range m.Dimensions {
if len(dimensionSet) == 0 {
continue
}
includeSet := true
for _, dim := range dimensionSet {
if _, ok := labels[dim]; !ok {
includeSet = false
break
}
}
if includeSet {
dimensions = append(dimensions, dimensionSet)
}
}
return
}

// processMetricDeclarations processes a list of MetricDeclarations and creates a
// list of dimension sets that matches the given `metric`. This list is then aggregated
// together with the rolled-up dimensions. Returned dimension sets
// are deduped and the dimensions in each dimension set are sorted.
// Prerequisite:
// 1. metricDeclarations' dimensions are sorted.
func processMetricDeclarations(metricDeclarations []*MetricDeclaration, metric *pdata.Metric,
labels map[string]string, rolledUpDimensions [][]string) (dimensions [][]string) {
seen := make(map[string]bool)
addDimSet := func(dimSet []string) {
key := strings.Join(dimSet, ",")
// Only add dimension set if not a duplicate
if _, ok := seen[key]; !ok {
dimensions = append(dimensions, dimSet)
seen[key] = true
}
}
// Extract and append dimensions from metric declarations
for _, m := range metricDeclarations {
if m.Matches(metric) {
extractedDims := m.ExtractDimensions(labels)
for _, dimSet := range extractedDims {
addDimSet(dimSet)
}
}
}
// Add on rolled-up dimensions
for _, dimSet := range rolledUpDimensions {
sort.Strings(dimSet)
addDimSet(dimSet)
}
return
}
Loading

0 comments on commit 0217d55

Please sign in to comment.