From c37159e1d96c8077642e2a85da248363a8c8ea1c Mon Sep 17 00:00:00 2001 From: Michael Bischoff Date: Fri, 13 Sep 2024 16:28:37 +0200 Subject: [PATCH] OOTB support AWS Eventbridge (#40006) Adding code to OOTB support AWS Eventbridge generated events for S3 changes, see https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html --- CHANGELOG.next.asciidoc | 3 +- .../docs/inputs/input-aws-s3.asciidoc | 7 + .../awss3/_meta/terraform/.terraform.lock.hcl | 3 + .../input/awss3/_meta/terraform/main.tf | 71 ++++++++++ .../input/awss3/_meta/terraform/outputs.tf | 2 + .../input/awss3/input_benchmark_test.go | 47 ++++--- .../input/awss3/input_integration_test.go | 121 +++++++++++++----- x-pack/filebeat/input/awss3/sqs_s3_event.go | 67 +++++++++- .../filebeat/input/awss3/sqs_s3_event_test.go | 44 +++++-- x-pack/filebeat/input/awss3/sqs_test.go | 37 ++++-- 10 files changed, 323 insertions(+), 79 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8a97739a541..1588d180d35 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -278,7 +278,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Allow elision of set and append failure logging. {issue}34544[34544] {pull}39929[39929] - Add ability to remove request trace logs from CEL input. {pull}39969[39969] - Add ability to remove request trace logs from HTTPJSON input. {pull}40003[40003] -- Update CEL mito extensions to v1.13.0. {pull}40035[40035] +- Added out of the box support for Amazon EventBridge notifications over SQS to S3 input {pull}40006[40006] +- Update CEL mito extensions to v1.13.0 {pull}40035[40035] - Add Jamf entity analytics provider. {pull}39996[39996] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. 
{pull}40004[40004] diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index a4d9ecd856c..9fa11872151 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -702,6 +702,13 @@ Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-not for more details. SQS queue will be configured as a https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic]. +[float] +=== S3 -> EventBridge -> SQS setup +Amazon S3 can alternatively https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html[send events to EventBridge], +which can then be used to route these events to SQS. While the S3 input will +filter for 'Object Created' events it's more efficient to configure EventBridge +to only forward the 'Object Created' events. + [float] === Parallel Processing diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl index 9814a954d21..0f717b2d3e4 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl +++ b/x-pack/filebeat/input/awss3/_meta/terraform/.terraform.lock.hcl @@ -5,6 +5,7 @@ provider "registry.terraform.io/hashicorp/aws" { version = "4.46.0" constraints = "4.46.0" hashes = [ + "h1:EZB4OgvytV38JpWyye9zoMQ0bfT9yB9xSXM5NY3Lrws=", "h1:m7RCtncaQbSD9VhNTX2xbuZY3TlYnUrluvmYZeYHb1s=", "zh:1678e6a4bdb3d81a6713adc62ca0fdb8250c584e10c10d1daca72316e9db8df2", "zh:329903acf86ef6072502736dff4c43c2b50f762a958f76aa924e2d74c7fca1e3", @@ -28,6 +29,7 @@ provider "registry.terraform.io/hashicorp/local" { version = "2.2.3" hashes = [ "h1:FvRIEgCmAezgZUqb2F+PZ9WnSSnR5zbEM2ZI+GLmbMk=", + "h1:aWp5iSUxBGgPv1UnV5yag9Pb0N+U1I0sZb38AXBFO8A=", "zh:04f0978bb3e052707b8e82e46780c371ac1c66b689b4a23bbc2f58865ab7d5c0", "zh:6484f1b3e9e3771eb7cc8e8bab8b35f939a55d550b3f4fb2ab141a24269ee6aa", 
"zh:78a56d59a013cb0f7eb1c92815d6eb5cf07f8b5f0ae20b96d049e73db915b238", @@ -47,6 +49,7 @@ provider "registry.terraform.io/hashicorp/random" { version = "3.4.3" hashes = [ "h1:saZR+mhthL0OZl4SyHXZraxyaBNVMxiZzks78nWcZ2o=", + "h1:xZGZf18JjMS06pFa4NErzANI98qi59SEcBsOcS2P2yQ=", "zh:41c53ba47085d8261590990f8633c8906696fa0a3c4b384ff6a7ecbf84339752", "zh:59d98081c4475f2ad77d881c4412c5129c56214892f490adf11c7e7a5a47de9b", "zh:686ad1ee40b812b9e016317e7f34c0d63ef837e084dea4a1f578f64a6314ad53", diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf index 164b14f93ca..2b825274990 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -147,3 +147,74 @@ resource "aws_sns_topic_subscription" "filebeat-integtest-sns" { protocol = "sqs" endpoint = aws_sqs_queue.filebeat-integtest-sns.arn } + +resource "aws_s3_bucket" "filebeat-integtest-eventbridge" { + bucket = "filebeat-s3-integtest-eventbridge-${random_string.random.result}" + force_destroy = true +} + +resource "aws_sqs_queue" "filebeat-integtest-eventbridge" { + name = "filebeat-s3-integtest-eventbridge-${random_string.random.result}" +} + +data "aws_iam_policy_document" "sqs_queue_policy" { + statement { + effect = "Allow" + actions = ["sqs:SendMessage"] + + principals { + type = "Service" + identifiers = ["events.amazonaws.com"] + } + + resources = [aws_sqs_queue.filebeat-integtest-eventbridge.arn] + } +} + +resource "aws_sqs_queue_policy" "filebeat-integtest-eventbridge" { + queue_url = aws_sqs_queue.filebeat-integtest-eventbridge.id + policy = data.aws_iam_policy_document.sqs_queue_policy.json +} + +resource "aws_cloudwatch_event_rule" "sqs" { + name = "capture-s3-notification" + description = "Capture s3 changes" + + event_pattern = jsonencode({ + source = [ + "aws.s3" + ], + detail-type = [ + "Object Created" + ] + detail = { + bucket = { + name = [ 
aws_s3_bucket.filebeat-integtest-eventbridge.id ] + } + } + }) + + depends_on = [ + aws_s3_bucket.filebeat-integtest-eventbridge + ] +} + +resource "aws_cloudwatch_event_target" "sqs" { + rule = aws_cloudwatch_event_rule.sqs.name + target_id = "SendToSQS" + arn = aws_sqs_queue.filebeat-integtest-eventbridge.arn + + depends_on = [ + aws_cloudwatch_event_rule.sqs + ] +} + +resource "aws_s3_bucket_notification" "bucket_notification-eventbridge" { + bucket = aws_s3_bucket.filebeat-integtest-eventbridge.id + eventbridge = true + + depends_on = [ + aws_cloudwatch_event_target.sqs + ] +} + diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf index e95983a7237..f197d69b66b 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/outputs.tf @@ -5,6 +5,8 @@ resource "local_file" "secrets" { "bucket_name" : aws_s3_bucket.filebeat-integtest.id "bucket_name_for_sns" : aws_s3_bucket.filebeat-integtest-sns.id "queue_url_for_sns" : aws_sqs_queue.filebeat-integtest-sns.url + "bucket_name_for_eventbridge" : aws_s3_bucket.filebeat-integtest-eventbridge.id + "queue_url_for_eventbridge" : aws_sqs_queue.filebeat-integtest-eventbridge.url }) filename = "${path.module}/outputs.yml" file_permission = "0644" diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 12837c410d2..0d7d79b615b 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -46,27 +46,29 @@ type constantSQS struct { var _ sqsAPI = (*constantSQS)(nil) -func newConstantSQS() *constantSQS { - return &constantSQS{ - msgs: []sqsTypes.Message{ - newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))), - }, +func newConstantSQS() (*constantSQS, error) { + event, err := newSQSMessage(newS3Event(filepath.Base(cloudtrailTestFile))) + if err != nil { + return 
nil, err } + return &constantSQS{ + msgs: []sqsTypes.Message{event}, + }, nil } -func (c *constantSQS) ReceiveMessage(ctx context.Context, maxMessages int) ([]sqsTypes.Message, error) { +func (c *constantSQS) ReceiveMessage(context.Context, int) ([]sqsTypes.Message, error) { return c.msgs, nil } -func (*constantSQS) DeleteMessage(ctx context.Context, msg *sqsTypes.Message) error { +func (*constantSQS) DeleteMessage(context.Context, *sqsTypes.Message) error { return nil } -func (*constantSQS) ChangeMessageVisibility(ctx context.Context, msg *sqsTypes.Message, timeout time.Duration) error { +func (*constantSQS) ChangeMessageVisibility(context.Context, *sqsTypes.Message, time.Duration) error { return nil } -func (c *constantSQS) GetQueueAttributes(ctx context.Context, attr []sqsTypes.QueueAttributeName) (map[string]string, error) { +func (c *constantSQS) GetQueueAttributes(context.Context, []sqsTypes.QueueAttributeName) (map[string]string, error) { return map[string]string{}, nil } @@ -84,7 +86,7 @@ func (c *s3PagerConstant) HasMorePages() bool { return c.currentIndex < len(c.objects) } -func (c *s3PagerConstant) NextPage(ctx context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { +func (c *s3PagerConstant) NextPage(context.Context, ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { if !c.HasMorePages() { return nil, errors.New("no more pages") } @@ -143,19 +145,19 @@ func newConstantS3(t testing.TB) *constantS3 { } } -func (c constantS3) GetObject(ctx context.Context, _, bucket, key string) (*s3.GetObjectOutput, error) { +func (c constantS3) GetObject(context.Context, string, string, string) (*s3.GetObjectOutput, error) { return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil } -func (c constantS3) CopyObject(ctx context.Context, _, from_bucket, to_bucket, from_key, to_key string) (*s3.CopyObjectOutput, error) { +func (c constantS3) CopyObject(context.Context, string, string, string, string, string) (*s3.CopyObjectOutput, 
error) { return nil, nil } -func (c constantS3) DeleteObject(ctx context.Context, _, bucket, key string) (*s3.DeleteObjectOutput, error) { +func (c constantS3) DeleteObject(context.Context, string, string, string) (*s3.DeleteObjectOutput, error) { return nil, nil } -func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager { +func (c constantS3) ListObjectsPaginator(string, string) s3Pager { return c.pagerConstant } @@ -164,7 +166,7 @@ var _ beat.Pipeline = (*fakePipeline)(nil) // fakePipeline returns new ackClients. type fakePipeline struct{} -func (c *fakePipeline) ConnectWith(clientConfig beat.ClientConfig) (beat.Client, error) { +func (c *fakePipeline) ConnectWith(beat.ClientConfig) (beat.Client, error) { return &ackClient{}, nil } @@ -211,12 +213,13 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR var err error pipeline := &fakePipeline{} - conf := makeBenchmarkConfig(t) - conf.MaxNumberOfMessages = maxMessagesInflight - sqsReader := newSQSReaderInput(conf, aws.Config{}) + config := makeBenchmarkConfig(t) + config.MaxNumberOfMessages = maxMessagesInflight + sqsReader := newSQSReaderInput(config, aws.Config{}) sqsReader.log = log.Named("sqs") sqsReader.metrics = newInputMetrics("test_id", monitoring.NewRegistry(), maxMessagesInflight) - sqsReader.sqs = newConstantSQS() + sqsReader.sqs, err = newConstantSQS() + require.NoError(t, err) sqsReader.s3 = newConstantS3(t) sqsReader.msgHandler, err = sqsReader.createEventProcessor(pipeline) require.NoError(t, err, "createEventProcessor must succeed") @@ -252,7 +255,8 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR } func TestBenchmarkInputSQS(t *testing.T) { - logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + require.NoError(t, err) results := []testing.BenchmarkResult{ benchmarkInputSQS(t, 1), @@ -388,7 +392,8 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) 
testing.BenchmarkResult } func TestBenchmarkInputS3(t *testing.T) { - logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + err := logp.TestingSetup(logp.WithLevel(logp.InfoLevel)) + require.NoError(t, err) results := []testing.BenchmarkResult{ benchmarkInputS3(t, 1), diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index af488505d0e..88d81a9f0c8 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -50,6 +50,8 @@ type terraformOutputData struct { QueueURL string `yaml:"queue_url"` BucketNameForSNS string `yaml:"bucket_name_for_sns"` QueueURLForSNS string `yaml:"queue_url_for_sns"` + BucketNameForEB string `yaml:"bucket_name_for_eventbridge"` + QueueURLForEB string `yaml:"queue_url_for_eventbridge"` } func getTerraformOutputs(t *testing.T, isLocalStack bool) terraformOutputData { @@ -255,16 +257,16 @@ func TestInputRunSQSOnLocalstack(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, s3Input.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. 
- assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 8) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), uint64(0x13)) - assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 8, s3Input.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, s3Input.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, s3Input.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, s3Input.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, s3Input.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 8, s3Input.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, uint64(0x13), s3Input.metrics.s3EventsCreatedTotal.Get()) + assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) + assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunSQS(t *testing.T) { @@ -306,16 +308,16 @@ func TestInputRunSQS(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed.
- assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -354,12 +356,12 @@ func TestInputRunS3(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsListedTotal.Get(), 8) - assert.EqualValues(t, s3Input.metrics.s3ObjectsProcessedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3ObjectsAckedTotal.Get(), 7) - assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 0, s3Input.metrics.s3ObjectsInflight.Get()) + 
assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 8, s3Input.metrics.s3ObjectsListedTotal.Get()) + assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsProcessedTotal.Get()) + assert.EqualValues(t, 7, s3Input.metrics.s3ObjectsAckedTotal.Get()) + assert.EqualValues(t, 12, s3Input.metrics.s3EventsCreatedTotal.Get()) } func uploadS3TestFiles(t *testing.T, region, bucket string, s3Client *s3.Client, filenames ...string) { @@ -548,14 +550,65 @@ func TestInputRunSNS(t *testing.T) { t.Fatal(err) } - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReceivedTotal.Get(), 8) // S3 could batch notifications. - assert.EqualValues(t, sqsInput.metrics.sqsMessagesInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesDeletedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. - assert.EqualValues(t, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsInflight.Get(), 0) - assert.EqualValues(t, sqsInput.metrics.s3ObjectsRequestedTotal.Get(), 7) - assert.EqualValues(t, sqsInput.metrics.s3EventsCreatedTotal.Get(), 12) + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. + assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. 
+ assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) - assert.EqualValues(t, sqsInput.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end +} + +func TestInputRunEventbridgeSQS(t *testing.T) { + logp.TestingSetup() + + // Terraform is used to set up S3 and SQS and must be executed manually. + tfConfig := getTerraformOutputs(t, false) + awsCfg := makeAWSConfig(t, tfConfig.AWSRegion) + + // Ensure SQS is empty before testing. + drainSQS(t, tfConfig.AWSRegion, tfConfig.BucketNameForEB, awsCfg) + + s3Client := s3.NewFromConfig(awsCfg) + uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketNameForEB, s3Client, + "testdata/events-array.json", + "testdata/invalid.json", + "testdata/log.json", + "testdata/log.ndjson", + "testdata/multiline.json", + "testdata/multiline.json.gz", + "testdata/multiline.txt", + "testdata/log.txt", // Skipped (no match). + ) + + sqsInput := createSQSInput(t, makeTestConfigSQS(tfConfig.QueueURLForEB)) + + inputCtx, cancel := newV2Context() + t.Cleanup(cancel) + time.AfterFunc(15*time.Second, func() { + cancel() + }) + + var errGroup errgroup.Group + errGroup.Go(func() error { + return sqsInput.Run(inputCtx, &fakePipeline{}) + }) + + if err := errGroup.Wait(); err != nil { + t.Fatal(err) + } + + assert.EqualValues(t, 8, sqsInput.metrics.sqsMessagesReceivedTotal.Get()) // S3 could batch notifications. 
+ assert.EqualValues(t, 0, sqsInput.metrics.sqsMessagesInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.sqsMessagesDeletedTotal.Get()) + assert.EqualValues(t, 1, sqsInput.metrics.sqsMessagesReturnedTotal.Get()) // Invalid JSON is returned so that it can eventually be DLQed. + assert.EqualValues(t, 0, sqsInput.metrics.sqsVisibilityTimeoutExtensionsTotal.Get()) + assert.EqualValues(t, 0, sqsInput.metrics.s3ObjectsInflight.Get()) + assert.EqualValues(t, 7, sqsInput.metrics.s3ObjectsRequestedTotal.Get()) + assert.EqualValues(t, 12, sqsInput.metrics.s3EventsCreatedTotal.Get()) + assert.Greater(t, sqsInput.metrics.sqsLagTime.Mean(), 0.0) + assert.EqualValues(t, 0.0, sqsInput.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event.go b/x-pack/filebeat/input/awss3/sqs_s3_event.go index db893e443ac..a489f6a7f72 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event.go @@ -44,7 +44,7 @@ func (e *nonRetryableError) Error() string { } func (e *nonRetryableError) Is(err error) bool { - _, ok := err.(*nonRetryableError) //nolint:errorlint // This is not used directly to detected wrapped errors (errors.Is handles unwrapping). + _, ok := err.(*nonRetryableError) // This is not used directly to detected wrapped errors (errors.Is handles unwrapping). return ok } @@ -83,6 +83,39 @@ type s3EventV2 struct { } `json:"s3"` } +// eventBridgeEvents is 'Object Created' payload generated by AWS EventBridge +// At the moment it doesn't seem to have a version +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/ev-events.html +// https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html +// Object created event. 
+type eventBridgeEvent struct { + Version string `json:"version"` + Id string `json:"id"` + DetailType string `json:"detail-type"` + Source string `json:"source"` + Account string `json:"account"` + Time string `json:"time"` + Region string `json:"region"` + Resources []string `json:"resources"` + Detail struct { + Version string `json:"version"` + Bucket struct { + Name string `json:"name"` + } + Object struct { + Key string `json:"key"` + Size int `json:"size"` + Etag string `json:"etag"` + VersionId string `json:"version-id"` + Sequencer string `json:"sequencer"` + } + RequestId string `json:"request-id"` + Requester string `json:"requester"` + SourceIpAddress string `json:"source-ip-address"` + Reason string `json:"reason"` + } `json:"detail"` +} + type sqsS3EventProcessor struct { s3ObjectHandler s3ObjectHandlerFactory sqsVisibilityTimeout time.Duration @@ -252,6 +285,17 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro } } + // Check if the notification is from S3 -> EventBridge -> SQS + if events.Records == nil { + var eventBridgeEvent eventBridgeEvent + dec := json.NewDecoder(strings.NewReader(body)) + if err := dec.Decode(&eventBridgeEvent); err != nil { + p.log.Debugw("Could not parse message as EventBridge payload", "sqs_message_body", body, "error", err) + } else { + convertEventBridge(&eventBridgeEvent, &events) + } + } + if events.Records == nil { p.log.Debugw("Invalid SQS message body: missing Records field", "sqs_message_body", body) return nil, errors.New("the message is an invalid S3 notification: missing Records field") @@ -260,6 +304,27 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro return p.getS3Info(events) } +func convertEventBridge(eventBridgeEvent *eventBridgeEvent, s3Events *s3EventsV2) { + for _, resource := range eventBridgeEvent.Resources { + s3Events.Records = append(s3Events.Records, convertEventBridgeEvent(resource, eventBridgeEvent)) + } +} + +func 
convertEventBridgeEvent(resource string, message *eventBridgeEvent) s3EventV2 { + var event = s3EventV2{} + if message.DetailType == "Object Created" { + event.SetEventName("ObjectCreated:Put") + } + event.SetS3BucketARN(resource) + event.SetAWSRegion(message.Region) + if message.Source == "aws.s3" { + event.SetEventSource("aws:s3") + } + event.SetS3BucketName(message.Detail.Bucket.Name) + event.SetS3ObjectKey(message.Detail.Object.Key) + return event +} + func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) { out := make([]s3EventV2, 0, len(events.Records)) for _, record := range events.Records { diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 65552525136..92401fe45ee 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -28,9 +28,10 @@ import ( ) func TestSQSS3EventProcessor(t *testing.T) { - logp.TestingSetup() - - msg := newSQSMessage(newS3Event("log.json")) + err := logp.TestingSetup() + require.NoError(t, err) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) t.Run("s3 events are processed and sqs msg is deleted", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), testTimeout) @@ -64,7 +65,9 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockBeatPipeline := NewMockBeatPipeline(ctrl) - invalidBodyMsg := newSQSMessage(newS3Event("log.json")) + invalidBodyMsg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) + body := *invalidBodyMsg.Body body = body[10:] invalidBodyMsg.Body = &body @@ -74,7 +77,7 @@ func TestSQSS3EventProcessor(t *testing.T) { ) p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, mockAPI, nil, time.Minute, 5, mockBeatPipeline, mockS3HandlerFactory) - err := p.ProcessSQS(ctx, &invalidBodyMsg) + err = p.ProcessSQS(ctx, &invalidBodyMsg) 
require.Error(t, err) t.Log(err) }) @@ -89,7 +92,8 @@ func TestSQSS3EventProcessor(t *testing.T) { mockS3HandlerFactory := NewMockS3ObjectHandlerFactory(ctrl) mockBeatPipeline := NewMockBeatPipeline(ctrl) - emptyRecordsMsg := newSQSMessage([]s3EventV2{}...) + emptyRecordsMsg, err := newSQSMessage([]s3EventV2{}...) + require.NoError(t, err) gomock.InOrder( mockAPI.EXPECT().DeleteMessage(gomock.Any(), gomock.Eq(&emptyRecordsMsg)).Return(nil), @@ -189,7 +193,8 @@ func TestSQSS3EventProcessor(t *testing.T) { } func TestSqsProcessor_keepalive(t *testing.T) { - msg := newSQSMessage(newS3Event("log.json")) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) // Ensure both ReceiptHandleIsInvalid and InvalidParameterValue error codes trigger stops. // See https://github.com/elastic/beats/issues/30675. @@ -237,13 +242,14 @@ func TestSqsProcessor_keepalive(t *testing.T) { } func TestSqsProcessor_getS3Notifications(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) p := newSQSS3EventProcessor(logp.NewLogger(inputName), nil, nil, nil, time.Minute, 5, nil, nil) t.Run("s3 key is url unescaped", func(t *testing.T) { - msg := newSQSMessage(newS3Event("Happy+Face.jpg")) - + msg, err := newSQSMessage(newS3Event("Happy+Face.jpg")) + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 1) @@ -253,15 +259,27 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { t.Run("non-ObjectCreated event types are ignored", func(t *testing.T) { event := newS3Event("HappyFace.jpg") event.EventName = "ObjectRemoved:Delete" - msg := newSQSMessage(event) - + msg, err := newSQSMessage(event) + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 0) }) t.Run("sns-sqs notification", func(t *testing.T) { - msg := newSNSSQSMessage() + msg, err := newSNSSQSMessage() + require.NoError(t, err) + events, err := 
p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "test-object-key", events[0].S3.Object.Key) + assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) + assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) + }) + + t.Run("EventBridge-sqs notification", func(t *testing.T) { + msg, err := newEventBridgeSQSMessage() + require.NoError(t, err) events, err := p.getS3Notifications(*msg.Body) require.NoError(t, err) assert.Len(t, events, 1) diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 8ad01a032dc..fff17ebc1a6 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -17,6 +17,7 @@ import ( "github.com/gofrs/uuid/v5" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" ) @@ -29,7 +30,8 @@ var ( ) func TestSQSReceiver(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) const maxMessages = 5 @@ -41,7 +43,8 @@ func TestSQSReceiver(t *testing.T) { defer ctrl.Finish() mockSQS := NewMockSQSAPI(ctrl) mockMsgHandler := NewMockSQSProcessor(ctrl) - msg := newSQSMessage(newS3Event("log.json")) + msg, err := newSQSMessage(newS3Event("log.json")) + require.NoError(t, err) // Initial ReceiveMessage for maxMessages. mockSQS.EXPECT(). 
@@ -127,7 +130,8 @@ func TestSQSReceiver(t *testing.T) { } func TestGetApproximateMessageCount(t *testing.T) { - logp.TestingSetup() + err := logp.TestingSetup() + require.NoError(t, err) const count = 500 attrName := []types.QueueAttributeName{sqsApproximateNumberOfMessages} @@ -180,10 +184,10 @@ func TestGetApproximateMessageCount(t *testing.T) { }) } -func newSQSMessage(events ...s3EventV2) types.Message { +func newSQSMessage(events ...s3EventV2) (types.Message, error) { body, err := json.Marshal(s3EventsV2{Records: events}) if err != nil { - panic(err) + return types.Message{}, err } hash := sha256.Sum256(body) @@ -196,16 +200,16 @@ func newSQSMessage(events ...s3EventV2) types.Message { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - } + }, nil } -func newSNSSQSMessage() types.Message { +func newSNSSQSMessage() (types.Message, error) { body, err := json.Marshal(s3EventsV2{ TopicArn: "arn:aws:sns:us-east-1:1234:sns-topic", Message: "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"sns-notification-vpc-flow-logs\",\"bucket\":{\"name\":\"vpc-flow-logs-ks\",\"arn\":\"arn:aws:s3:::vpc-flow-logs-ks\"},\"object\":{\"key\":\"test-object-key\"}}}]}", }) if err != nil { - panic(err) + return types.Message{}, err } hash := sha256.Sum256(body) @@ -218,7 +222,22 @@ func newSNSSQSMessage() types.Message { Body: &bodyStr, MessageId: &messageID, ReceiptHandle: &receipt, - } + }, nil +} + +func newEventBridgeSQSMessage() (types.Message, error) { + body := []byte("{ \"version\": \"0\", \"id\": \"f17994c0-7cb9-ee01-79fd-ae46df89e3a4\", \"detail-type\": \"Object Created\", \"source\": \"aws.s3\", \"account\": \"952856826737\", \"time\": \"2024-06-24T08:31:26Z\", \"region\": \"eu-west-1\", \"resources\": [\"arn:aws:s3:::vpc-flow-logs-ks\" ], \"detail\": {\"version\": \"0\",\"bucket\": { \"name\": \"vpc-flow-logs-ks\"},\"object\": { \"key\": \"test-object-key\", \"size\": 744, 
\"etag\": \"2ba6b152f13c75a9155b87b5b072963c\", \"version-id\": \"uoW5awQhqux4rhi4Nuh6il967FzQlsHJ\", \"sequencer\": \"0066792EC46EC0B037\"},\"request-id\": \"Q93BVWXD5G6FKWC2\",\"requester\": \"516635408544\",\"source-ip-address\": \"10.0.27.95\",\"reason\": \"PutObject\" }}") + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return types.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + }, nil } func newS3Event(key string) s3EventV2 {