diff --git a/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json index 77d522466ccf5..8f3ade0dc63d5 100644 --- a/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json +++ b/packages/@aws-cdk/aws-dynamodb/test/integ.dynamodb.kinesis-stream.expected.json @@ -4,6 +4,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -73,4 +76,4 @@ ] } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-events-targets/test/kinesis/integ.kinesis-stream.expected.json b/packages/@aws-cdk/aws-events-targets/test/kinesis/integ.kinesis-stream.expected.json index 4151890c35ef3..babfefc9b5972 100644 --- a/packages/@aws-cdk/aws-events-targets/test/kinesis/integ.kinesis-stream.expected.json +++ b/packages/@aws-cdk/aws-events-targets/test/kinesis/integ.kinesis-stream.expected.json @@ -4,6 +4,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -115,4 +118,4 @@ ] } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-kinesis/lib/stream.ts b/packages/@aws-cdk/aws-kinesis/lib/stream.ts index 364cb4436f59c..fb1c345a9a844 100644 --- a/packages/@aws-cdk/aws-kinesis/lib/stream.ts +++ b/packages/@aws-cdk/aws-kinesis/lib/stream.ts @@ -673,6 +673,9 @@ export interface StreamProps { /** * The number of shards for the stream. + * + * Can only be provided if streamMode is Provisioned. + * * @default 1 */ readonly shardCount?: number; @@ -752,9 +755,15 @@ export class Stream extends StreamBase { physicalName: props.streamName, }); - const shardCount = props.shardCount || 1; + let shardCount = props.shardCount; + const streamMode = props.streamMode ?? StreamMode.PROVISIONED; - const streamMode = props.streamMode; + if (streamMode === StreamMode.ON_DEMAND && shardCount !== undefined) { + throw new Error(`streamMode must be set to ${StreamMode.PROVISIONED} (default) when specifying shardCount`); + } + if (streamMode === StreamMode.PROVISIONED && shardCount === undefined) { + shardCount = 1; + } const retentionPeriodHours = props.retentionPeriod?.toHours() ?? 24; if (!Token.isUnresolved(retentionPeriodHours)) { diff --git a/packages/@aws-cdk/aws-kinesis/test/integ.stream-dashboard.expected.json b/packages/@aws-cdk/aws-kinesis/test/integ.stream-dashboard.expected.json index 28a166e76fd1f..19b702e60830d 100644 --- a/packages/@aws-cdk/aws-kinesis/test/integ.stream-dashboard.expected.json +++ b/packages/@aws-cdk/aws-kinesis/test/integ.stream-dashboard.expected.json @@ -4,6 +4,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -203,4 +206,4 @@ ] } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-kinesis/test/integ.stream.expected.json b/packages/@aws-cdk/aws-kinesis/test/integ.stream.expected.json index 41230acc599a2..e4e0a7b73bd68 100644 --- a/packages/@aws-cdk/aws-kinesis/test/integ.stream.expected.json +++ b/packages/@aws-cdk/aws-kinesis/test/integ.stream.expected.json @@ -72,6 +72,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -110,4 +113,4 @@ ] } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-kinesis/test/stream.test.ts b/packages/@aws-cdk/aws-kinesis/test/stream.test.ts index cad461b3250ca..8e4fa385133dd 100644 --- a/packages/@aws-cdk/aws-kinesis/test/stream.test.ts +++ b/packages/@aws-cdk/aws-kinesis/test/stream.test.ts @@ -22,6 +22,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -75,6 +78,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -94,6 +100,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -158,6 +167,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 2, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -212,6 +224,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 168, StreamEncryption: { 'Fn::If': [ @@ -283,6 +298,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', @@ -319,6 +337,9 @@ describe('Kinesis data streams', () => { // THEN expect(stack).toHaveResource('AWS::Kinesis::Stream', { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', @@ -365,8 +386,11 @@ describe('Kinesis data streams', () => { }); expect(stack).toHaveResource('AWS::Kinesis::Stream', { - RetentionPeriodHours: 24, ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, + RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', KeyId: stack.resolve(explicitKey.keyArn), @@ -374,11 +398,11 @@ describe('Kinesis data streams', () => { }); }), - test.each([StreamMode.ON_DEMAND, StreamMode.PROVISIONED])('uses explicit capacity mode %s', (mode: StreamMode) => { + test('uses explicit provisioned streamMode', () => { const stack = new Stack(); new Stream(stack, 'MyStream', { - streamMode: mode, + streamMode: StreamMode.PROVISIONED, }); expect(stack).toMatchTemplate({ @@ -386,10 +410,66 @@ describe('Kinesis data streams', () => { MyStream5C050E93: { Type: 'AWS::Kinesis::Stream', Properties: { + RetentionPeriodHours: 24, ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, + StreamEncryption: { + 'Fn::If': [ + 'AwsCdkKinesisEncryptedStreamsUnsupportedRegions', + { + Ref: 'AWS::NoValue', + }, + { + EncryptionType: 'KMS', + KeyId: 'alias/aws/kinesis', + }, + ], + }, + }, + }, + }, + Conditions: { + AwsCdkKinesisEncryptedStreamsUnsupportedRegions: { + 'Fn::Or': [ + { + 'Fn::Equals': [ + { + Ref: 'AWS::Region', + }, + 'cn-north-1', + ], + }, + { + 'Fn::Equals': [ + { + Ref: 'AWS::Region', + }, + 'cn-northwest-1', + ], + }, + ], + }, + }, + }); + }); + + test('uses explicit on-demand streamMode', () => { + const stack = new Stack(); + + new Stream(stack, 'MyStream', { + streamMode: StreamMode.ON_DEMAND, + }); + + expect(stack).toMatchTemplate({ + Resources: { + MyStream5C050E93: { + Type: 'AWS::Kinesis::Stream', + Properties: { RetentionPeriodHours: 24, StreamModeDetails: { - StreamMode: StreamMode[mode], + StreamMode: StreamMode.ON_DEMAND, }, StreamEncryption: { 'Fn::If': [ @@ -431,6 +511,17 @@ describe('Kinesis data streams', () => { }); }); + test('throws when using shardCount with on-demand streamMode', () => { + const stack = new Stack(); + + expect(() => { + new Stream(stack, 'MyStream', { + shardCount: 2, + streamMode: StreamMode.ON_DEMAND, + }); + }).toThrow(`streamMode must be set to ${StreamMode.PROVISIONED} (default) when specifying shardCount`); + }); + test('grantRead creates and attaches a policy with read only access to the principal', () => { const stack = new Stack(); const stream = new Stream(stack, 'MyStream', { @@ -536,6 +627,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', @@ -695,6 +789,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', @@ -845,8 +942,11 @@ describe('Kinesis data streams', () => { MyStream5C050E93: { Type: 'AWS::Kinesis::Stream', Properties: { - RetentionPeriodHours: 24, ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, + RetentionPeriodHours: 24, StreamEncryption: { EncryptionType: 'KMS', KeyId: { @@ -915,6 +1015,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -1003,6 +1106,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -1083,6 +1189,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -1173,6 +1282,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -1255,6 +1367,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: 24, StreamEncryption: { 'Fn::If': [ @@ -1372,6 +1487,9 @@ describe('Kinesis data streams', () => { Type: 'AWS::Kinesis::Stream', Properties: { ShardCount: 1, + StreamModeDetails: { + StreamMode: StreamMode.PROVISIONED, + }, RetentionPeriodHours: { Ref: 'myretentionperiod', }, diff --git a/packages/@aws-cdk/aws-kinesisfirehose/test/integ.delivery-stream.source-stream.expected.json b/packages/@aws-cdk/aws-kinesisfirehose/test/integ.delivery-stream.source-stream.expected.json index 896d0487a091c..a37685e2e47e8 100644 --- a/packages/@aws-cdk/aws-kinesisfirehose/test/integ.delivery-stream.source-stream.expected.json +++ b/packages/@aws-cdk/aws-kinesisfirehose/test/integ.delivery-stream.source-stream.expected.json @@ -75,6 +75,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -281,4 +284,4 @@ } } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesis.expected.json b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesis.expected.json index c1690f2f03aac..d14d727e34999 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesis.expected.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesis.expected.json @@ -116,6 +116,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -154,4 +157,4 @@ ] } } -} \ No newline at end of file +} diff --git a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json index 616adaef6a86a..a16660f565e76 100644 --- a/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json +++ b/packages/@aws-cdk/aws-lambda-event-sources/test/integ.kinesiswithdlq.expected.json @@ -140,6 +140,9 @@ "Type": "AWS::Kinesis::Stream", "Properties": { "ShardCount": 1, + "StreamModeDetails": { + "StreamMode": "PROVISIONED" + }, "RetentionPeriodHours": 24, "StreamEncryption": { "Fn::If": [ @@ -203,4 +206,4 @@ ] } } -} \ No newline at end of file +}