diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts index e433042e481c9..6591838fce84b 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/dynamodb.ts @@ -1,29 +1,17 @@ import dynamodb = require('@aws-cdk/aws-dynamodb'); import lambda = require('@aws-cdk/aws-lambda'); +import {StreamEventSource, StreamEventSourceProps} from './stream'; -export interface DynamoEventSourceProps { - /** - * The largest number of records that AWS Lambda will retrieve from your event - * source at the time of invoking your function. Your function receives an - * event with all the retrieved records. - * - * Valid Range: Minimum value of 1. Maximum value of 1000. - * - * @default 100 - */ - readonly batchSize?: number; - - /** - * Where to begin consuming the DynamoDB stream. - */ - readonly startingPosition: lambda.StartingPosition; +export interface DynamoEventSourceProps extends StreamEventSourceProps { } /** * Use an Amazon DynamoDB stream as an event source for AWS Lambda. */ -export class DynamoEventSource implements lambda.IEventSource { - constructor(private readonly table: dynamodb.Table, private readonly props: DynamoEventSourceProps) { +export class DynamoEventSource extends StreamEventSource { + constructor(private readonly table: dynamodb.Table, props: DynamoEventSourceProps) { + super(props); + if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 1000)) { throw new Error(`Maximum batch size must be between 1 and 1000 inclusive (given ${this.props.batchSize})`); } @@ -34,11 +22,9 @@ export class DynamoEventSource implements lambda.IEventSource { throw new Error(`DynamoDB Streams must be enabled on the table ${this.table.node.path}`); } - target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, { - batchSize: this.props.batchSize || 100, - eventSourceArn: this.table.tableStreamArn, - startingPosition: this.props.startingPosition - }); + target.addEventSourceMapping(`DynamoDBEventSource:${this.table.node.uniqueId}`, + this.enrichMappingOptions({eventSourceArn: this.table.tableStreamArn}) + ); this.table.grantStreamRead(target); dynamodb.Table.grantListStreams(target); diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts index 93102e456432e..2095b2cdfacee 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/kinesis.ts @@ -1,41 +1,27 @@ import kinesis = require('@aws-cdk/aws-kinesis'); import lambda = require('@aws-cdk/aws-lambda'); +import {StreamEventSource, StreamEventSourceProps} from './stream'; -export interface KinesisEventSourceProps { - /** - * The largest number of records that AWS Lambda will retrieve from your event - * source at the time of invoking your function. Your function receives an - * event with all the retrieved records. - * - * Valid Range: Minimum value of 1. Maximum value of 10000. - * - * @default 100 - */ - readonly batchSize?: number; - - /** - * Where to begin consuming the Kinesis stream. - */ - readonly startingPosition: lambda.StartingPosition; +export interface KinesisEventSourceProps extends StreamEventSourceProps { } /** * Use an Amazon Kinesis stream as an event source for AWS Lambda. */ -export class KinesisEventSource implements lambda.IEventSource { - constructor(readonly stream: kinesis.IStream, private readonly props: KinesisEventSourceProps) { +export class KinesisEventSource extends StreamEventSource { + constructor(readonly stream: kinesis.IStream, props: KinesisEventSourceProps) { + super(props); + if (this.props.batchSize !== undefined && (this.props.batchSize < 1 || this.props.batchSize > 10000)) { throw new Error(`Maximum batch size must be between 1 and 10000 inclusive (given ${this.props.batchSize})`); } } public bind(target: lambda.IFunction) { - target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, { - batchSize: this.props.batchSize || 100, - startingPosition: this.props.startingPosition, - eventSourceArn: this.stream.streamArn, - }); + target.addEventSourceMapping(`KinesisEventSource:${this.stream.node.uniqueId}`, + this.enrichMappingOptions({eventSourceArn: this.stream.streamArn}) + ); this.stream.grantRead(target); } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts new file mode 100644 index 0000000000000..02f9e4a657b01 --- /dev/null +++ b/packages/@aws-cdk/aws-lambda-event-sources/lib/stream.ts @@ -0,0 +1,52 @@ +import lambda = require('@aws-cdk/aws-lambda'); +import {Duration} from '@aws-cdk/core'; + +/** + * @internal + */ +export interface StreamEventSourceProps { + /** + * The largest number of records that AWS Lambda will retrieve from your event + * source at the time of invoking your function. Your function receives an + * event with all the retrieved records. + * + * Valid Range: Minimum value of 1. Maximum value of 10000. + * + * @default 100 + */ + readonly batchSize?: number; + + /** + * Where to begin consuming the stream. + */ + readonly startingPosition: lambda.StartingPosition; + + /** + * The maximum amount of time to gather records before invoking the function. + * Maximum of Duration.minutes(5) + * + * @default Duration.seconds(0) + */ + readonly maxBatchingWindow?: Duration; +} + +/** + * Use an stream as an event source for AWS Lambda. + * + * @internal + */ +export abstract class StreamEventSource implements lambda.IEventSource { + protected constructor(protected readonly props: StreamEventSourceProps) { + } + + public abstract bind(_target: lambda.IFunction): void; + + protected enrichMappingOptions(options: lambda.EventSourceMappingOptions): lambda.EventSourceMappingOptions { + return { + ...options, + batchSize: this.props.batchSize || 100, + startingPosition: this.props.startingPosition, + maxBatchingWindow: this.props.maxBatchingWindow, + }; + } +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts index e4372a283af65..ce9fc9813ca29 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.dynamo.ts @@ -172,4 +172,63 @@ export = { test.done(); }, + + 'specific maxBatchingWindow'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // WHEN + fn.addEventSource(new sources.DynamoEventSource(table, { + maxBatchingWindow: cdk.Duration.minutes(2), + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "TD925BC7E", + "StreamArn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "MaximumBatchingWindowInSeconds": 120, + "StartingPosition": "LATEST" + })); + + test.done(); + }, + + 'throws if maxBatchingWindow > 300 seconds'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const table = new dynamodb.Table(stack, 'T', { + partitionKey: { + name: 'id', + type: dynamodb.AttributeType.STRING + }, + stream: dynamodb.StreamViewType.NEW_IMAGE + }); + + // THEN + test.throws(() => + fn.addEventSource(new sources.DynamoEventSource(table, { + maxBatchingWindow: cdk.Duration.seconds(301), + startingPosition: lambda.StartingPosition.LATEST + })), /maxBatchingWindow cannot be over 300 seconds/); + + test.done(); + }, + }; diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts index 93050af864f3d..4261ce568e92a 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/test.kinesis.ts @@ -123,4 +123,34 @@ export = { test.done(); }, + + 'specific maxBatchingWindow'(test: Test) { + // GIVEN + const stack = new cdk.Stack(); + const fn = new TestFunction(stack, 'Fn'); + const stream = new kinesis.Stream(stack, 'S'); + + // WHEN + fn.addEventSource(new sources.KinesisEventSource(stream, { + maxBatchingWindow: cdk.Duration.minutes(2), + startingPosition: lambda.StartingPosition.LATEST + })); + + // THEN + expect(stack).to(haveResource('AWS::Lambda::EventSourceMapping', { + "EventSourceArn": { + "Fn::GetAtt": [ + "S509448A1", + "Arn" + ] + }, + "FunctionName": { + "Ref": "Fn9270CBC0" + }, + "MaximumBatchingWindowInSeconds": 120, + "StartingPosition": "LATEST" + })); + + test.done(); + }, }; diff --git a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts index 83b126326cd9e..299ce2e91cdfb 100644 --- a/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts +++ b/packages/@aws-cdk/aws-lambda/lib/event-source-mapping.ts @@ -37,7 +37,15 @@ export interface EventSourceMappingOptions { * * @default - Required for Amazon Kinesis and Amazon DynamoDB Streams sources. */ - readonly startingPosition?: StartingPosition + readonly startingPosition?: StartingPosition; + + /** + * The maximum amount of time to gather records before invoking the function. + * Maximum of Duration.minutes(5) + * + * @default Duration.seconds(0) + */ + readonly maxBatchingWindow?: cdk.Duration; } export interface EventSourceMappingProps extends EventSourceMappingOptions { @@ -63,12 +71,17 @@ export class EventSourceMapping extends Resource { constructor(scope: cdk.Construct, id: string, props: EventSourceMappingProps) { super(scope, id); + if (props.maxBatchingWindow && props.maxBatchingWindow.toSeconds() > 300) { + throw new Error(`maxBatchingWindow cannot be over 300 seconds, got ${props.maxBatchingWindow.toSeconds()}`); + } + new CfnEventSourceMapping(this, 'Resource', { batchSize: props.batchSize, enabled: props.enabled, eventSourceArn: props.eventSourceArn, functionName: props.target.functionName, startingPosition: props.startingPosition, + maximumBatchingWindowInSeconds: props.maxBatchingWindow && props.maxBatchingWindow.toSeconds(), }); } }