Skip to content

Commit

Permalink
Move convertPeriodToDuration and getRegions into aws.go (#10474)
Browse files Browse the repository at this point in the history
* move convertPeriodToDuration and getRegions into aws.go

* Fix unit test
  • Loading branch information
kaiyan-sheng authored Feb 1, 2019
1 parent 4ea1e12 commit ff0dead
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 145 deletions.
92 changes: 90 additions & 2 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@

package aws

import "github.com/elastic/beats/metricbeat/mb"
import (
"strconv"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
)

// Config defines all required and optional parameters for aws metricsets
type Config struct {
Expand All @@ -18,6 +28,10 @@ type Config struct {
// MetricSet is the base metricset for all aws metricsets
type MetricSet struct {
mb.BaseMetricSet
RegionsList []string
DurationString string
PeriodInSec int
AwsConfig *awssdk.Config
}

// ModuleName is the name of this module.
Expand All @@ -44,5 +58,79 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
if err != nil {
return nil, err
}
return &MetricSet{BaseMetricSet: base}, nil

awsConfig := defaults.Config()
awsCreds := awssdk.Credentials{
AccessKeyID: config.AccessKeyID,
SecretAccessKey: config.SecretAccessKey,
}
if config.SessionToken != "" {
awsCreds.SessionToken = config.SessionToken
}

awsConfig.Credentials = awssdk.StaticCredentialsProvider{
Value: awsCreds,
}

awsConfig.Region = config.DefaultRegion

svcEC2 := ec2.New(awsConfig)
regionsList, err := getRegions(svcEC2)
if err != nil {
return nil, err
}

// Calculate duration based on period
durationString, periodSec, err := convertPeriodToDuration(config.Period)
if err != nil {
return nil, err
}

// Construct MetricSet
metricSet := MetricSet{
BaseMetricSet: base,
RegionsList: regionsList,
DurationString: durationString,
PeriodInSec: periodSec,
AwsConfig: &awsConfig,
}
return &metricSet, nil
}

func getRegions(svc ec2iface.EC2API) (regionsList []string, err error) {
input := &ec2.DescribeRegionsInput{}
req := svc.DescribeRegionsRequest(input)
output, err := req.Send()
if err != nil {
err = errors.Wrap(err, "Failed DescribeRegions")
return
}
for _, region := range output.Regions {
regionsList = append(regionsList, *region.RegionName)
}
return
}

func convertPeriodToDuration(period string) (string, int, error) {
// Set starttime double the default frequency earlier than the endtime in order to make sure
// GetMetricDataRequest gets the latest data point for each metric.
numberPeriod, err := strconv.Atoi(period[0 : len(period)-1])
if err != nil {
return "", 0, err
}

unitPeriod := period[len(period)-1:]
switch unitPeriod {
case "s":
duration := "-" + strconv.Itoa(numberPeriod*2) + unitPeriod
return duration, numberPeriod, nil
case "m":
duration := "-" + strconv.Itoa(numberPeriod*2) + unitPeriod
periodInSec := numberPeriod * 60
return duration, periodInSec, nil
default:
err = errors.New("invalid period in config. Please reset period in config")
duration := "-" + strconv.Itoa(numberPeriod*2) + "s"
return duration, numberPeriod, err
}
}
81 changes: 81 additions & 0 deletions x-pack/metricbeat/module/aws/aws_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build !integration

package aws

import (
"fmt"
"testing"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/ec2iface"
"github.com/stretchr/testify/assert"
)

// MockEC2Client struct is used for unit tests.
type MockEC2Client struct {
ec2iface.EC2API
}

var regionName = "us-west-1"

func (m *MockEC2Client) DescribeRegionsRequest(input *ec2.DescribeRegionsInput) ec2.DescribeRegionsRequest {
return ec2.DescribeRegionsRequest{
Request: &awssdk.Request{
Data: &ec2.DescribeRegionsOutput{
Regions: []ec2.Region{
{
RegionName: &regionName,
},
},
},
},
}
}

func TestGetRegions(t *testing.T) {
mockSvc := &MockEC2Client{}
regionsList, err := getRegions(mockSvc)
if err != nil {
fmt.Println("failed getRegions: ", err)
t.FailNow()
}
assert.Equal(t, 1, len(regionsList))
assert.Equal(t, regionName, regionsList[0])
}

func TestConvertPeriodToDuration(t *testing.T) {
period1 := "300s"
duration1, periodSec1, err := convertPeriodToDuration(period1)
assert.NoError(t, nil, err)
assert.Equal(t, "-600s", duration1)
assert.Equal(t, 300, periodSec1)

period2 := "30ss"
duration2, periodSec2, err := convertPeriodToDuration(period2)
assert.Error(t, err)
assert.Equal(t, "", duration2)
assert.Equal(t, 0, periodSec2)

period3 := "10m"
duration3, periodSec3, err := convertPeriodToDuration(period3)
assert.NoError(t, nil, err)
assert.Equal(t, "-20m", duration3)
assert.Equal(t, 600, periodSec3)

period4 := "30s"
duration4, periodSec4, err := convertPeriodToDuration(period4)
assert.NoError(t, nil, err)
assert.Equal(t, "-60s", duration4)
assert.Equal(t, 30, periodSec4)

period5 := "60s"
duration5, periodSec5, err := convertPeriodToDuration(period5)
assert.NoError(t, nil, err)
assert.Equal(t, "-120s", duration5)
assert.Equal(t, 60, periodSec5)
}
101 changes: 10 additions & 91 deletions x-pack/metricbeat/module/aws/ec2/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@ package ec2

import (
"fmt"
"strconv"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/defaults"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
"github.com/aws/aws-sdk-go-v2/service/ec2"
Expand Down Expand Up @@ -41,12 +39,7 @@ func init() {
// interface methods except for Fetch.
type MetricSet struct {
*aws.MetricSet
moduleConfig *aws.Config
awsConfig *awssdk.Config
regionsList []string
durationString string
periodInSec int
logger *logp.Logger
logger *logp.Logger
}

// metricIDNameMap is a translating map between createMetricDataQuery id
Expand Down Expand Up @@ -90,38 +83,9 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, errors.Wrap(err, "error creating aws metricset")
}

// Get a list of regions
awsConfig := defaults.Config()
awsCreds := awssdk.Credentials{
AccessKeyID: moduleConfig.AccessKeyID,
SecretAccessKey: moduleConfig.SecretAccessKey,
}
if moduleConfig.SessionToken != "" {
awsCreds.SessionToken = moduleConfig.SessionToken
}

awsConfig.Credentials = awssdk.StaticCredentialsProvider{
Value: awsCreds,
}

awsConfig.Region = moduleConfig.DefaultRegion
svcEC2 := ec2.New(awsConfig)
regionsList, err := getRegions(svcEC2)
if err != nil {
err = errors.Wrap(err, "getRegions failed")
ec2Logger.Error(err.Error())
}

// Calculate duration based on period
durationString, periodSec, err := convertPeriodToDuration(moduleConfig.Period)
if err != nil {
ec2Logger.Error(err.Error())
return nil, err
}

// Check if period is set to be multiple of 60s or 300s
remainder300 := periodSec % 300
remainder60 := periodSec % 60
remainder300 := metricSet.PeriodInSec % 300
remainder60 := metricSet.PeriodInSec % 60
if remainder300 != 0 || remainder60 != 0 {
err := errors.New("period needs to be set to 60s (or a multiple of 60s) if detailed monitoring is " +
"enabled for EC2 instances or set to 300s (or a multiple of 300s) if EC2 instances has basic monitoring. " +
Expand All @@ -130,23 +94,18 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

return &MetricSet{
MetricSet: metricSet,
moduleConfig: &moduleConfig,
awsConfig: &awsConfig,
regionsList: regionsList,
durationString: durationString,
periodInSec: periodSec,
logger: ec2Logger,
MetricSet: metricSet,
logger: ec2Logger,
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) {
for _, regionName := range m.regionsList {
m.awsConfig.Region = regionName
svcEC2 := ec2.New(*m.awsConfig)
for _, regionName := range m.MetricSet.RegionsList {
m.MetricSet.AwsConfig.Region = regionName
svcEC2 := ec2.New(*m.MetricSet.AwsConfig)
instanceIDs, instancesOutputs, err := getInstancesPerRegion(svcEC2)
if err != nil {
err = errors.Wrap(err, "getInstancesPerRegion failed, skipping region "+regionName)
Expand All @@ -155,13 +114,13 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
continue
}

svcCloudwatch := cloudwatch.New(*m.awsConfig)
svcCloudwatch := cloudwatch.New(*m.MetricSet.AwsConfig)
for _, instanceID := range instanceIDs {
init := true
getMetricDataOutput := &cloudwatch.GetMetricDataOutput{NextToken: nil}
for init || getMetricDataOutput.NextToken != nil {
init = false
output, err := getMetricDataPerRegion(m.durationString, m.periodInSec, instanceID, getMetricDataOutput.NextToken, svcCloudwatch)
output, err := getMetricDataPerRegion(m.MetricSet.DurationString, m.MetricSet.PeriodInSec, instanceID, getMetricDataOutput.NextToken, svcCloudwatch)
if err != nil {
err = errors.Wrap(err, "getMetricDataPerRegion failed, skipping region "+regionName+" for instance "+instanceID)
m.logger.Error(err.Error())
Expand All @@ -187,20 +146,6 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) {
}
}

func getRegions(svc ec2iface.EC2API) (regionsList []string, err error) {
input := &ec2.DescribeRegionsInput{}
req := svc.DescribeRegionsRequest(input)
output, err := req.Send()
if err != nil {
err = errors.Wrap(err, "Failed DescribeRegions")
return
}
for _, region := range output.Regions {
regionsList = append(regionsList, *region.RegionName)
}
return
}

func createCloudWatchEvents(getMetricDataOutput *cloudwatch.GetMetricDataOutput, instanceID string, instanceOutput ec2.Instance, regionName string) (event mb.Event, info string, err error) {
event.Service = metricsetName
event.RootFields = common.MapStr{}
Expand Down Expand Up @@ -313,32 +258,6 @@ func getInstancesPerRegion(svc ec2iface.EC2API) (instanceIDs []string, instances
return
}

func convertPeriodToDuration(period string) (string, int, error) {
// Amazon EC2 sends metrics to Amazon CloudWatch with 5-minute default frequency.
// If detailed monitoring is enabled, then data will be available in 1-minute period.
// Set starttime double the default frequency earlier than the endtime in order to make sure
// GetMetricDataRequest gets the latest data point for each metric.
numberPeriod, err := strconv.Atoi(period[0 : len(period)-1])
if err != nil {
return "", 0, err
}

unitPeriod := period[len(period)-1:]
switch unitPeriod {
case "s":
duration := "-" + strconv.Itoa(numberPeriod*2) + unitPeriod
return duration, numberPeriod, nil
case "m":
duration := "-" + strconv.Itoa(numberPeriod*2) + unitPeriod
periodInSec := numberPeriod * 60
return duration, periodInSec, nil
default:
err = errors.New("invalid period in config. Please reset period in config")
duration := "-" + strconv.Itoa(numberPeriod*2) + "s"
return duration, numberPeriod, err
}
}

func getMetricDataPerRegion(durationString string, periodInSec int, instanceID string, nextToken *string, svc cloudwatchiface.CloudWatchAPI) (*cloudwatch.GetMetricDataOutput, error) {
endTime := time.Now()
duration, err := time.ParseDuration(durationString)
Expand Down
Loading

0 comments on commit ff0dead

Please sign in to comment.