Skip to content

Commit

Permalink
AWSEMFExporter - Add dimension_rollup option and log stream naming lo…
Browse files Browse the repository at this point in the history
…gic, Remove some unnecessary config (#1169)

1. Adding a feature to AWSEMFExporter, which supports customers to configure the option for rolling up their metrics dimensions.
2. Adding a feature to AWSEMFExporter, which constructs the log stream name using identifier of the host where the collector runs. e.g. Use instanceID on EC2, containerID on EKS, ECS.
  • Loading branch information
shaochengwang authored Oct 7, 2020
1 parent 792338d commit 76896b4
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 48 deletions.
5 changes: 2 additions & 3 deletions exporter/awsemfexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ The following exporter configuration parameters are supported.
| `no_verify_ssl` | Enable or disable TLS certificate verification. | false |
| `proxy_address` | Upload Structured Logs to AWS CloudWatch through a proxy. | |
| `region` | Send Structured Logs to AWS CloudWatch in a specific region. If this field is not present in config, environment variable "AWS_REGION" can then be used to set region.| determined by metadata |
| `resource_arn` | Amazon Resource Name (ARN) of the AWS resource running the collector. | |
| `role_arn` | IAM role to upload segments to a different account. | |
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 5 |
| `force_flush_interval`| Specifies in seconds the maximum amount of time that metrics remain in the memory buffer before being sent to the server.| 60 |
| `max_retries` | Maximum number of retries before abandoning an attempt to post data. | 1 |
| `dimension_rollup_option`| DimensionRollupOption is the option for metrics dimension rollup. Three options are available. |"ZeroAndSingleDimensionRollup" (Enable both zero dimension rollup and single dimension rollup)|


## AWS Credential Configuration
Expand Down
7 changes: 5 additions & 2 deletions exporter/awsemfexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ type Config struct {
ProxyAddress string `mapstructure:"proxy_address"`
// Region is the AWS region where the metric logs are sent to.
Region string `mapstructure:"region"`
// Amazon Resource Name (ARN) of the AWS resource running the collector.
ResourceARN string `mapstructure:"resource_arn"`
// RoleARN is the IAM role used by the collector when communicating
// with the CloudWatch Logs service
RoleARN string `mapstructure:"role_arn"`
// NoVerifySSL is the option to disable TLS certificate verification.
NoVerifySSL bool `mapstructure:"no_verify_ssl"`
// MaxRetries is the maximum number of retries before abandoning an attempt to post data.
MaxRetries int `mapstructure:"max_retries"`
// DimensionRollupOption is the option for metrics dimension rollup. Three options are available, default option is "ZeroAndSingleDimensionRollup".
// "ZeroAndSingleDimensionRollup" - Enable both zero dimension rollup and single dimension rollup
// "SingleDimensionRollupOnly" - Enable single dimension rollup
// "NoDimensionRollup" - No dimension rollup (only keep original metrics which contain all dimensions)
DimensionRollupOption string `mapstructure:"dimension_rollup_option"`
}
4 changes: 2 additions & 2 deletions exporter/awsemfexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func TestLoadConfig(t *testing.T) {
LogStreamName: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 5,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "us-west-2",
ResourceARN: "arn:aws:ec2:us-east1:123456789:instance/i-293hiuhe0u",
RoleARN: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
})
}
9 changes: 9 additions & 0 deletions exporter/awsemfexporter/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -135,6 +136,14 @@ func TestNewAWSSessionWithErr(t *testing.T) {
se, err = conn.newAWSSession(logger, roleArn, region)
assert.NotNil(t, err)
assert.Nil(t, se)
os.Setenv("AWS_SDK_LOAD_CONFIG", "true")
os.Setenv("AWS_STS_REGIONAL_ENDPOINTS", "regional")
se, _ = session.NewSession(&aws.Config{
Region: aws.String("us-east-1"),
})
assert.NotNil(t, se)
_, err = conn.getEC2Region(se)
assert.NotNil(t, err)
}

