From 788b463589ad68a674d45d29df9e8c515371df52 Mon Sep 17 00:00:00 2001 From: Hong Chen Date: Tue, 10 Sep 2024 15:31:19 +0800 Subject: [PATCH] [exporter/awskinesisexporter] Add stream ARN parameter support (#33891) Resolves #33891 Signed-off-by: Hong Chen --- .../awskinesisexporter-cross-account.yaml | 27 +++++++++++++++++++ exporter/awskinesisexporter/config.go | 1 + exporter/awskinesisexporter/config_test.go | 1 + exporter/awskinesisexporter/exporter.go | 1 + exporter/awskinesisexporter/exporter_test.go | 2 ++ .../internal/producer/batcher.go | 15 +++++++++-- .../internal/producer/batcher_bench_test.go | 2 +- .../internal/producer/batcher_test.go | 26 +++++++++++++++--- .../awskinesisexporter/testdata/config.yaml | 1 + 9 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 .chloggen/awskinesisexporter-cross-account.yaml diff --git a/.chloggen/awskinesisexporter-cross-account.yaml b/.chloggen/awskinesisexporter-cross-account.yaml new file mode 100644 index 000000000000..e5a64c57ef60 --- /dev/null +++ b/.chloggen/awskinesisexporter-cross-account.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awskinesisexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add stream ARN parameter for the AWS Kinesis Data Stream exporter. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33891] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/awskinesisexporter/config.go b/exporter/awskinesisexporter/config.go index 6fa643d29a93..1086a9b97591 100644 --- a/exporter/awskinesisexporter/config.go +++ b/exporter/awskinesisexporter/config.go @@ -14,6 +14,7 @@ import ( // AWSConfig contains AWS specific configuration such as awskinesis stream, region, etc. type AWSConfig struct { StreamName string `mapstructure:"stream_name"` + StreamARN string `mapstructure:"stream_arn"` KinesisEndpoint string `mapstructure:"kinesis_endpoint"` Region string `mapstructure:"region"` Role string `mapstructure:"role"` diff --git a/exporter/awskinesisexporter/config_test.go b/exporter/awskinesisexporter/config_test.go index 344fa304046e..30ddfffa1dc2 100644 --- a/exporter/awskinesisexporter/config_test.go +++ b/exporter/awskinesisexporter/config_test.go @@ -67,6 +67,7 @@ func TestLoadConfig(t *testing.T) { }, AWS: AWSConfig{ StreamName: "test-stream", + StreamARN: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream", KinesisEndpoint: "awskinesis.mars-1.aws.galactic", Region: "mars-1", Role: "arn:test-role", diff --git a/exporter/awskinesisexporter/exporter.go b/exporter/awskinesisexporter/exporter.go index acf83d831176..ba9705115d30 100644 --- a/exporter/awskinesisexporter/exporter.go +++ b/exporter/awskinesisexporter/exporter.go @@ -83,6 +83,7 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op producer, err := producer.NewBatcher( options.NewKinesisClient(awsconf, kinesisOpts...), conf.AWS.StreamName, + conf.AWS.StreamARN, producer.WithLogger(log), ) if err != nil { diff --git a/exporter/awskinesisexporter/exporter_test.go b/exporter/awskinesisexporter/exporter_test.go index ffab5b0c58c3..d42b8ac79ad9 100644 --- a/exporter/awskinesisexporter/exporter_test.go +++ b/exporter/awskinesisexporter/exporter_test.go @@ -32,6 +32,7 @@ func TestCreatingExporter(t *testing.T) { name: "Default configuration", conf: applyConfigChanges(func(conf *Config) { conf.AWS.StreamName = "example-test" + conf.AWS.StreamARN = "arn:aws:kinesis:us-west-2:123456789012:stream/example-test" }), validateNew: func(tb testing.TB) func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client { return func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client { @@ -45,6 +46,7 @@ func TestCreatingExporter(t *testing.T) { name: "Apply different region", conf: applyConfigChanges(func(conf *Config) { conf.AWS.StreamName = "example-test" + conf.AWS.StreamName = "arn:aws:kinesis:us-east-1:123456789012:stream/example-test" conf.AWS.Region = "us-east-1" conf.AWS.Role = "example-role" }), diff --git a/exporter/awskinesisexporter/internal/producer/batcher.go b/exporter/awskinesisexporter/internal/producer/batcher.go index 2b68e725608b..2e04f13058c8 100644 --- a/exporter/awskinesisexporter/internal/producer/batcher.go +++ b/exporter/awskinesisexporter/internal/producer/batcher.go @@ -18,6 +18,7 @@ import ( type batcher struct { stream *string + arn *string client Kinesis log *zap.Logger @@ -32,12 +33,20 @@ var ( permanentErrInvalidArgument = new(*types.InvalidArgumentException) ) -func NewBatcher(kinesisAPI Kinesis, stream string, opts ...BatcherOptions) (Batcher, error) { +func NewBatcher(kinesisAPI Kinesis, stream string, arn string, opts ...BatcherOptions) (Batcher, error) { be := &batcher{ - stream: aws.String(stream), client: kinesisAPI, log: zap.NewNop(), } + + if stream != "" { + be.stream = aws.String(stream) + } + + if arn != "" { + be.arn = aws.String(arn) + } + for _, opt := range opts { if err := opt(be); err != nil { return nil, err @@ -50,6 +59,7 @@ func (b *batcher) Put(ctx context.Context, bt *batch.Batch) error { for _, records := range bt.Chunk() { out, err := b.client.PutRecords(ctx, &kinesis.PutRecordsInput{ StreamName: b.stream, + StreamARN: b.arn, Records: records, }) @@ -75,6 +85,7 @@ func (b *batcher) Put(ctx context.Context, bt *batch.Batch) error { func (b *batcher) Ready(ctx context.Context) error { _, err := b.client.DescribeStream(ctx, &kinesis.DescribeStreamInput{ StreamName: b.stream, + StreamARN: b.arn, }) return err } diff --git a/exporter/awskinesisexporter/internal/producer/batcher_bench_test.go b/exporter/awskinesisexporter/internal/producer/batcher_bench_test.go index dc5d7de64455..a3932788df87 100644 --- a/exporter/awskinesisexporter/internal/producer/batcher_bench_test.go +++ b/exporter/awskinesisexporter/internal/producer/batcher_bench_test.go @@ -17,7 +17,7 @@ import ( func benchXEmptyMessages(b *testing.B, msgCount int) { producer, err := producer.NewBatcher(SetPutRecordsOperation(SuccessfulPutRecordsOperation), "benchmark-stream", - producer.WithLogger(zaptest.NewLogger(b)), + "arn:aws:kinesis:mars-1:123456789012:stream/benchmark-stream", producer.WithLogger(zaptest.NewLogger(b)), ) require.NoError(b, err, "Must have a valid producer") diff --git a/exporter/awskinesisexporter/internal/producer/batcher_test.go b/exporter/awskinesisexporter/internal/producer/batcher_test.go index e2c1de74bb67..5da2536750a7 100644 --- a/exporter/awskinesisexporter/internal/producer/batcher_test.go +++ b/exporter/awskinesisexporter/internal/producer/batcher_test.go @@ -68,13 +68,32 @@ func TestBatchedExporter(t *testing.T) { cases := []struct { name string + arn string PutRecordsOP func(*kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) shouldErr bool isPermanent bool }{ - {name: "Successful put to kinesis", PutRecordsOP: SuccessfulPutRecordsOperation, shouldErr: false, isPermanent: false}, - {name: "Invalid kinesis configuration", PutRecordsOP: HardFailedPutRecordsOperation, shouldErr: true, isPermanent: true}, - {name: "Test throttled kinesis operation", PutRecordsOP: TransiantPutRecordsOperation(2), shouldErr: true, isPermanent: false}, + { + name: "Successful put to kinesis", + arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream", + PutRecordsOP: SuccessfulPutRecordsOperation, + shouldErr: false, + isPermanent: false, + }, + { + name: "Invalid kinesis configuration", + arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream", + PutRecordsOP: HardFailedPutRecordsOperation, + shouldErr: true, + isPermanent: true, + }, + { + name: "Test throttled kinesis operation", + arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream", + PutRecordsOP: TransiantPutRecordsOperation(2), + shouldErr: true, + isPermanent: false, + }, } bt := batch.New() @@ -88,6 +107,7 @@ func TestBatchedExporter(t *testing.T) { be, err := producer.NewBatcher( SetPutRecordsOperation(tc.PutRecordsOP), tc.name, + tc.arn, producer.WithLogger(zaptest.NewLogger(t)), ) require.NoError(t, err, "Must not error when creating BatchedExporter") diff --git a/exporter/awskinesisexporter/testdata/config.yaml b/exporter/awskinesisexporter/testdata/config.yaml index 137fedc9a164..3cc01034f0e7 100644 --- a/exporter/awskinesisexporter/testdata/config.yaml +++ b/exporter/awskinesisexporter/testdata/config.yaml @@ -4,6 +4,7 @@ awskinesis: max_record_size: 1000 aws: stream_name: test-stream + stream_arn: arn:aws:kinesis:mars-1:123456789012:stream/test-stream region: mars-1 role: arn:test-role kinesis_endpoint: awskinesis.mars-1.aws.galactic