From 895f96f21bae704a692c194aeed912f5386a5e99 Mon Sep 17 00:00:00 2001 From: Sven Rebhan <36194019+srebhan@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:56:39 +0200 Subject: [PATCH] feat(processors.aws_ec2): Allow to use instance metadata (#15795) --- plugins/processors/aws_ec2/README.md | 22 +- plugins/processors/aws_ec2/ec2.go | 281 ++++++++++++++----------- plugins/processors/aws_ec2/ec2_test.go | 1 - plugins/processors/aws_ec2/sample.conf | 22 +- 4 files changed, 191 insertions(+), 135 deletions(-) diff --git a/plugins/processors/aws_ec2/README.md b/plugins/processors/aws_ec2/README.md index 69e2e4cad4d04..eb91a7c71d393 100644 --- a/plugins/processors/aws_ec2/README.md +++ b/plugins/processors/aws_ec2/README.md @@ -37,7 +37,7 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## * ramdiskId ## * region ## * version - imds_tags = [] + # imds_tags = [] ## EC2 instance tags retrieved with DescribeTags action. ## In case tag is empty upon retrieval it's omitted when tagging metrics. @@ -47,10 +47,22 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## ## For more information see: ## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html - ec2_tags = [] + # ec2_tags = [] + + ## Paths to instance metadata information to attach to the metrics. + ## Specify the full path without the base-path e.g. `tags/instance/Name`. + ## + ## For more information see: + ## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html + # metadata_paths = [] + + ## Allows to convert metadata tag-names to canonical names representing the + ## full path with slashes ('/') being replaces with underscores. By default, + ## only the last path element is used to name the tag. + # canonical_metadata_tags = false ## Timeout for http requests made by against aws ec2 metadata endpoint. - timeout = "10s" + # timeout = "10s" ## ordered controls whether or not the metrics need to stay in the same order ## this plugin received them in. If false, this plugin will change the order @@ -58,12 +70,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details. ## waiting on slower lookups. This may cause issues for you if you are ## depending on the order of metrics staying the same. If so, set this to true. ## Keeping the metrics ordered may be slightly slower. - ordered = false + # ordered = false ## max_parallel_calls is the maximum number of AWS API calls to be in flight ## at the same time. ## It's probably best to keep this number fairly low. - max_parallel_calls = 10 + # max_parallel_calls = 10 ## cache_ttl determines how long each cached item will remain in the cache before ## it is removed and subsequently needs to be queried for from the AWS API. By diff --git a/plugins/processors/aws_ec2/ec2.go b/plugins/processors/aws_ec2/ec2.go index 66fc8282989fa..d6ec50fae7a6b 100644 --- a/plugins/processors/aws_ec2/ec2.go +++ b/plugins/processors/aws_ec2/ec2.go @@ -6,6 +6,8 @@ import ( _ "embed" "errors" "fmt" + "io" + "slices" "strings" "time" @@ -27,20 +29,21 @@ import ( var sampleConfig string type AwsEc2Processor struct { - ImdsTags []string `toml:"imds_tags"` - EC2Tags []string `toml:"ec2_tags"` - Timeout config.Duration `toml:"timeout"` - CacheTTL config.Duration `toml:"cache_ttl"` - Ordered bool `toml:"ordered"` - MaxParallelCalls int `toml:"max_parallel_calls"` - Log telegraf.Logger `toml:"-"` - TagCacheSize int `toml:"tag_cache_size"` - LogCacheStats bool `toml:"log_cache_stats"` + ImdsTags []string `toml:"imds_tags"` + EC2Tags []string `toml:"ec2_tags"` + MetadataPaths []string `toml:"metadata_paths"` + CanonicalMetadataTags bool `toml:"canonical_metadata_tags"` + Timeout config.Duration `toml:"timeout"` + CacheTTL config.Duration `toml:"cache_ttl"` + Ordered bool `toml:"ordered"` + MaxParallelCalls int `toml:"max_parallel_calls"` + TagCacheSize int `toml:"tag_cache_size"` + LogCacheStats bool `toml:"log_cache_stats"` + Log telegraf.Logger `toml:"-"` tagCache *freecache.Cache imdsClient *imds.Client - imdsTagsMap map[string]struct{} ec2Client *ec2.Client parallel parallel.Parallel instanceID string @@ -56,20 +59,20 @@ const ( DefaultLogCacheStats = false ) -var allowedImdsTags = map[string]struct{}{ - "accountId": {}, - "architecture": {}, - "availabilityZone": {}, - "billingProducts": {}, - "imageId": {}, - "instanceId": {}, - "instanceType": {}, - "kernelId": {}, - "pendingTime": {}, - "privateIp": {}, - "ramdiskId": {}, - "region": {}, - "version": {}, +var allowedImdsTags = []string{ + "accountId", + "architecture", + "availabilityZone", + "billingProducts", + "imageId", + "instanceId", + "instanceType", + "kernelId", + "pendingTime", + "privateIp", + "ramdiskId", + "region", + "version", } func (*AwsEc2Processor) SampleConfig() string { @@ -81,43 +84,17 @@ func (r *AwsEc2Processor) Add(metric telegraf.Metric, _ telegraf.Accumulator) er return nil } -func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) { - if r.tagCache == nil { - return - } - - ticker := time.NewTicker(30 * time.Second) - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n", - r.tagCache.EntryCount(), - r.tagCache.HitCount(), - r.tagCache.MissCount(), - r.tagCache.EvacuateCount(), - ) - r.tagCache.ResetStatistics() - } - } -} - func (r *AwsEc2Processor) Init() error { r.Log.Debug("Initializing AWS EC2 Processor") - if len(r.EC2Tags) == 0 && len(r.ImdsTags) == 0 { + + if len(r.ImdsTags) == 0 && len(r.MetadataPaths) == 0 && len(r.EC2Tags) == 0 { return errors.New("no tags specified in configuration") } for _, tag := range r.ImdsTags { - if len(tag) == 0 || !isImdsTagAllowed(tag) { - return fmt.Errorf("not allowed metadata tag specified in configuration: %s", tag) + if tag == "" || !slices.Contains(allowedImdsTags, tag) { + return fmt.Errorf("invalid imds tag %q", tag) } - r.imdsTagsMap[tag] = struct{}{} - } - if len(r.imdsTagsMap) == 0 && len(r.EC2Tags) == 0 { - return errors.New("no allowed metadata tags specified in configuration") } return nil @@ -189,13 +166,36 @@ func (r *AwsEc2Processor) Stop() { r.cancelCleanupWorker() } -func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric { +func (r *AwsEc2Processor) logCacheStatistics(ctx context.Context) { + if r.tagCache == nil { + return + } + + ticker := time.NewTicker(30 * time.Second) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + r.Log.Debugf("cache: size=%d hit=%d miss=%d full=%d\n", + r.tagCache.EntryCount(), + r.tagCache.HitCount(), + r.tagCache.MissCount(), + r.tagCache.EvacuateCount(), + ) + r.tagCache.ResetStatistics() + } + } +} + +func (r *AwsEc2Processor) lookupIMDSTags(metric telegraf.Metric) telegraf.Metric { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) defer cancel() var tagsNotFound []string - for tag := range r.imdsTagsMap { + for _, tag := range r.ImdsTags { val, err := r.tagCache.Get([]byte(tag)) if err != nil { tagsNotFound = append(tagsNotFound, tag) @@ -208,31 +208,102 @@ func (r *AwsEc2Processor) LookupIMDSTags(metric telegraf.Metric) telegraf.Metric return metric } - iido, err := r.imdsClient.GetInstanceIdentityDocument( - ctx, - &imds.GetInstanceIdentityDocumentInput{}, - ) - + doc, err := r.imdsClient.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{}) if err != nil { r.Log.Errorf("Error when calling GetInstanceIdentityDocument: %v", err) return metric } for _, tag := range tagsNotFound { - if v := getTagFromInstanceIdentityDocument(iido, tag); v != "" { - metric.AddTag(tag, v) - expiration := int(time.Duration(r.CacheTTL).Seconds()) - err = r.tagCache.Set([]byte(tag), []byte(v), expiration) - if err != nil { - r.Log.Errorf("Error when setting IMDS tag cache value: %v", err) + var v string + switch tag { + case "accountId": + v = doc.AccountID + case "architecture": + v = doc.Architecture + case "availabilityZone": + v = doc.AvailabilityZone + case "billingProducts": + v = strings.Join(doc.BillingProducts, ",") + case "imageId": + v = doc.ImageID + case "instanceId": + v = doc.InstanceID + case "instanceType": + v = doc.InstanceType + case "kernelId": + v = doc.KernelID + case "pendingTime": + v = doc.PendingTime.String() + case "privateIp": + v = doc.PrivateIP + case "ramdiskId": + v = doc.RamdiskID + case "region": + v = doc.Region + case "version": + v = doc.Version + default: + continue + } + + metric.AddTag(tag, v) + expiration := int(time.Duration(r.CacheTTL).Seconds()) + if err := r.tagCache.Set([]byte(tag), []byte(v), expiration); err != nil { + r.Log.Errorf("Error when setting IMDS tag cache value: %v", err) + continue + } + } + + return metric +} + +func (r *AwsEc2Processor) lookupMetadata(metric telegraf.Metric) telegraf.Metric { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) + defer cancel() + + for _, path := range r.MetadataPaths { + key := strings.Trim(path, "/ ") + if r.CanonicalMetadataTags { + key = strings.ReplaceAll(key, "/", "_") + } else { + if idx := strings.LastIndex(key, "/"); idx > 0 { + key = key[idx:] } } + + // Try to lookup the tag in cache + if value, err := r.tagCache.Get([]byte("metadata/" + path)); err == nil { + metric.AddTag(key, string(value)) + continue + } + + // Query the tag with the full path + resp, err := r.imdsClient.GetMetadata(ctx, &imds.GetMetadataInput{Path: path}) + if err != nil { + r.Log.Errorf("Getting metadata %q failed: %v", path, err) + continue + } + + value, err := io.ReadAll(resp.Content) + if err != nil { + r.Log.Errorf("Reading metadata reponse for %+v failed: %v", path, err) + continue + } + if len(value) > 0 { + metric.AddTag(key, string(value)) + } + expiration := int(time.Duration(r.CacheTTL).Seconds()) + if err = r.tagCache.Set([]byte("metadata/"+path), value, expiration); err != nil { + r.Log.Errorf("Updating metadata cache for %q failed: %v", path, err) + continue + } } return metric } -func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric { +func (r *AwsEc2Processor) lookupEC2Tags(metric telegraf.Metric) telegraf.Metric { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(r.Timeout)) defer cancel() @@ -252,7 +323,16 @@ func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric } dto, err := r.ec2Client.DescribeTags(ctx, &ec2.DescribeTagsInput{ - Filters: createFilterFromTags(r.instanceID, r.EC2Tags), + Filters: []types.Filter{ + { + Name: aws.String("resource-id"), + Values: []string{r.instanceID}, + }, + { + Name: aws.String("key"), + Values: r.EC2Tags, + }, + }, }) if err != nil { @@ -276,13 +356,18 @@ func (r *AwsEc2Processor) LookupEC2Tags(metric telegraf.Metric) telegraf.Metric func (r *AwsEc2Processor) asyncAdd(metric telegraf.Metric) []telegraf.Metric { // Add IMDS Instance Identity Document tags. - if len(r.imdsTagsMap) > 0 { - metric = r.LookupIMDSTags(metric) + if len(r.ImdsTags) > 0 { + metric = r.lookupIMDSTags(metric) + } + + // Add instance metadata tags. + if len(r.MetadataPaths) > 0 { + metric = r.lookupMetadata(metric) } // Add EC2 instance tags. if len(r.EC2Tags) > 0 { - metric = r.LookupEC2Tags(metric) + metric = r.lookupEC2Tags(metric) } return []telegraf.Metric{metric} @@ -300,20 +385,6 @@ func newAwsEc2Processor() *AwsEc2Processor { TagCacheSize: DefaultCacheSize, Timeout: config.Duration(DefaultTimeout), CacheTTL: config.Duration(DefaultCacheTTL), - imdsTagsMap: make(map[string]struct{}), - } -} - -func createFilterFromTags(instanceID string, tagNames []string) []types.Filter { - return []types.Filter{ - { - Name: aws.String("resource-id"), - Values: []string{instanceID}, - }, - { - Name: aws.String("key"), - Values: tagNames, - }, } } @@ -325,41 +396,3 @@ func getTagFromDescribeTags(o *ec2.DescribeTagsOutput, tag string) string { } return "" } - -func getTagFromInstanceIdentityDocument(o *imds.GetInstanceIdentityDocumentOutput, tag string) string { - switch tag { - case "accountId": - return o.AccountID - case "architecture": - return o.Architecture - case "availabilityZone": - return o.AvailabilityZone - case "billingProducts": - return strings.Join(o.BillingProducts, ",") - case "imageId": - return o.ImageID - case "instanceId": - return o.InstanceID - case "instanceType": - return o.InstanceType - case "kernelId": - return o.KernelID - case "pendingTime": - return o.PendingTime.String() - case "privateIp": - return o.PrivateIP - case "ramdiskId": - return o.RamdiskID - case "region": - return o.Region - case "version": - return o.Version - default: - return "" - } -} - -func isImdsTagAllowed(tag string) bool { - _, ok := allowedImdsTags[tag] - return ok -} diff --git a/plugins/processors/aws_ec2/ec2_test.go b/plugins/processors/aws_ec2/ec2_test.go index 7409b75a7fdc7..7f3b3aa302803 100644 --- a/plugins/processors/aws_ec2/ec2_test.go +++ b/plugins/processors/aws_ec2/ec2_test.go @@ -145,7 +145,6 @@ func TestTracking(t *testing.T) { CacheTTL: config.Duration(DefaultCacheTTL), ImdsTags: []string{"accountId", "instanceId"}, Log: &testutil.Logger{}, - imdsTagsMap: make(map[string]struct{}), } require.NoError(t, plugin.Init()) diff --git a/plugins/processors/aws_ec2/sample.conf b/plugins/processors/aws_ec2/sample.conf index 198ab6a84c234..c07ac086daa69 100644 --- a/plugins/processors/aws_ec2/sample.conf +++ b/plugins/processors/aws_ec2/sample.conf @@ -18,7 +18,7 @@ ## * ramdiskId ## * region ## * version - imds_tags = [] + # imds_tags = [] ## EC2 instance tags retrieved with DescribeTags action. ## In case tag is empty upon retrieval it's omitted when tagging metrics. @@ -28,10 +28,22 @@ ## ## For more information see: ## https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeTags.html - ec2_tags = [] + # ec2_tags = [] + + ## Paths to instance metadata information to attach to the metrics. + ## Specify the full path without the base-path e.g. `tags/instance/Name`. + ## + ## For more information see: + ## https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html + # metadata_paths = [] + + ## Allows to convert metadata tag-names to canonical names representing the + ## full path with slashes ('/') being replaces with underscores. By default, + ## only the last path element is used to name the tag. + # canonical_metadata_tags = false ## Timeout for http requests made by against aws ec2 metadata endpoint. - timeout = "10s" + # timeout = "10s" ## ordered controls whether or not the metrics need to stay in the same order ## this plugin received them in. If false, this plugin will change the order @@ -39,12 +51,12 @@ ## waiting on slower lookups. This may cause issues for you if you are ## depending on the order of metrics staying the same. If so, set this to true. ## Keeping the metrics ordered may be slightly slower. - ordered = false + # ordered = false ## max_parallel_calls is the maximum number of AWS API calls to be in flight ## at the same time. ## It's probably best to keep this number fairly low. - max_parallel_calls = 10 + # max_parallel_calls = 10 ## cache_ttl determines how long each cached item will remain in the cache before ## it is removed and subsequently needs to be queried for from the AWS API. By