Skip to content

Commit ab1963b

Browse files
authored
Fix race condition when fetching resources (CoverGenius#23)
* Fix race condition when fetching resources (#4) Currently when fetching resources an empty array for the current list of resources is initialized and is saved in place of the existing list. If this list is accessed by one of the metrics go routines before it has been fully populated then that metric will not be populated for the missing resources. This diff adds locking and changes the new array to only be saved once fully populated to prevent such data races from occurring. * Fix bug causing all default metrics to only use average statistic
1 parent bb256e8 commit ab1963b

File tree

17 files changed

+174
-133
lines changed

17 files changed

+174
-133
lines changed

base/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ type configMetric struct {
1313
Dimensions []*cloudwatch.Dimension `yaml:"dimensions"` // The resource dimensions to generate individual series for (via labels)
1414
Statistics []*string `yaml:"statistics"` // List of AWS statistics to use.
1515
OutputName string `yaml:"output_name"` // Allows override of the generate metric name
16-
RangeSeconds int64 `yaml:"range_seconds"` // How far back to request data for in seconds.
1716
PeriodSeconds int64 `yaml:"period_seconds"` // Granularity of results from cloudwatch API.
17+
RangeSeconds int64 `yaml:"range_seconds"` // How far back to request data for in seconds.
1818
}
1919

2020
type metric struct {
@@ -43,8 +43,7 @@ type Config struct {
4343
func (c *Config) ConstructMetrics(defaults map[string]map[string]*MetricDescription) map[string][]*MetricDescription {
4444
mds := make(map[string][]*MetricDescription)
4545
for namespace, metrics := range c.Metrics.Data {
46-
47-
if len(metrics) <= 0 {
46+
if len(metrics) == 0 {
4847
if namespaceDefaults, ok := defaults[namespace]; ok {
4948
for key, defaultMetric := range namespaceDefaults {
5049
metrics = append(metrics, &configMetric{
@@ -54,6 +53,7 @@ func (c *Config) ConstructMetrics(defaults map[string]map[string]*MetricDescript
5453
PeriodSeconds: defaultMetric.PeriodSeconds,
5554
RangeSeconds: defaultMetric.RangeSeconds,
5655
Dimensions: defaultMetric.Dimensions,
56+
Statistics: defaultMetric.Statistic,
5757
})
5858
}
5959
}

base/controller.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
log "github.com/sirupsen/logrus"
2626
)
2727

28-
var alphaRegex, _ = regexp.Compile("[^a-zA-Z0-9]+")
28+
var alphaRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
2929

3030
// TagDescription represents an AWS tag key value pair
3131
type TagDescription struct {
@@ -120,7 +120,7 @@ func (rd *RegionDescription) BuildARN(s *string, r *string) (string, error) {
120120
return a.String(), nil
121121
}
122122

123-
func (rd *RegionDescription) buildFilters() error {
123+
func (rd *RegionDescription) saveFilters() {
124124
filters := []*ec2.Filter{}
125125
for _, tag := range rd.Tags {
126126
f := &ec2.Filter{
@@ -130,16 +130,20 @@ func (rd *RegionDescription) buildFilters() error {
130130
filters = append(filters, f)
131131
}
132132
rd.Filters = filters
133-
return nil
134133
}
135134

136135
func (rd *RegionDescription) saveAccountID() error {
137136
session := iam.New(rd.Session)
138137
input := iam.GetUserInput{}
139138
user, err := session.GetUser(&input)
140-
h.LogError(err)
139+
if err != nil {
140+
return err
141+
}
142+
141143
a, err := arn.Parse(*user.User.Arn)
142-
h.LogError(err)
144+
if err != nil {
145+
return err
146+
}
143147
rd.AccountID = &a.AccountID
144148

145149
return nil
@@ -153,13 +157,16 @@ func (rd *RegionDescription) Init(s *session.Session, td []*TagDescription, metr
153157
rd.Tags = td
154158

155159
err := rd.saveAccountID()
156-
h.LogErrorExit(err)
160+
if err != nil {
161+
return fmt.Errorf("error saving account id: %s", err)
162+
}
157163

158-
err = rd.buildFilters()
159-
h.LogErrorExit(err)
164+
rd.saveFilters()
160165

161166
err = rd.CreateNamespaceDescriptions(metrics)
162-
h.LogErrorExit(err)
167+
if err != nil {
168+
return fmt.Errorf("error creating namespaces: %s", err)
169+
}
163170

164171
return nil
165172
}
@@ -196,9 +203,9 @@ func (nd *NamespaceDescription) GatherMetrics(cw *cloudwatch.CloudWatch) {
196203
for _, md := range nd.Metrics {
197204
go func(md *MetricDescription) {
198205
nd.Mutex.RLock()
199-
result, err := md.getData(cw, nd.Resources, nd)
206+
result, err := md.getData(cw, nd.Resources)
200207
nd.Mutex.RUnlock()
201-
h.LogError(err)
208+
h.LogIfError(err)
202209
md.saveData(result, *nd.Parent.Region)
203210
}(md)
204211
}
@@ -295,7 +302,7 @@ func (md *MetricDescription) saveData(c *cloudwatch.GetMetricDataOutput, region
295302

296303
labels, err := awsLabelsFromString(*data.Label)
297304
if err != nil {
298-
h.LogError(err)
305+
h.LogIfError(err)
299306
continue
300307
}
301308

@@ -320,7 +327,7 @@ func (md *MetricDescription) saveData(c *cloudwatch.GetMetricDataOutput, region
320327
err = fmt.Errorf("unknown statistic type: %s", labels.statistic)
321328
}
322329
if err != nil {
323-
h.LogError(err)
330+
h.LogIfError(err)
324331
continue
325332
}
326333

@@ -448,12 +455,12 @@ func (rd *RegionDescription) TagsFound(tl interface{}) bool {
448455
return false
449456
}
450457

451-
func (md *MetricDescription) getData(cw *cloudwatch.CloudWatch, rds []*ResourceDescription, nd *NamespaceDescription) (*cloudwatch.GetMetricDataOutput, error) {
458+
func (md *MetricDescription) getData(cw *cloudwatch.CloudWatch, rds []*ResourceDescription) (*cloudwatch.GetMetricDataOutput, error) {
452459
query, err := md.BuildQuery(rds)
453-
if len(query) < 1 {
460+
if len(query) == 0 {
454461
return &cloudwatch.GetMetricDataOutput{}, nil
455462
}
456-
h.LogError(err)
463+
h.LogIfError(err)
457464

458465
end := time.Now().Round(5 * time.Minute)
459466
start := end.Add(-time.Duration(md.RangeSeconds) * time.Second)
@@ -464,7 +471,7 @@ func (md *MetricDescription) getData(cw *cloudwatch.CloudWatch, rds []*ResourceD
464471
MetricDataQueries: query,
465472
}
466473
result, err := cw.GetMetricData(&input)
467-
h.LogError(err)
474+
h.LogIfError(err)
468475

469476
return result, err
470477
}

base/prometheus.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,4 @@ func NewBatchCounterVec(opts prometheus.Opts, labels []string) *BatchCounterVec
132132
labels,
133133
),
134134
}
135-
136135
}

build/cloudwatch-prometheus-exporter.spec

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66

77
Name: cloudwatch-prometheus-exporter
8-
Version: 0.1.0
8+
Version: 0.1.1
99
Release: 0%{?dist}
1010
Summary: Cloudwatch Prometheus Exporter
1111
License: BSD
@@ -38,6 +38,8 @@ rm -rf %{buildroot}
3838

3939

4040
%changelog
41+
* Thu Apr 16 2020 Andrew Wright <andrew.w@covergenius.com>
42+
- Fix race condition when fetching resource list
4143
* Wed Apr 15 2020 Andrew Wright <andrew.w@covergenius.com>
4244
- Allow runtime configuration of metrics
4345
* Wed Apr 08 2020 Andrew Wright <andrew.w@covergenius.com>

ec2/controller.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@ import (
1111
"github.com/aws/aws-sdk-go/service/ec2"
1212
)
1313

14-
func CreateResourceDescription(nd *b.NamespaceDescription, instance *ec2.Instance) error {
14+
func createResourceDescription(nd *b.NamespaceDescription, instance *ec2.Instance) (*b.ResourceDescription, error) {
1515
rd := b.ResourceDescription{}
1616
dd := []*b.DimensionDescription{
1717
{
1818
Name: aws.String("InstanceId"),
1919
Value: instance.InstanceId,
2020
},
2121
}
22-
err := rd.BuildDimensions(dd)
23-
h.LogError(err)
22+
if err := rd.BuildDimensions(dd); err != nil {
23+
return nil, err
24+
}
2425

2526
tags := make(map[string]*string)
2627
for _, t := range instance.Tags {
@@ -34,29 +35,32 @@ func CreateResourceDescription(nd *b.NamespaceDescription, instance *ec2.Instanc
3435
}
3536
rd.Type = aws.String("ec2")
3637
rd.Parent = nd
37-
nd.Resources = append(nd.Resources, &rd)
3838

39-
return nil
39+
return &rd, nil
4040
}
4141

4242
// CreateResourceList fetches a list of all EC2 instances in the parent region
43-
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) error {
43+
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) {
4444
defer wg.Done()
4545
log.Debug("Creating EC2 resource list ...")
4646

47-
nd.Resources = []*b.ResourceDescription{}
4847
session := ec2.New(nd.Parent.Session)
4948
input := ec2.DescribeInstancesInput{
5049
Filters: nd.Parent.Filters,
5150
}
5251
result, err := session.DescribeInstances(&input)
53-
h.LogError(err)
52+
h.LogIfError(err)
5453

54+
resources := []*b.ResourceDescription{}
5555
for _, reservation := range result.Reservations {
5656
for _, instance := range reservation.Instances {
57-
err := CreateResourceDescription(nd, instance)
58-
h.LogError(err)
57+
if r, err := createResourceDescription(nd, instance); err == nil {
58+
resources = append(resources, r)
59+
}
60+
h.LogIfError(err)
5961
}
6062
}
61-
return nil
63+
nd.Mutex.Lock()
64+
nd.Resources = resources
65+
nd.Mutex.Unlock()
6266
}

elasticache/controller.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
"github.com/aws/aws-sdk-go/service/elasticache"
1313
)
1414

15-
func CreateResourceDescription(nd *b.NamespaceDescription, cc *elasticache.CacheCluster) (*b.ResourceDescription, error) {
15+
func createResourceDescription(nd *b.NamespaceDescription, cc *elasticache.CacheCluster) (*b.ResourceDescription, error) {
1616
rd := b.ResourceDescription{}
1717
dd := []*b.DimensionDescription{
1818
{
@@ -21,7 +21,7 @@ func CreateResourceDescription(nd *b.NamespaceDescription, cc *elasticache.Cache
2121
},
2222
}
2323
err := rd.BuildDimensions(dd)
24-
h.LogError(err)
24+
h.LogIfError(err)
2525
rd.ID = cc.CacheClusterId
2626
rd.Name = cc.CacheClusterId
2727
rd.Type = aws.String("elasticache")
@@ -31,48 +31,49 @@ func CreateResourceDescription(nd *b.NamespaceDescription, cc *elasticache.Cache
3131
}
3232

3333
// CreateResourceList fetches a list of all Elasticache clusters in the parent region
34-
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) error {
34+
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) {
3535
defer wg.Done()
3636
log.Debug("Creating Elasticache resource list ...")
3737

38-
resources := []*b.ResourceDescription{}
3938
session := elasticache.New(nd.Parent.Session)
4039
input := elasticache.DescribeCacheClustersInput{}
4140
result, err := session.DescribeCacheClusters(&input)
42-
h.LogError(err)
41+
h.LogIfError(err)
4342
service := "elasticache"
4443

4544
var w sync.WaitGroup
4645
w.Add(len(result.CacheClusters))
47-
ch := make(chan (*b.ResourceDescription), len(result.CacheClusters))
46+
ch := make(chan *b.ResourceDescription, len(result.CacheClusters))
4847
for _, cc := range result.CacheClusters {
4948
go func(cc *elasticache.CacheCluster, wg *sync.WaitGroup) {
5049
defer wg.Done()
50+
5151
resource := strings.Join([]string{"cluster", *cc.CacheClusterId}, ":")
5252
arn, err := nd.Parent.BuildARN(&service, &resource)
53-
h.LogError(err)
53+
h.LogIfError(err)
54+
5455
input := elasticache.ListTagsForResourceInput{
5556
ResourceName: aws.String(arn),
5657
}
5758
tags, err := session.ListTagsForResource(&input)
58-
h.LogError(err)
59+
h.LogIfError(err)
5960

6061
if nd.Parent.TagsFound(tags) {
61-
if r, err := CreateResourceDescription(nd, cc); err == nil {
62+
if r, err := createResourceDescription(nd, cc); err == nil {
6263
ch <- r
6364
}
64-
h.LogError(err)
65+
h.LogIfError(err)
6566
}
6667
}(cc, &w)
6768
}
6869
w.Wait()
6970
close(ch)
71+
72+
resources := []*b.ResourceDescription{}
7073
for r := range ch {
7174
resources = append(resources, r)
7275
}
73-
7476
nd.Mutex.Lock()
7577
nd.Resources = resources
7678
nd.Mutex.Unlock()
77-
return nil
7879
}

elb/controller.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/aws/aws-sdk-go/service/elb"
1212
)
1313

14-
func CreateResourceDescription(nd *b.NamespaceDescription, td *elb.TagDescription) error {
14+
func createResourceDescription(nd *b.NamespaceDescription, td *elb.TagDescription) (*b.ResourceDescription, error) {
1515
rd := b.ResourceDescription{}
1616
dd := []*b.DimensionDescription{
1717
{
@@ -20,47 +20,50 @@ func CreateResourceDescription(nd *b.NamespaceDescription, td *elb.TagDescriptio
2020
},
2121
}
2222
err := rd.BuildDimensions(dd)
23-
h.LogError(err)
23+
h.LogIfError(err)
2424
rd.ID = td.LoadBalancerName
2525
rd.Name = td.LoadBalancerName
2626
rd.Type = aws.String("lb-classic")
2727
rd.Parent = nd
28-
nd.Resources = append(nd.Resources, &rd)
2928

30-
return nil
29+
return &rd, nil
3130
}
3231

3332
// CreateResourceList fetches a list of all Classic LB resources in the region
34-
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) error {
33+
func CreateResourceList(nd *b.NamespaceDescription, wg *sync.WaitGroup) {
3534
defer wg.Done()
3635
log.Debug("Creating Classic LB resource list ...")
37-
nd.Resources = []*b.ResourceDescription{}
3836
session := elb.New(nd.Parent.Session)
3937
input := elb.DescribeLoadBalancersInput{}
4038
result, err := session.DescribeLoadBalancers(&input)
41-
h.LogError(err)
39+
h.LogIfError(err)
4240

4341
resourceList := []*string{}
4442
for _, lb := range result.LoadBalancerDescriptions {
4543
resourceList = append(resourceList, lb.LoadBalancerName)
4644
}
4745
if len(resourceList) <= 0 {
48-
return nil
46+
return
4947
}
5048

5149
dti := elb.DescribeTagsInput{
5250
LoadBalancerNames: resourceList,
5351
}
5452
tags, err := session.DescribeTags(&dti)
55-
h.LogError(err)
53+
h.LogIfError(err)
5654

55+
resources := []*b.ResourceDescription{}
5756
for _, td := range tags.TagDescriptions {
5857
if nd.Parent.TagsFound(td) {
59-
err := CreateResourceDescription(nd, td)
60-
h.LogError(err)
58+
if r, err := createResourceDescription(nd, td); err == nil {
59+
resources = append(resources, r)
60+
}
61+
h.LogIfError(err)
6162
} else {
6263
continue
6364
}
6465
}
65-
return nil
66+
nd.Mutex.Lock()
67+
nd.Resources = resources
68+
nd.Mutex.Unlock()
6669
}

0 commit comments

Comments
 (0)