Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AWS] [EC2] enrich events with EC2 tags with add_cloud_metadata processor #41477

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Libbeat*

- enrich events with EC2 tags in add_cloud_metadata processor {pull}41477[41477]


*Heartbeat*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ examples for each of the supported providers.

_AWS_

Metadata given below are extracted from https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instance-identity-documents.html[instance identity document],

[source,json]
-------------------------------------------------------------------------------
{
Expand All @@ -98,6 +100,22 @@ _AWS_
}
-------------------------------------------------------------------------------

If the EC2 instance has IMDS enabled and if tags are allowed through IMDS endpoint, the processor will further append tags in metadata.
Please refer official documentation on https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html[IMDS endpoint] for further details.

[source,json]
-------------------------------------------------------------------------------
{
"aws": {
"tags": {
"org" : "myOrg",
"owner": "userID"
}
}
}
-------------------------------------------------------------------------------


_Digital Ocean_

[source,json]
Expand Down
137 changes: 114 additions & 23 deletions libbeat/processors/add_cloud_metadata/provider_aws_ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package add_cloud_metadata
import (
"context"
"fmt"
"io"
"net/http"
"strings"

"github.com/elastic/elastic-agent-libs/logp"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
awscfg "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ec2/types"
Expand All @@ -35,7 +38,14 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
)

const (
eksClusterNameTagKey = "eks:cluster-name"
tagsCategory = "tags/instance"
tagPrefix = "aws.tags"
)

type IMDSClient interface {
ec2rolecreds.GetMetadataAPIClient
GetInstanceIdentityDocument(ctx context.Context, params *imds.GetInstanceIdentityDocumentInput, optFns ...func(*imds.Options)) (*imds.GetInstanceIdentityDocumentOutput, error)
}

Expand Down Expand Up @@ -90,30 +100,17 @@ func fetchRawProviderMetadata(
result.err = fmt.Errorf("failed loading AWS default configuration: %w", err)
return
}
awsClient := NewIMDSClient(awsConfig)

instanceIdentity, err := awsClient.GetInstanceIdentityDocument(context.TODO(), &imds.GetInstanceIdentityDocumentInput{})
imdsClient := NewIMDSClient(awsConfig)
instanceIdentity, err := imdsClient.GetInstanceIdentityDocument(ctx, &imds.GetInstanceIdentityDocumentInput{})
if err != nil {
result.err = fmt.Errorf("failed fetching EC2 Identity Document: %w", err)
return
}

// AWS Region must be set to be able to get EC2 Tags
awsRegion := instanceIdentity.InstanceIdentityDocument.Region
awsConfig.Region = awsRegion
accountID := instanceIdentity.InstanceIdentityDocument.AccountID

clusterName, err := fetchEC2ClusterNameTag(awsConfig, instanceIdentity.InstanceIdentityDocument.InstanceID)
if err != nil {
logger.Warnf("error fetching cluster name metadata: %s.", err)
} else if clusterName != "" {
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, clusterName)

_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
_, _ = result.metadata.Put("orchestrator.cluster.name", clusterName)
}
instanceID := instanceIdentity.InstanceIdentityDocument.InstanceID

_, _ = result.metadata.Put("cloud.instance.id", instanceIdentity.InstanceIdentityDocument.InstanceID)
_, _ = result.metadata.Put("cloud.machine.type", instanceIdentity.InstanceIdentityDocument.InstanceType)
Expand All @@ -122,10 +119,106 @@ func fetchRawProviderMetadata(
_, _ = result.metadata.Put("cloud.account.id", accountID)
_, _ = result.metadata.Put("cloud.image.id", instanceIdentity.InstanceIdentityDocument.ImageID)

// AWS Region must be set to be able to get EC2 Tags
awsConfig.Region = awsRegion
tags := getTags(ctx, imdsClient, NewEC2Client(awsConfig), instanceID, logger)

if tags[eksClusterNameTagKey] != "" {
// for AWS cluster ID is used cluster ARN: arn:partition:service:region:account-id:resource-type/resource-id, example:
// arn:aws:eks:us-east-2:627286350134:cluster/cluster-name
clusterARN := fmt.Sprintf("arn:aws:eks:%s:%s:cluster/%v", awsRegion, accountID, tags[eksClusterNameTagKey])

_, _ = result.metadata.Put("orchestrator.cluster.id", clusterARN)
_, _ = result.metadata.Put("orchestrator.cluster.name", tags[eksClusterNameTagKey])
}

if len(tags) == 0 {
return
}

logger.Infof("Adding retrieved tags with key: %s", tagPrefix)
for k, v := range tags {
_, _ = result.metadata.Put(fmt.Sprintf("%s.%s", tagPrefix, k), v)
}
}

