Skip to content

Commit

Permalink
Aggregate dimensions and perform dedup
Browse files Browse the repository at this point in the history
  • Loading branch information
kohrapha committed Nov 6, 2020
1 parent b662599 commit e6c1ff3
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 250 deletions.
36 changes: 28 additions & 8 deletions exporter/awsemfexporter/metric_declaration.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func dedupDimensionSet(dimensions []string) (deduped []string, hasDuplicate bool
}

// Init initializes the MetricDeclaration struct. Performs validation and compiles
// regex strings.
// regex strings. Dimensions are deduped and sorted.
func (m *MetricDeclaration) Init(logger *zap.Logger) (err error) {
// Return error if no metric name selectors are defined
if len(m.MetricNameSelectors) == 0 {
Expand Down Expand Up @@ -116,7 +116,7 @@ func (m *MetricDeclaration) Matches(metric *pdata.Metric) bool {
}

// ExtractDimensions extracts dimensions within the MetricDeclaration that only
// contains labels found in `labels`.
// contains labels found in `labels`. Returned order of dimensions are preserved.
func (m *MetricDeclaration) ExtractDimensions(labels map[string]string) (dimensions [][]string) {
for _, dimensionSet := range m.Dimensions {
if len(dimensionSet) == 0 {
Expand All @@ -136,16 +136,36 @@ func (m *MetricDeclaration) ExtractDimensions(labels map[string]string) (dimensi
return
}

// processMetricDeclarations processes a list of MetricDeclarations and returns a
// list of dimensions that matches the given `metric`.
func processMetricDeclarations(metricDeclarations []*MetricDeclaration, metric *pdata.Metric, labels map[string]string) (dimensionsList [][][]string) {
// processMetricDeclarations processes a list of MetricDeclarations and creates a
// list of dimension sets that matches the given `metric`. This list is then aggregated
// together with the rolled-up dimensions. Returned dimension sets
// are deduped and the dimensions in each dimension set are sorted.
// Prerequisite:
// 1. metricDeclarations' dimensions are sorted.
func processMetricDeclarations(metricDeclarations []*MetricDeclaration, metric *pdata.Metric,
labels map[string]string, rolledUpDimensions [][]string) (dimensions [][]string) {
seen := make(map[string]bool)
addDimSet := func(dimSet []string) {
key := strings.Join(dimSet, ",")
// Only add dimension set if not a duplicate
if _, ok := seen[key]; !ok {
dimensions = append(dimensions, dimSet)
seen[key] = true
}
}
// Extract and append dimensions from metric declarations
for _, m := range metricDeclarations {
if m.Matches(metric) {
dimensions := m.ExtractDimensions(labels)
if len(dimensions) > 0 {
dimensionsList = append(dimensionsList, dimensions)
extractedDims := m.ExtractDimensions(labels)
for _, dimSet := range extractedDims {
addDimSet(dimSet)
}
}
}
// Add on rolled-up dimensions
for _, dimSet := range rolledUpDimensions {
sort.Strings(dimSet)
addDimSet(dimSet)
}
return
}
109 changes: 85 additions & 24 deletions exporter/awsemfexporter/metric_declaration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestExtractDimensions(t *testing.T) {
err := m.Init(logger)
assert.Nil(t, err)
dimensions := m.ExtractDimensions(tc.labels)
assertDimsEqual(t, tc.extractedDimensions, dimensions)
assert.Equal(t, tc.extractedDimensions, dimensions)
})
}
}
Expand All @@ -238,14 +238,14 @@ func TestProcessMetricDeclarations(t *testing.T) {
metricDeclarations := []*MetricDeclaration{
{
Dimensions: [][]string{{"dim1", "dim2"}},
MetricNameSelectors: []string{"a", "b"},
MetricNameSelectors: []string{"a", "b", "c"},
},
{
Dimensions: [][]string{{"dim1"}},
MetricNameSelectors: []string{"aa", "b"},
},
{
Dimensions: [][]string{{"dim1", "dim2"}, {"dim1"}},
Dimensions: [][]string{{"dim2", "dim1"}, {"dim1"}},
MetricNameSelectors: []string{"a"},
},
}
Expand All @@ -255,43 +255,87 @@ func TestProcessMetricDeclarations(t *testing.T) {
assert.Nil(t, err)
}
testCases := []struct {
testName string
metricName string
labels map[string]string
dimensionsList [][][]string
testName string
metricName string
labels map[string]string
rollUpDims [][]string
expectedDims [][]string
}{
{
"Matching multiple dimensions 1",
"a",
"Matching single declaration",
"c",
map[string]string{
"dim1": "foo",
"dim2": "bar",
},
[][][]string{
{{"dim1", "dim2"}},
{{"dim1", "dim2"}, {"dim1"}},
nil,
[][]string{
{"dim1", "dim2"},
},
},
{
"Match single dimension set",
"a",
map[string]string{
"dim1": "foo",
},
nil,
[][]string{
{"dim1"},
},
},
{
"Matching multiple dimensions 2",
"Match single dimension set w/ rolled-up dims",
"a",
map[string]string{
"dim1": "foo",
"dim3": "car",
},
[][]string{{"dim1"}, {"dim3"}},
[][]string{
{"dim1"},
{"dim3"},
},
},
{
"Matching multiple declarations",
"b",
map[string]string{
"dim1": "foo",
"dim2": "bar",
},
[][][]string{
{{"dim1", "dim2"}},
{{"dim1"}},
nil,
[][]string{
{"dim1", "dim2"},
{"dim1"},
},
},
{
"Match single dimension set",
"Matching multiple declarations w/ duplicate",
"a",
map[string]string{
"dim1": "foo",
"dim2": "bar",
},
[][][]string{
{{"dim1"}},
nil,
[][]string{
{"dim1", "dim2"},
{"dim1"},
},
},
{
"Matching multiple declarations w/ duplicate + rolled-up dims",
"a",
map[string]string{
"dim1": "foo",
"dim2": "bar",
"dim3": "car",
},
[][]string{{"dim2", "dim1"}, {"dim3"}},
[][]string{
{"dim1", "dim2"},
{"dim1"},
{"dim3"},
},
},
{
Expand All @@ -301,6 +345,16 @@ func TestProcessMetricDeclarations(t *testing.T) {
"dim2": "bar",
},
nil,
nil,
},
{
"No matching dimension set w/ rolled-up dims",
"a",
map[string]string{
"dim2": "bar",
},
[][]string{{"dim2"}},
[][]string{{"dim2"}},
},
{
"No matching metric name",
Expand All @@ -309,6 +363,16 @@ func TestProcessMetricDeclarations(t *testing.T) {
"dim1": "foo",
},
nil,
nil,
},
{
"No matching metric name w/ rolled-up dims",
"c",
map[string]string{
"dim1": "foo",
},
[][]string{{"dim1"}},
[][]string{{"dim1"}},
},
}

Expand All @@ -317,11 +381,8 @@ func TestProcessMetricDeclarations(t *testing.T) {
metric.InitEmpty()
metric.SetName(tc.metricName)
t.Run(tc.testName, func(t *testing.T) {
dimensionsList := processMetricDeclarations(metricDeclarations, &metric, tc.labels)
assert.Equal(t, len(tc.dimensionsList), len(dimensionsList))
for i, dimensions := range dimensionsList {
assertDimsEqual(t, tc.dimensionsList[i], dimensions)
}
dimensions := processMetricDeclarations(metricDeclarations, &metric, tc.labels, tc.rollUpDims)
assert.Equal(t, tc.expectedDims, dimensions)
})
}
}
84 changes: 35 additions & 49 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
Expand Down Expand Up @@ -274,40 +273,52 @@ func buildCWMetric(dp DataPoint, pmd *pdata.Metric, namespace string, metricSlic
idx++
})

// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(dimensionRollupOption, labelsSlice, instrumentationLibName)

// Add OTel instrumentation lib name as an additional dimension if it is defined
if instrumentationLibName != noInstrumentationLibraryName {
labels[OTellibDimensionKey] = instrumentationLibName
fields[OTellibDimensionKey] = instrumentationLibName
}

// Create list of dimension sets
var dimensionsArray [][][]string
var dimensions [][]string
if len(metricDeclarations) > 0 {
// Filter metric declarations and map each metric declaration to a list
// of dimension sets
dimensionsArray = processMetricDeclarations(metricDeclarations, pmd, labels)
} else if instrumentationLibName != noInstrumentationLibraryName {
// If no metric declarations defined and OTel instrumentation lib name is defined,
// create a single dimension set containing the list of labels +
// instrumentationLibName dimension key
dimensionsArray = [][][]string{{append(labelsSlice, OTellibDimensionKey)}}
// If metric declarations are defined, extract dimension sets from them
dimensions = processMetricDeclarations(metricDeclarations, pmd, labels, rollupDimensionArray)
} else {
// If no metric declarations or OTel instrumentation lib name defined,
// create a single dimension set containing only label names
dimensionsArray = [][][]string{{labelsSlice}}
}
// If no metric declarations defined, create a single dimension set containing
// the list of labels
dims := labelsSlice
if instrumentationLibName != noInstrumentationLibraryName {
// If OTel instrumentation lib name is defined, add instrumentation lib
// name as a dimension
dims = append(dims, OTellibDimensionKey)
}

// Apply single/zero dimension rollup to labels
rollupDimensionArray := dimensionRollup(dimensionRollupOption, labelsSlice, instrumentationLibName)
if len(rollupDimensionArray) > 0 {
// Perform de-duplication check for edge case with a single label and single roll-up
// is activated
if len(labelsSlice) > 1 || (dimensionRollupOption != SingleDimensionRollupOnly &&
dimensionRollupOption != ZeroAndSingleDimensionRollup) {
dimensions = [][]string{dims}
}
dimensions = append(dimensions, rollupDimensionArray...)
} else {
dimensions = [][]string{dims}
}
}

// Build list of CW Measurements
cwMeasurements := make([]CwMeasurement, len(dimensionsArray))
for i, dimensions := range dimensionsArray {
dimensions = dedupDimensions(dimensions, rollupDimensionArray)
cwMeasurements[i] = CwMeasurement{
Namespace: namespace,
Dimensions: dimensions,
Metrics: metricSlice,
var cwMeasurements []CwMeasurement
if len(dimensions) > 0 {
cwMeasurements = []CwMeasurement{
{
Namespace: namespace,
Dimensions: dimensions,
Metrics: metricSlice,
},
}
}

Expand Down Expand Up @@ -409,6 +420,7 @@ func calculateRate(fields map[string]interface{}, val interface{}, timestamp int
return metricRate
}

// dimensionRollup creates rolled-up dimensions from the metric's label set.
func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []string, instrumentationLibName string) [][]string {
var rollupDimensionArray [][]string
dimensionZero := []string{}
Expand All @@ -431,32 +443,6 @@ func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []stri
return rollupDimensionArray
}

// dedupDimensions appends rolled-up dimensions to existing dimensions and removes duplicate dimension sets.
func dedupDimensions(dimensions, rolledUpDims [][]string) [][]string {
deduped := make([][]string, len(dimensions)+len(rolledUpDims))
seen := make(map[string]bool, len(deduped))
idx := 0

addDeduped := func(dimSet []string) {
sort.Strings(dimSet)
key := strings.Join(dimSet, ",")
if _, ok := seen[key]; ok {
return
}
seen[key] = true
deduped[idx] = dimSet
idx++
}

for _, dimSet := range dimensions {
addDeduped(dimSet)
}
for _, dimSet := range rolledUpDims {
addDeduped(dimSet)
}
return deduped[:idx]
}

func needsCalculateRate(pmd *pdata.Metric) bool {
switch pmd.DataType() {
case pdata.MetricDataTypeIntSum:
Expand Down
Loading

0 comments on commit e6c1ff3

Please sign in to comment.