func TestGetSTSCredsFromPrimaryRegionEndpoint(t *testing.T) {
Expand Down
14 changes: 9 additions & 5 deletions exporter/awsemfexporter/emf_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer/consumererror"
Expand All @@ -39,6 +40,7 @@ type emfExporter struct {

pusherMapLock sync.Mutex
retryCnt int
collectorID string
}

// New func creates an EMF Exporter instance with data push callback func
Expand All @@ -59,12 +61,14 @@ func New(

// create CWLogs client with aws session config
svcStructuredLog := NewCloudWatchLogsClient(logger, awsConfig, session)
collectorIdentifier, _ := uuid.NewRandom()

emfExporter := &emfExporter{
svcStructuredLog: svcStructuredLog,
config: config,
retryCnt: *awsConfig.MaxRetries,
logger: logger,
collectorID: collectorIdentifier.String(),
}
emfExporter.groupStreamToPusherMap = map[string]map[string]Pusher{}

Expand All @@ -73,10 +77,11 @@ func New(

func (emf *emfExporter) pushMetricsData(_ context.Context, md pdata.Metrics) (droppedTimeSeries int, err error) {
expConfig := emf.config.(*Config)
dimensionRollupOption := expConfig.DimensionRollupOption
logGroup := "/metrics/default"
logStream := "otel-stream"
logStream := fmt.Sprintf("otel-stream-%s", emf.collectorID)
// override log group if customer has specified Resource Attributes service.name or service.namespace
putLogEvents, totalDroppedMetrics, namespace := generateLogEventFromMetric(md)
putLogEvents, totalDroppedMetrics, namespace := generateLogEventFromMetric(md, dimensionRollupOption, expConfig.Namespace)
if namespace != "" {
logGroup = fmt.Sprintf("/metrics/%s", namespace)
}
Expand Down Expand Up @@ -164,19 +169,18 @@ func (emf *emfExporter) Start(ctx context.Context, host component.Host) error {
return nil
}

func generateLogEventFromMetric(metric pdata.Metrics) ([]*LogEvent, int, string) {
func generateLogEventFromMetric(metric pdata.Metrics, dimensionRollupOption string, namespace string) ([]*LogEvent, int, string) {
rms := metric.ResourceMetrics()
cwMetricLists := []*CWMetrics{}
var cwm []*CWMetrics
var namespace string
var totalDroppedMetrics int

for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
if rm.IsNil() {
continue
}
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm)
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, dimensionRollupOption, namespace)
if len(cwm) > 0 && len(cwm[0].Measurements) > 0 {
namespace = cwm[0].Measurements[0].Namespace
}
Expand Down
5 changes: 3 additions & 2 deletions exporter/awsemfexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ func createDefaultConfig() configmodels.Exporter {
},
LogGroupName: "",
LogStreamName: "",
Namespace: "",
Endpoint: "",
RequestTimeoutSeconds: 30,
MaxRetries: 5,
MaxRetries: 1,
NoVerifySSL: false,
ProxyAddress: "",
Region: "",
ResourceARN: "",
RoleARN: "",
DimensionRollupOption: "ZeroAndSingleDimensionRollup",
}
}

Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.34.9
github.com/census-instrumentation/opencensus-proto v0.3.0
github.com/golang/protobuf v1.4.2
github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/collector v0.11.1-0.20201006165100-07236c11fb27
go.uber.org/zap v1.16.0
Expand Down
80 changes: 50 additions & 30 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (

// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
maximumLogEventsPerPut = 10000

// DimensionRollupOptions
ZeroAndSingleDimensionRollup = "ZeroAndSingleDimensionRollup"
SingleDimensionRollupOnly = "SingleDimensionRollupOnly"

FakeMetricValue = 0
)

var currentState = mapwithexpiry.NewMapWithExpiry(CleanInterval)
Expand Down Expand Up @@ -70,12 +76,11 @@ type CWMetricStats struct {
}

// TranslateOtToCWMetric converts OT metrics to CloudWatch Metric format
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics) ([]*CWMetrics, int) {
func TranslateOtToCWMetric(rm *pdata.ResourceMetrics, dimensionRollupOption string, namespace string) ([]*CWMetrics, int) {
var cwMetricLists []*CWMetrics
namespace := defaultNameSpace
totalDroppedMetrics := 0

if !rm.Resource().IsNil() {
if len(namespace) == 0 && !rm.Resource().IsNil() {
serviceName, svcNameOk := rm.Resource().Attributes().Get(conventions.AttributeServiceName)
serviceNamespace, svcNsOk := rm.Resource().Attributes().Get(conventions.AttributeServiceNamespace)
if svcNameOk && svcNsOk && serviceName.Type() == pdata.AttributeValueSTRING && serviceNamespace.Type() == pdata.AttributeValueSTRING {
Expand All @@ -87,6 +92,10 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics) ([]*CWMetrics, int) {
}
}

if len(namespace) == 0 {
namespace = defaultNameSpace
}

ilms := rm.InstrumentationLibraryMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
Expand All @@ -106,7 +115,7 @@ func TranslateOtToCWMetric(rm *pdata.ResourceMetrics) ([]*CWMetrics, int) {
totalDroppedMetrics++
continue
}
cwMetricList := getMeasurements(&metric, namespace, OTLib)
cwMetricList := getMeasurements(&metric, namespace, OTLib, dimensionRollupOption)
cwMetricLists = append(cwMetricLists, cwMetricList...)
}
}
Expand Down Expand Up @@ -139,7 +148,7 @@ func TranslateCWMetricToEMF(cwMetricLists []*CWMetrics) []*LogEvent {
return ples
}

func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CWMetrics {
func getMeasurements(metric *pdata.Metric, namespace string, OTLib string, dimensionRollupOption string) []*CWMetrics {
var result []*CWMetrics

// metric measure data from OT
Expand All @@ -161,7 +170,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib)
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
Expand All @@ -176,7 +185,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib)
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
Expand All @@ -191,7 +200,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib)
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
Expand All @@ -206,7 +215,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib)
cwMetric := buildCWMetricFromDP(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
Expand All @@ -221,7 +230,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
if dp.IsNil() {
continue
}
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, OTLib)
cwMetric := buildCWMetricFromHistogram(dp, metric, namespace, metricSlice, OTLib, dimensionRollupOption)
if cwMetric != nil {
result = append(result, cwMetric)
}
Expand All @@ -230,7 +239,7 @@ func getMeasurements(metric *pdata.Metric, namespace string, OTLib string) []*CW
return result
}

