Skip to content

Commit

Permalink
feat(aws-kinesisstreams-lambda): Add ability to bring-your-own Kinesi…
Browse files Browse the repository at this point in the history
…s Stream to this pattern (awslabs#60)

BREAKING CHANGE: DefaultKinesisEventSourceProps was removed from lambda-event-source-mapping-defaults and replaced with KinesisEventSourceProps. This now follows the same behavior as the aws-dynamodb-stream-lambda and aws-s3-lambda patterns (which use DynamoEventSourceProps and S3EventSourceProps instead of EventSourceMappingOptions)

Co-authored-by: Daniel Pinheiro <dspin@amazon.com>
  • Loading branch information
dscpinheiro and dscpinheiro committed Sep 11, 2020
1 parent 08dd62e commit f5d8fe0
Show file tree
Hide file tree
Showing 11 changed files with 669 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ _Parameters_
|existingLambdaObj?|[`lambda.Function`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.Function.html)|Existing instance of Lambda Function object, if this is set then the lambdaFunctionProps is ignored.|
|lambdaFunctionProps?|[`lambda.FunctionProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.FunctionProps.html)|User provided props to override the default props for the Lambda function.|
|kinesisStreamProps?|[`kinesis.StreamProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesis.StreamProps.html)|Optional user-provided props to override the default props for the Kinesis stream.|
|eventSourceProps?|[`lambda.EventSourceMappingOptions`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda.EventSourceMappingOptions.html)|Optional user-provided props to override the default props for the Lambda event source mapping.|
|existingStreamObj?|[`kinesis.Stream`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-kinesis.Stream.html)|Existing instance of Kinesis Stream, if this is set then kinesisStreamProps is ignored.|
|kinesisEventSourceProps?|[`aws-lambda-event-sources.KinesisEventSourceProps`](https://docs.aws.amazon.com/cdk/api/latest/docs/@aws-cdk_aws-lambda-event-sources.KinesisEventSourceProps.html)|Optional user-provided props to override the default props for the Lambda event source mapping.|

## Pattern Properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@

// Imports
import * as lambda from '@aws-cdk/aws-lambda';
import { KinesisEventSourceProps, KinesisEventSource } from '@aws-cdk/aws-lambda-event-sources';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as iam from '@aws-cdk/aws-iam';
import * as defaults from '@aws-solutions-constructs/core';
import { overrideProps } from '@aws-solutions-constructs/core';
import { Construct } from '@aws-cdk/core';

/**
Expand All @@ -35,6 +35,12 @@ export interface KinesisStreamsToLambdaProps {
* @default - Default props are used.
*/
readonly lambdaFunctionProps?: lambda.FunctionProps,
/**
* Existing instance of Kinesis Stream, if this is set then kinesisStreamProps is ignored.
*
* @default - None
*/
readonly existingStreamObj?: kinesis.Stream;
/**
* Optional user-provided props to override the default props for the Kinesis stream.
*
Expand All @@ -46,7 +52,7 @@ export interface KinesisStreamsToLambdaProps {
*
* @default - Default props are used.
*/
readonly eventSourceProps?: lambda.EventSourceMappingOptions | any
readonly kinesisEventSourceProps?: KinesisEventSourceProps
}

/**
Expand All @@ -61,7 +67,7 @@ export class KinesisStreamsToLambda extends Construct {
* @summary Constructs a new instance of the KinesisStreamsToLambda class.
* @param {cdk.App} scope - represents the scope for all the resources.
* @param {string} id - this is a a scope-unique id.
* @param {CloudFrontToApiGatewayProps} props - user provided props for the construct
* @param {KinesisStreamsToLambdaProps} props - user provided props for the construct
* @since 0.8.0
* @access public
*/
Expand All @@ -70,6 +76,7 @@ export class KinesisStreamsToLambda extends Construct {

// Setup the Kinesis Stream
this.kinesisStream = defaults.buildKinesisStream(this, {
existingStreamObj: props.existingStreamObj,
kinesisStreamProps: props.kinesisStreamProps
});

Expand All @@ -80,10 +87,8 @@ export class KinesisStreamsToLambda extends Construct {
});

// Add the Lambda event source mapping
const eventSourceProps = (props.eventSourceProps) ?
overrideProps(defaults.DefaultKinesisEventSourceProps(this.kinesisStream.streamArn), props.eventSourceProps) :
defaults.DefaultKinesisEventSourceProps(this.kinesisStream.streamArn);
this.lambdaFunction.addEventSourceMapping('LambdaKinesisEventSourceMapping', eventSourceProps);
const eventSourceProps = defaults.KinesisEventSourceProps(props.kinesisEventSourceProps);
this.lambdaFunction.addEventSource(new KinesisEventSource(this.kinesisStream, eventSourceProps));

// Add permissions for the Lambda function to access Kinesis
const policy = new iam.Policy(this, 'LambdaFunctionPolicy');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@aws-cdk/aws-kinesis": "~1.62.0",
"@aws-cdk/aws-kms": "~1.62.0",
"@aws-cdk/aws-lambda": "~1.62.0",
"@aws-cdk/aws-lambda-event-sources": "~1.62.0",
"@aws-cdk/core": "~1.62.0",
"@aws-solutions-constructs/core": "~1.62.0",
"constructs": "^3.0.4"
Expand All @@ -76,6 +77,7 @@
"@aws-cdk/aws-kinesis": "~1.62.0",
"@aws-cdk/aws-kms": "~1.62.0",
"@aws-cdk/aws-lambda": "~1.62.0",
"@aws-cdk/aws-lambda-event-sources": "~1.62.0",
"@aws-cdk/core": "~1.62.0",
"@aws-solutions-constructs/core": "~1.62.0",
"constructs": "^3.0.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ Object {
},
"Type": "AWS::Lambda::Function",
},
"testkinesisstreamslambdaLambdaFunctionLambdaKinesisEventSourceMapping06EA601A": Object {
"testkinesisstreamslambdaLambdaFunctionKinesisEventSourcetestkinesisstreamslambdaKinesisStreamE01CADBD221E7379": Object {
"Properties": Object {
"BatchSize": 100,
"BisectBatchOnFunctionError": true,
"EventSourceArn": Object {
"Fn::GetAtt": Array [
"testkinesisstreamslambdaKinesisStream76FFCAB1",
Expand All @@ -112,6 +114,7 @@ Object {
"FunctionName": Object {
"Ref": "testkinesisstreamslambdaLambdaFunction02E4DD2D",
},
"StartingPosition": "TRIM_HORIZON",
},
"Type": "AWS::Lambda::EventSourceMapping",
},
Expand Down Expand Up @@ -249,6 +252,16 @@ Object {
],
},
},
Object {
"Action": "kinesis:DescribeStream",
"Effect": "Allow",
"Resource": Object {
"Fn::GetAtt": Array [
"testkinesisstreamslambdaKinesisStream76FFCAB1",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
Expand All @@ -264,3 +277,222 @@ Object {
},
}
`;

exports[`Test existing resources 1`] = `
Object {
"Parameters": Object {
"AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdArtifactHashEA3A5944": Object {
"Description": "Artifact hash for asset \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"",
"Type": "String",
},
"AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B": Object {
"Description": "S3 bucket for asset \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"",
"Type": "String",
},
"AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6": Object {
"Description": "S3 key for asset version \\"dfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcd\\"",
"Type": "String",
},
},
"Resources": Object {
"testfn76BCC25C": Object {
"DependsOn": Array [
"testfnServiceRoleDefaultPolicy63AA2D42",
"testfnServiceRoleC30E0817",
],
"Properties": Object {
"Code": Object {
"S3Bucket": Object {
"Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3BucketA460830B",
},
"S3Key": Object {
"Fn::Join": Array [
"",
Array [
Object {
"Fn::Select": Array [
0,
Object {
"Fn::Split": Array [
"||",
Object {
"Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6",
},
],
},
],
},
Object {
"Fn::Select": Array [
1,
Object {
"Fn::Split": Array [
"||",
Object {
"Ref": "AssetParametersdfe828a7d00b0da7a6e92dc1decf39ec907e4edc6006faea8631d4dabd7f4fcdS3VersionKey58FEB9E6",
},
],
},
],
},
],
],
},
},
"Handler": "index.handler",
"Role": Object {
"Fn::GetAtt": Array [
"testfnServiceRoleC30E0817",
"Arn",
],
},
"Runtime": "nodejs10.x",
},
"Type": "AWS::Lambda::Function",
},
"testfnKinesisEventSourceteststreamE93A322D": Object {
"Properties": Object {
"BatchSize": 100,
"BisectBatchOnFunctionError": true,
"EventSourceArn": Object {
"Fn::GetAtt": Array [
"teststream04374A09",
"Arn",
],
},
"FunctionName": Object {
"Ref": "testfn76BCC25C",
},
"StartingPosition": "TRIM_HORIZON",
},
"Type": "AWS::Lambda::EventSourceMapping",
},
"testfnServiceRoleC30E0817": Object {
"Properties": Object {
"AssumeRolePolicyDocument": Object {
"Statement": Array [
Object {
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": Object {
"Service": "lambda.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
"ManagedPolicyArns": Array [
Object {
"Fn::Join": Array [
"",
Array [
"arn:",
Object {
"Ref": "AWS::Partition",
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
],
],
},
],
},
"Type": "AWS::IAM::Role",
},
"testfnServiceRoleDefaultPolicy63AA2D42": Object {
"Properties": Object {
"PolicyDocument": Object {
"Statement": Array [
Object {
"Action": Array [
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:SubscribeToShard",
],
"Effect": "Allow",
"Resource": Object {
"Fn::GetAtt": Array [
"teststream04374A09",
"Arn",
],
},
},
Object {
"Action": "kinesis:DescribeStream",
"Effect": "Allow",
"Resource": Object {
"Fn::GetAtt": Array [
"teststream04374A09",
"Arn",
],
},
},
],
"Version": "2012-10-17",
},
"PolicyName": "testfnServiceRoleDefaultPolicy63AA2D42",
"Roles": Array [
Object {
"Ref": "testfnServiceRoleC30E0817",
},
],
},
"Type": "AWS::IAM::Policy",
},
"testkinesisstreamslambdaLambdaFunctionPolicyF7EF016E": Object {
"Metadata": Object {
"cfn_nag": Object {
"rules_to_suppress": Array [
Object {
"id": "W12",
"reason": "The kinesis:ListStreams action requires a wildcard resource.",
},
],
},
},
"Properties": Object {
"PolicyDocument": Object {
"Statement": Array [
Object {
"Action": Array [
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:DescribeStream",
],
"Effect": "Allow",
"Resource": Object {
"Fn::GetAtt": Array [
"teststream04374A09",
"Arn",
],
},
},
Object {
"Action": "kinesis:ListStreams",
"Effect": "Allow",
"Resource": "*",
},
],
"Version": "2012-10-17",
},
"PolicyName": "testkinesisstreamslambdaLambdaFunctionPolicyF7EF016E",
"Roles": Array [
Object {
"Ref": "testfnServiceRoleC30E0817",
},
],
},
"Type": "AWS::IAM::Policy",
},
"teststream04374A09": Object {
"Properties": Object {
"Name": "existing-stream",
"RetentionPeriodHours": 48,
"ShardCount": 5,
},
"Type": "AWS::Kinesis::Stream",
},
},
}
`;
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@
"Arn"
]
}
},
{
"Action": "kinesis:DescribeStream",
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"testkslambdaKinesisStreamE607D575",
"Arn"
]
}
}
],
"Version": "2012-10-17"
Expand Down Expand Up @@ -186,7 +196,7 @@
}
}
},
"testkslambdaLambdaFunctionLambdaKinesisEventSourceMapping70D66039": {
"testkslambdaLambdaFunctionKinesisEventSourcetestkslambdastacktestkslambdaKinesisStream34D4E9A7E70CF520": {
"Type": "AWS::Lambda::EventSourceMapping",
"Properties": {
"EventSourceArn": {
Expand All @@ -199,6 +209,7 @@
"Ref": "testkslambdaLambdaFunction995A7276"
},
"BatchSize": 1,
"BisectBatchOnFunctionError": true,
"StartingPosition": "TRIM_HORIZON"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ stack.templateOptions.description = 'Integration Test for aws-kinesisstreams-lam
// Definitions
const props: KinesisStreamsToLambdaProps = {
kinesisStreamProps: {},
eventSourceProps: {
kinesisEventSourceProps: {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
batchSize: 1
},
Expand Down
Loading

0 comments on commit f5d8fe0

Please sign in to comment.