diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/README.md b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/README.md index 9575cdced..eafaf0b33 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/README.md +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/README.md @@ -62,7 +62,8 @@ _Parameters_ |existingLambdaObj?|[`lambda.Function`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.Function.html)|Existing instance of Lambda Function object, if this is set then the lambdaFunctionProps is ignored.| |lambdaFunctionProps?|[`lambda.FunctionProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.FunctionProps.html)|User provided props to override the default props for the Lambda function.| |kinesisStreamProps?|[`kinesis.StreamProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesis.StreamProps.html)|Optional user-provided props to override the default props for the Kinesis stream.| -|eventSourceProps?|[`lambda.EventSourceMappingOptions`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.EventSourceMappingOptions.html)|Optional user-provided props to override the default props for the Lambda event source mapping.| +|existingStreamObj?|[`kinesis.Stream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesis.Stream.html)|Existing instance of Kinesis Stream, if this is set then kinesisStreamProps is ignored.| +|kinesisEventSourceProps?|[`aws-lambda-event-sources.KinesisEventSourceProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda-event-sources.KinesisEventSourceProps.html)|Optional user-provided props to override the default props for the Lambda event source mapping.| ## Pattern Properties diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/lib/index.ts b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/lib/index.ts index beccf39ef..d18c17cfe 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/lib/index.ts +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/lib/index.ts @@ -13,10 +13,10 @@ // Imports import * as lambda from '@aws-cdk/aws-lambda'; +import { KinesisEventSourceProps, KinesisEventSource } from '@aws-cdk/aws-lambda-event-sources'; import * as kinesis from '@aws-cdk/aws-kinesis'; import * as iam from '@aws-cdk/aws-iam'; import * as defaults from '@aws-solutions-constructs/core'; -import { overrideProps } from '@aws-solutions-constructs/core'; import { Construct } from '@aws-cdk/core'; /** @@ -35,6 +35,12 @@ export interface KinesisStreamsToLambdaProps { * @default - Default props are used. */ readonly lambdaFunctionProps?: lambda.FunctionProps, + /** + * Existing instance of Kinesis Stream, if this is set then kinesisStreamProps is ignored. + * + * @default - None + */ + readonly existingStreamObj?: kinesis.Stream; /** * Optional user-provided props to override the default props for the Kinesis stream. * @@ -46,7 +52,7 @@ export interface KinesisStreamsToLambdaProps { * * @default - Default props are used. */ - readonly eventSourceProps?: lambda.EventSourceMappingOptions | any + readonly kinesisEventSourceProps?: KinesisEventSourceProps } /** @@ -61,7 +67,7 @@ export class KinesisStreamsToLambda extends Construct { * @summary Constructs a new instance of the KinesisStreamsToLambda class. * @param {cdk.App} scope - represents the scope for all the resources. * @param {string} id - this is a a scope-unique id. - * @param {CloudFrontToApiGatewayProps} props - user provided props for the construct + * @param {KinesisStreamsToLambdaProps} props - user provided props for the construct * @since 0.8.0 * @access public */ @@ -70,6 +76,7 @@ export class KinesisStreamsToLambda extends Construct { // Setup the Kinesis Stream this.kinesisStream = defaults.buildKinesisStream(this, { + existingStreamObj: props.existingStreamObj, kinesisStreamProps: props.kinesisStreamProps }); @@ -80,10 +87,8 @@ export class KinesisStreamsToLambda extends Construct { }); // Add the Lambda event source mapping - const eventSourceProps = (props.eventSourceProps) ? - overrideProps(defaults.DefaultKinesisEventSourceProps(this.kinesisStream.streamArn), props.eventSourceProps) : - defaults.DefaultKinesisEventSourceProps(this.kinesisStream.streamArn); - this.lambdaFunction.addEventSourceMapping('LambdaKinesisEventSourceMapping', eventSourceProps); + const eventSourceProps = defaults.KinesisEventSourceProps(props.kinesisEventSourceProps); + this.lambdaFunction.addEventSource(new KinesisEventSource(this.kinesisStream, eventSourceProps)); // Add permissions for the Lambda function to access Kinesis const policy = new iam.Policy(this, 'LambdaFunctionPolicy'); diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/package.json b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/package.json index b27dc61cc..e4ddc13da 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/package.json +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/package.json @@ -57,6 +57,7 @@ "@aws-cdk/aws-kinesis": "~1.62.0", "@aws-cdk/aws-kms": "~1.62.0", "@aws-cdk/aws-lambda": "~1.62.0", + "@aws-cdk/aws-lambda-event-sources": "~1.62.0", "@aws-cdk/core": "~1.62.0", "@aws-solutions-constructs/core": "~1.62.0", "constructs": "^3.0.4" @@ -76,6 +77,7 @@ "@aws-cdk/aws-kinesis": "~1.62.0", "@aws-cdk/aws-kms": "~1.62.0", "@aws-cdk/aws-lambda": "~1.62.0", + "@aws-cdk/aws-lambda-event-sources": "~1.62.0", "@aws-cdk/core": "~1.62.0", "@aws-solutions-constructs/core": "~1.62.0", "constructs": "^3.0.4" diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/__snapshots__/test.kinesisstreams-lambda.test.js.snap b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/__snapshots__/test.kinesisstreams-lambda.test.js.snap index a223bf081..3a71e7bd8 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/__snapshots__/test.kinesisstreams-lambda.test.js.snap +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/__snapshots__/test.kinesisstreams-lambda.test.js.snap @@ -101,8 +101,10 @@ Object { }, "Type": "AWS::Lambda::Function", }, - "testkinesisstreamslambdaLambdaFunctionLambdaKinesisEventSourceMapping06EA601A": Object { + "testkinesisstreamslambdaLambdaFunctionKinesisEventSourcetestkinesisstreamslambdaKinesisStreamE01CADBD221E7379": Object { "Properties": Object { + "BatchSize": 100, + "BisectBatchOnFunctionError": true, "EventSourceArn": Object { "Fn::GetAtt": Array [ "testkinesisstreamslambdaKinesisStream76FFCAB1", @@ -112,6 +114,7 @@ Object { "FunctionName": Object { "Ref": "testkinesisstreamslambdaLambdaFunction02E4DD2D", }, + "StartingPosition": "TRIM_HORIZON", }, "Type": "AWS::Lambda::EventSourceMapping", }, @@ -249,6 +252,16 @@ Object { ], }, }, + Object { + "Action": "kinesis:DescribeStream", + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "testkinesisstreamslambdaKinesisStream76FFCAB1", + "Arn", + ], + }, + }, ], "Version": "2012-10-17", }, @@ -264,3 +277,222 @@ Object { }, } `; + +exports[`Test existing resources 1`] = ` +Object { + "Parameters": Object { + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdArtifactHashEA3A5944": Object { + "Description": "Artifact hash for asset \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"", + "Type": "String", + }, + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B": Object { + "Description": "S3 bucket for asset \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"", + "Type": "String", + }, + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6": Object { + "Description": "S3 key for asset version \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"", + "Type": "String", + }, + }, + "Resources": Object { + "testfn76BCC25C": Object { + "DependsOn": Array [ + "testfnServiceRoleDefaultPolicy63AA2D42", + "testfnServiceRoleC30E0817", + ], + "Properties": Object { + "Code": Object { + "S3Bucket": Object { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B", + }, + "S3Key": Object { + "Fn::Join": Array [ + "", + Array [ + Object { + "Fn::Select": Array [ + 0, + Object { + "Fn::Split": Array [ + "||", + Object { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6", + }, + ], + }, + ], + }, + Object { + "Fn::Select": Array [ + 1, + Object { + "Fn::Split": Array [ + "||", + Object { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6", + }, + ], + }, + ], + }, + ], + ], + }, + }, + "Handler": "index.handler", + "Role": Object { + "Fn::GetAtt": Array [ + "testfnServiceRoleC30E0817", + "Arn", + ], + }, + "Runtime": "nodejs10.x", + }, + "Type": "AWS::Lambda::Function", + }, + "testfnKinesisEventSourceteststreamE93A322D": Object { + "Properties": Object { + "BatchSize": 100, + "BisectBatchOnFunctionError": true, + "EventSourceArn": Object { + "Fn::GetAtt": Array [ + "teststream04374A09", + "Arn", + ], + }, + "FunctionName": Object { + "Ref": "testfn76BCC25C", + }, + "StartingPosition": "TRIM_HORIZON", + }, + "Type": "AWS::Lambda::EventSourceMapping", + }, + "testfnServiceRoleC30E0817": Object { + "Properties": Object { + "AssumeRolePolicyDocument": Object { + "Statement": Array [ + Object { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": Object { + "Service": "lambda.amazonaws.com", + }, + }, + ], + "Version": "2012-10-17", + }, + "ManagedPolicyArns": Array [ + Object { + "Fn::Join": Array [ + "", + Array [ + "arn:", + Object { + "Ref": "AWS::Partition", + }, + ":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole", + ], + ], + }, + ], + }, + "Type": "AWS::IAM::Role", + }, + "testfnServiceRoleDefaultPolicy63AA2D42": Object { + "Properties": Object { + "PolicyDocument": Object { + "Statement": Array [ + Object { + "Action": Array [ + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:SubscribeToShard", + ], + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "teststream04374A09", + "Arn", + ], + }, + }, + Object { + "Action": "kinesis:DescribeStream", + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "teststream04374A09", + "Arn", + ], + }, + }, + ], + "Version": "2012-10-17", + }, + "PolicyName": "testfnServiceRoleDefaultPolicy63AA2D42", + "Roles": Array [ + Object { + "Ref": "testfnServiceRoleC30E0817", + }, + ], + }, + "Type": "AWS::IAM::Policy", + }, + "testkinesisstreamslambdaLambdaFunctionPolicyF7EF016E": Object { + "Metadata": Object { + "cfn_nag": Object { + "rules_to_suppress": Array [ + Object { + "id": "W12", + "reason": "The kinesis:ListStreams action requires a wildcard resource.", + }, + ], + }, + }, + "Properties": Object { + "PolicyDocument": Object { + "Statement": Array [ + Object { + "Action": Array [ + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:DescribeStream", + ], + "Effect": "Allow", + "Resource": Object { + "Fn::GetAtt": Array [ + "teststream04374A09", + "Arn", + ], + }, + }, + Object { + "Action": "kinesis:ListStreams", + "Effect": "Allow", + "Resource": "*", + }, + ], + "Version": "2012-10-17", + }, + "PolicyName": "testkinesisstreamslambdaLambdaFunctionPolicyF7EF016E", + "Roles": Array [ + Object { + "Ref": "testfnServiceRoleC30E0817", + }, + ], + }, + "Type": "AWS::IAM::Policy", + }, + "teststream04374A09": Object { + "Properties": Object { + "Name": "existing-stream", + "RetentionPeriodHours": 48, + "ShardCount": 5, + }, + "Type": "AWS::Kinesis::Stream", + }, + }, +} +`; diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.expected.json b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.expected.json index 178b598ef..744aee135 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.expected.json +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.expected.json @@ -91,6 +91,16 @@ "Arn" ] } + }, + { + "Action": "kinesis:DescribeStream", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "testkslambdaKinesisStreamE607D575", + "Arn" + ] + } } ], "Version": "2012-10-17" @@ -186,7 +196,7 @@ } } }, - "testkslambdaLambdaFunctionLambdaKinesisEventSourceMapping70D66039": { + "testkslambdaLambdaFunctionKinesisEventSourcetestkslambdastacktestkslambdaKinesisStream34D4E9A7E70CF520": { "Type": "AWS::Lambda::EventSourceMapping", "Properties": { "EventSourceArn": { @@ -199,6 +209,7 @@ "Ref": "testkslambdaLambdaFunction995A7276" }, "BatchSize": 1, + "BisectBatchOnFunctionError": true, "StartingPosition": "TRIM_HORIZON" } }, diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.ts b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.ts index 42d1d3b72..2fde501d4 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.ts +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.deployFunction.ts @@ -24,7 +24,7 @@ stack.templateOptions.description = 'Integration Test for aws-kinesisstreams-lam // Definitions const props: KinesisStreamsToLambdaProps = { kinesisStreamProps: {}, - eventSourceProps: { + kinesisEventSourceProps: { startingPosition: lambda.StartingPosition.TRIM_HORIZON, batchSize: 1 }, diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.expected.json b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.expected.json new file mode 100644 index 000000000..a44447f8f --- /dev/null +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.expected.json @@ -0,0 +1,240 @@ +{ + "Description": "Integration Test for aws-kinesisstreams-lambda", + "Resources": { + "testroleB50A37BE": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "Service": "lambda.amazonaws.com" + } + } + ], + "Version": "2012-10-17" + }, + "Policies": [ + { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": { + "Fn::Join": [ + "", + [ + "arn:aws:logs:", + { + "Ref": "AWS::Region" + }, + ":", + { + "Ref": "AWS::AccountId" + }, + ":log-group:/aws/lambda/*" + ] + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "LambdaFunctionServiceRolePolicy" + } + ] + } + }, + "testroleDefaultPolicy884631E2": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "teststream04374A09", + "Arn" + ] + } + }, + { + "Action": "kinesis:DescribeStream", + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "teststream04374A09", + "Arn" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "testroleDefaultPolicy884631E2", + "Roles": [ + { + "Ref": "testroleB50A37BE" + } + ] + } + }, + "testfn76BCC25C": { + "Type": "AWS::Lambda::Function", + "Properties": { + "Code": { + "S3Bucket": { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B" + }, + "S3Key": { + "Fn::Join": [ + "", + [ + { + "Fn::Select": [ + 0, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6" + } + ] + } + ] + }, + { + "Fn::Select": [ + 1, + { + "Fn::Split": [ + "||", + { + "Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6" + } + ] + } + ] + } + ] + ] + } + }, + "Handler": "index.handler", + "Role": { + "Fn::GetAtt": [ + "testroleB50A37BE", + "Arn" + ] + }, + "Runtime": "nodejs10.x" + }, + "DependsOn": [ + "testroleDefaultPolicy884631E2", + "testroleB50A37BE" + ] + }, + "testfnKinesisEventSourcetestkslambdastackteststream0CD251F02AE6CC2F": { + "Type": "AWS::Lambda::EventSourceMapping", + "Properties": { + "EventSourceArn": { + "Fn::GetAtt": [ + "teststream04374A09", + "Arn" + ] + }, + "FunctionName": { + "Ref": "testfn76BCC25C" + }, + "BatchSize": 1, + "BisectBatchOnFunctionError": true, + "StartingPosition": "LATEST" + } + }, + "teststream04374A09": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "ShardCount": 2, + "RetentionPeriodHours": 24, + "StreamEncryption": { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + } + }, + "testkslambdaLambdaFunctionPolicyDC40446F": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:DescribeStream" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "teststream04374A09", + "Arn" + ] + } + }, + { + "Action": "kinesis:ListStreams", + "Effect": "Allow", + "Resource": "*" + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "testkslambdaLambdaFunctionPolicyDC40446F", + "Roles": [ + { + "Ref": "testroleB50A37BE" + } + ] + }, + "Metadata": { + "cfn_nag": { + "rules_to_suppress": [ + { + "id": "W12", + "reason": "The kinesis:ListStreams action requires a wildcard resource." + } + ] + } + } + } + }, + "Parameters": { + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B": { + "Type": "String", + "Description": "S3 bucket for asset \"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\"" + }, + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6": { + "Type": "String", + "Description": "S3 key for asset version \"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\"" + }, + "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdArtifactHashEA3A5944": { + "Type": "String", + "Description": "Artifact hash for asset \"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\"" + } + } +} \ No newline at end of file diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.ts b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.ts new file mode 100644 index 000000000..7908a9ee9 --- /dev/null +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/integ.existing.ts @@ -0,0 +1,67 @@ +/** + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance + * with the License. A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES + * OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +// Imports +import { KinesisStreamsToLambda, KinesisStreamsToLambdaProps } from '../lib'; +import { Stack, App, Aws } from '@aws-cdk/core'; +import * as kinesis from '@aws-cdk/aws-kinesis'; +import * as lambda from '@aws-cdk/aws-lambda'; +import * as iam from '@aws-cdk/aws-iam'; + +// Setup +const app = new App(); +const stack = new Stack(app, 'test-ks-lambda-stack'); +stack.templateOptions.description = 'Integration Test for aws-kinesisstreams-lambda'; + +const lambdaRole = new iam.Role(stack, 'test-role', { + assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'), + inlinePolicies: { + LambdaFunctionServiceRolePolicy: new iam.PolicyDocument({ + statements: [new iam.PolicyStatement({ + actions: [ + 'logs:CreateLogGroup', + 'logs:CreateLogStream', + 'logs:PutLogEvents' + ], + resources: [`arn:aws:logs:${Aws.REGION}:${Aws.ACCOUNT_ID}:log-group:/aws/lambda/*`] + })] + }) + } +}); + +const lambdaFn = new lambda.Function(stack, 'test-fn', { + runtime: lambda.Runtime.NODEJS_10_X, + handler: 'index.handler', + code: lambda.Code.asset(`${__dirname}/lambda`), + role: lambdaRole, +}); + +const stream = new kinesis.Stream(stack, 'test-stream', { + shardCount: 2, + encryption: kinesis.StreamEncryption.MANAGED +}); + +// Definitions +const props: KinesisStreamsToLambdaProps = { + existingStreamObj: stream, + existingLambdaObj: lambdaFn, + kinesisEventSourceProps: { + startingPosition: lambda.StartingPosition.LATEST, + batchSize: 1 + }, +}; + +new KinesisStreamsToLambda(stack, 'test-ks-lambda', props); + +// Synth +app.synth(); \ No newline at end of file diff --git a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/test.kinesisstreams-lambda.test.ts b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/test.kinesisstreams-lambda.test.ts index 889b750e1..6dd866939 100644 --- a/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/test.kinesisstreams-lambda.test.ts +++ b/source/patterns/@aws-solutions-constructs/aws-kinesisstreams-lambda/test/test.kinesisstreams-lambda.test.ts @@ -12,11 +12,11 @@ */ // Imports -import { Stack } from "@aws-cdk/core"; +import { Stack, Duration } from "@aws-cdk/core"; import { KinesisStreamsToLambda, KinesisStreamsToLambdaProps } from "../lib"; -import { StartingPosition } from '@aws-cdk/aws-lambda'; import { SynthUtils } from '@aws-cdk/assert'; import * as lambda from '@aws-cdk/aws-lambda'; +import * as kinesis from '@aws-cdk/aws-kinesis'; import '@aws-cdk/assert/jest'; // -------------------------------------------------------------- @@ -50,8 +50,8 @@ test('Test properties', () => { handler: 'index.handler', code: lambda.Code.asset(`${__dirname}/lambda`) }, - eventSourceProps: { - startingPosition: StartingPosition.TRIM_HORIZON, + kinesisEventSourceProps: { + startingPosition: lambda.StartingPosition.TRIM_HORIZON, batchSize: 1 } }; @@ -62,4 +62,56 @@ test('Test properties', () => { expect(app.kinesisStream !== null); // Assertion 3 expect(app.kinesisStreamRole !== null); -}); \ No newline at end of file +}); + +// -------------------------------------------------------------- +// Test existing resources +// -------------------------------------------------------------- +test('Test existing resources', () => { + // Initial Setup + const stack = new Stack(); + + const fn = new lambda.Function(stack, 'test-fn', { + runtime: lambda.Runtime.NODEJS_10_X, + handler: 'index.handler', + code: lambda.Code.asset(`${__dirname}/lambda`) + }); + + const stream = new kinesis.Stream(stack, 'test-stream', { + streamName: 'existing-stream', + shardCount: 5, + retentionPeriod: Duration.hours(48), + encryption: kinesis.StreamEncryption.UNENCRYPTED + }); + + new KinesisStreamsToLambda(stack, 'test-kinesis-streams-lambda', { + existingLambdaObj: fn, + existingStreamObj: stream, + + // These properties will be ignored as existing objects were provided + lambdaFunctionProps: { + runtime: lambda.Runtime.PYTHON_3_8, + handler: 'lambda_function.handler', + code: lambda.Code.asset(`${__dirname}/lambda`) + }, + kinesisStreamProps: { + streamName: 'other-name-stream', + shardCount: 1, + retentionPeriod: Duration.hours(24) + } + }); + + // Assertions + expect(SynthUtils.toCloudFormation(stack)).toMatchSnapshot(); + + expect(stack).toHaveResource('AWS::Kinesis::Stream', { + Name: 'existing-stream', + ShardCount: 5, + RetentionPeriodHours: 48, + }); + + expect(stack).toHaveResource('AWS::Lambda::Function', { + Handler: 'index.handler', + Runtime: 'nodejs10.x', + }); +}); diff --git a/source/patterns/@aws-solutions-constructs/core/lib/lambda-event-source-mapping-defaults.ts b/source/patterns/@aws-solutions-constructs/core/lib/lambda-event-source-mapping-defaults.ts index 1672b7c89..fac0eb436 100644 --- a/source/patterns/@aws-solutions-constructs/core/lib/lambda-event-source-mapping-defaults.ts +++ b/source/patterns/@aws-solutions-constructs/core/lib/lambda-event-source-mapping-defaults.ts @@ -13,16 +13,9 @@ import * as lambda from '@aws-cdk/aws-lambda'; import { overrideProps } from './utils'; -import { DynamoEventSourceProps, S3EventSourceProps } from '@aws-cdk/aws-lambda-event-sources'; +import { DynamoEventSourceProps, S3EventSourceProps, KinesisEventSourceProps } from '@aws-cdk/aws-lambda-event-sources'; import * as s3 from '@aws-cdk/aws-s3'; -export function DefaultKinesisEventSourceProps(_eventSourceArn: string) { - const defaultEventSourceProps: lambda.EventSourceMappingOptions = { - eventSourceArn: _eventSourceArn - }; - return defaultEventSourceProps; -} - export function DynamoEventSourceProps(_dynamoEventSourceProps?: DynamoEventSourceProps) { const defaultDynamoEventSourceProps = { @@ -47,4 +40,17 @@ export function S3EventSourceProps(_s3EventSourceProps?: S3EventSourceProps) { } else { return defaultS3EventSourceProps; } -} \ No newline at end of file +} + +export function KinesisEventSourceProps(_kinesisEventSourceProps?: KinesisEventSourceProps) { + const defaultKinesisEventSourceProps = { + startingPosition: lambda.StartingPosition.TRIM_HORIZON, + bisectBatchOnError: true + }; + + if (_kinesisEventSourceProps) { + return overrideProps(defaultKinesisEventSourceProps, _kinesisEventSourceProps, false); + } else { + return defaultKinesisEventSourceProps; + } +} diff --git a/source/patterns/@aws-solutions-constructs/core/test/lambda-event-source.test.ts b/source/patterns/@aws-solutions-constructs/core/test/lambda-event-source.test.ts index 11adf1286..9a5679d8d 100644 --- a/source/patterns/@aws-solutions-constructs/core/test/lambda-event-source.test.ts +++ b/source/patterns/@aws-solutions-constructs/core/test/lambda-event-source.test.ts @@ -12,7 +12,7 @@ */ import * as defaults from '../index'; -import { DynamoEventSourceProps } from '@aws-cdk/aws-lambda-event-sources'; +import { DynamoEventSourceProps, KinesisEventSourceProps } from '@aws-cdk/aws-lambda-event-sources'; import * as lambda from '@aws-cdk/aws-lambda'; import * as s3 from '@aws-cdk/aws-s3'; import '@aws-cdk/assert/jest'; @@ -39,14 +39,6 @@ test('test DynamoEventSourceProps override', () => { }); }); -test('test KinesisEventSourceProps', () => { - const streamArn = 'xyz'; - const props = defaults.DefaultKinesisEventSourceProps(streamArn); - expect(props).toEqual({ - eventSourceArn: "xyz" - }); -}); - test('test S3EventSourceProps w/ default props', () => { const props = defaults.S3EventSourceProps(); expect(props).toEqual({ @@ -64,4 +56,30 @@ test('test S3EventSourceProps w/ user props', () => { expect(props).toEqual({ events: ["s3:ObjectCreated:Post"] }); -}); \ No newline at end of file +}); + +test('test KinesisEventSourceProps', () => { + const props = defaults.KinesisEventSourceProps(); + + expect(props).toEqual({ + startingPosition: "TRIM_HORIZON", + bisectBatchOnError: true + }); +}); + +test('test KinesisEventSourceProps override', () => { + const myProps: KinesisEventSourceProps = { + startingPosition: lambda.StartingPosition.LATEST, + batchSize: 5, + retryAttempts: 3 + }; + + const props = defaults.KinesisEventSourceProps(myProps); + + expect(props).toEqual({ + batchSize: 5, + startingPosition: "LATEST", + bisectBatchOnError: true, + retryAttempts: 3 + }); +});