// getTags is a helper to extract EC2 tags. Internally it utilize multiple extraction methods.
func getTags(ctx context.Context, imdsClient IMDSClient, ec2Client EC2Client, instanceId string, logger *logp.Logger) map[string]string {
logger.Info("Extracting EC2 tags from IMDS endpoint")
tags, ok := getTagsFromIMDS(ctx, imdsClient, logger)
if ok {
return tags
}

logger.Info("Tag extraction from IMDS failed, fallback to DescribeTags API to obtain EKS cluster name.")
clusterName, err := clusterNameFromDescribeTag(ctx, ec2Client, instanceId)
if err != nil {
logger.Warnf("error obtaining cluster name: %v.", err)
return tags
}

if clusterName != "" {
tags[eksClusterNameTagKey] = clusterName
}
return tags
}

// getTagsFromIMDS is a helper to extract EC2 tags using instance metadata service.
// Note that this call could get throttled and currently does not implement a retry mechanism.
// See - https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/instancedata-data-retrieval.html#instancedata-throttling
func getTagsFromIMDS(ctx context.Context, client IMDSClient, logger *logp.Logger) (tags map[string]string, ok bool) {
tags = make(map[string]string)

b, err := getMetadataHelper(ctx, client, tagsCategory, logger)
if err != nil {
logger.Warnf("error obtaining tags category: %v", err)
return tags, false
}

for _, tag := range strings.Split(string(b), "\n") {
tagPath := fmt.Sprintf("%s/%s", tagsCategory, tag)
b, err := getMetadataHelper(ctx, client, tagPath, logger)
if err != nil {
logger.Warnf("error extracting tag value of %s: %v", tag, err)
return tags, false
}

tagValue := string(b)
if tagValue == "" {
logger.Infof("Ignoring tag key %s as value is empty", tag)
continue
}

tags[tag] = tagValue
}

return tags, true
}

// getMetadataHelper performs the IMDS call for the given path and returns the response content after closing the underlying content reader.
func getMetadataHelper(ctx context.Context, client IMDSClient, path string, logger *logp.Logger) (content []byte, err error) {
metadata, err := client.GetMetadata(ctx, &imds.GetMetadataInput{Path: path})
if err != nil {
return nil, fmt.Errorf("error from IMDS metadata request: %w", err)
}

defer func(Content io.ReadCloser) {
err := Content.Close()
if err != nil {
logger.Warnf("error closing IMDS metadata response body: %v", err)
}
}(metadata.Content)

content, err = io.ReadAll(metadata.Content)
if err != nil {
return nil, fmt.Errorf("error extracting metadata from the IMDS response: %w", err)
}

return content, nil
}

func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string, error) {
svc := NewEC2Client(awsConfig)
// clusterNameFromDescribeTag is a helper to extract EKS cluster name using DescribeTag.
func clusterNameFromDescribeTag(ctx context.Context, ec2Client EC2Client, instanceID string) (string, error) {
input := &ec2.DescribeTagsInput{
Filters: []types.Filter{
{
Expand All @@ -135,15 +228,13 @@ func fetchEC2ClusterNameTag(awsConfig awssdk.Config, instanceID string) (string,
},
},
{
Name: awssdk.String("key"),
Values: []string{
"eks:cluster-name",
},
Name: awssdk.String("key"),
Values: []string{eksClusterNameTagKey},
},
},
}

tagsResult, err := svc.DescribeTags(context.TODO(), input)
tagsResult, err := ec2Client.DescribeTags(ctx, input)
if err != nil {
return "", fmt.Errorf("error fetching EC2 Tags: %w", err)
}
Expand Down
Loading
Loading