func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string) *CWMetrics {
func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string, dimensionRollupOption string) *CWMetrics {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
Expand All @@ -256,10 +265,12 @@ func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, me
var metricVal interface{}
switch metric := dp.(type) {
case pdata.IntDataPoint:
fieldsPairs[pmd.Name()] = metric.Value()
// Put a fake but identical metric value here in order to add metric name into fieldsPairs
// since calculateRate() needs metric name as one of metric identifiers
fieldsPairs[pmd.Name()] = int64(FakeMetricValue)
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
case pdata.DoubleDataPoint:
fieldsPairs[pmd.Name()] = metric.Value()
fieldsPairs[pmd.Name()] = float64(FakeMetricValue)
metricVal = calculateRate(fieldsPairs, metric.Value(), timestamp)
}
if metricVal == nil {
Expand All @@ -268,14 +279,9 @@ func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, me
fieldsPairs[pmd.Name()] = metricVal

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
//"Zero" dimension rollup
dimensionZero := []string{OtlibDimensionKey}
if len(dimensionSlice) > 0 {
dimensionArray = append(dimensionArray, dimensionZero)
}
//"One" dimension rollup
for _, dimensionKey := range dimensionSlice {
dimensionArray = append(dimensionArray, append(dimensionZero, dimensionKey))
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Expand All @@ -293,7 +299,7 @@ func buildCWMetricFromDP(dp interface{}, pmd *pdata.Metric, namespace string, me
return cwMetric
}

func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string) *CWMetrics {
func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdata.Metric, namespace string, metricSlice []map[string]string, OTLib string, dimensionRollupOption string) *CWMetrics {
// fields contains metric and dimensions key/value pairs
fieldsPairs := make(map[string]interface{})
var dimensionArray [][]string
Expand Down Expand Up @@ -321,14 +327,9 @@ func buildCWMetricFromHistogram(metric pdata.DoubleHistogramDataPoint, pmd *pdat
fieldsPairs[pmd.Name()] = metricStats

// EMF dimension attr takes list of list on dimensions. Including single/zero dimension rollup
//"Zero" dimension rollup
dimensionZero := []string{OtlibDimensionKey}
if len(dimensionSlice) > 0 {
dimensionArray = append(dimensionArray, dimensionZero)
}
//"One" dimension rollup
for _, dimensionKey := range dimensionSlice {
dimensionArray = append(dimensionArray, append(dimensionZero, dimensionKey))
rollupDimensionArray := dimensionRollup(dimensionRollupOption, dimensionSlice)
if len(rollupDimensionArray) > 0 {
dimensionArray = append(dimensionArray, rollupDimensionArray...)
}

cwMeasurement := &CwMeasurement{
Expand Down Expand Up @@ -402,3 +403,22 @@ func calculateRate(fields map[string]interface{}, val interface{}, timestamp int
}
return metricRate
}

func dimensionRollup(dimensionRollupOption string, originalDimensionSlice []string) [][]string {
var rollupDimensionArray [][]string
dimensionZero := []string{OtlibDimensionKey}
if dimensionRollupOption == ZeroAndSingleDimensionRollup {
//"Zero" dimension rollup
if len(originalDimensionSlice) > 0 {
rollupDimensionArray = append(rollupDimensionArray, dimensionZero)
}
}
if dimensionRollupOption == ZeroAndSingleDimensionRollup || dimensionRollupOption == SingleDimensionRollupOnly {
//"One" dimension rollup
for _, dimensionKey := range originalDimensionSlice {
rollupDimensionArray = append(rollupDimensionArray, append(dimensionZero, dimensionKey))
}
}

return rollupDimensionArray
}
6 changes: 3 additions & 3 deletions exporter/awsemfexporter/metric_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestTranslateOtToCWMetric(t *testing.T) {
},
}
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
assert.Equal(t, 1, totalDroppedMetrics)
assert.NotNil(t, cwm)
assert.Equal(t, 5, len(cwm))
Expand Down Expand Up @@ -294,7 +294,7 @@ func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) {
Metrics: []*metricspb.Metric{},
}
rm := internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm)
cwm, totalDroppedMetrics := TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
assert.Equal(t, 0, totalDroppedMetrics)
assert.Nil(t, cwm)
assert.Equal(t, 0, len(cwm))
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestTranslateOtToCWMetricWithNameSpace(t *testing.T) {
},
}
rm = internaldata.OCToMetrics(md).ResourceMetrics().At(0)
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm)
cwm, totalDroppedMetrics = TranslateOtToCWMetric(&rm, ZeroAndSingleDimensionRollup, "")
assert.Equal(t, 0, totalDroppedMetrics)
assert.NotNil(t, cwm)
assert.Equal(t, 1, len(cwm))
Expand Down
1 change: 0 additions & 1 deletion exporter/awsemfexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ exporters:
awsemf:
awsemf/1:
region: 'us-west-2'
resource_arn: "arn:aws:ec2:us-east1:123456789:instance/i-293hiuhe0u"
role_arn: "arn:aws:iam::123456789:role/monitoring-EKS-NodeInstanceRole"

service:
Expand Down
1 change: 1 addition & 0 deletions exporter/awsemfexporter/testdata/mockcgroup
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1:asddnasodhasdnsan-containerIDstart-21301923712841283901283901842132-containerIDend
1 change: 1 addition & 0 deletions exporter/awsemfexporter/testdata/mockcgroupWithErr
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1:line-is-too-short

0 comments on commit 76896b4

Please sign in to comment.