Skip to content

Commit

Permalink
fix(aws-kinesis): remove default shard count when stream mode is on-d…
Browse files Browse the repository at this point in the history
…emand and set default mode to provisioned (aws#18221)

Change the default Kinesis Data Stream's stream mode to provisioned from undefined to make the active configuration more explicit in the resulting CloudFormation templates.

Fix an issue whereby the shard count is always set when the stream mode is set to on-demand, which is invalid. Shard count still defaults to `1` in provisioned mode, but is left undefined in on-demand mode.

Add validation for the above so that an error is thrown from CDK when specifying on-demand mode with a shard count.

Fixes aws#18139

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
igilham committed Jan 6, 2022
1 parent 7868869 commit cac11bb
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
Expand Down Expand Up @@ -73,4 +76,4 @@
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
Expand Down Expand Up @@ -115,4 +118,4 @@
]
}
}
}
}
13 changes: 11 additions & 2 deletions packages/@aws-cdk/aws-kinesis/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,9 @@ export interface StreamProps {

/**
* The number of shards for the stream.
*
* Can only be provided if streamMode is Provisioned.
*
* @default 1
*/
readonly shardCount?: number;
Expand Down Expand Up @@ -752,9 +755,15 @@ export class Stream extends StreamBase {
physicalName: props.streamName,
});

const shardCount = props.shardCount || 1;
let shardCount = props.shardCount;
const streamMode = props.streamMode ?? StreamMode.PROVISIONED;

const streamMode = props.streamMode;
if (streamMode === StreamMode.ON_DEMAND && shardCount !== undefined) {
throw new Error(`streamMode must be set to ${StreamMode.PROVISIONED} (default) when specifying shardCount`);
}
if (streamMode === StreamMode.PROVISIONED && shardCount === undefined) {
shardCount = 1;
}

const retentionPeriodHours = props.retentionPeriod?.toHours() ?? 24;
if (!Token.isUnresolved(retentionPeriodHours)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
Expand Down Expand Up @@ -203,4 +206,4 @@
]
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
Expand Down Expand Up @@ -110,4 +113,4 @@
]
}
}
}
}
128 changes: 123 additions & 5 deletions packages/@aws-cdk/aws-kinesis/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -75,6 +78,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand All @@ -94,6 +100,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -158,6 +167,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 2,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -212,6 +224,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 168,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -283,6 +298,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
Expand Down Expand Up @@ -319,6 +337,9 @@ describe('Kinesis data streams', () => {
// THEN
expect(stack).toHaveResource('AWS::Kinesis::Stream', {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
Expand Down Expand Up @@ -365,31 +386,90 @@ describe('Kinesis data streams', () => {
});

expect(stack).toHaveResource('AWS::Kinesis::Stream', {
RetentionPeriodHours: 24,
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
KeyId: stack.resolve(explicitKey.keyArn),
},
});
}),

test.each([StreamMode.ON_DEMAND, StreamMode.PROVISIONED])('uses explicit capacity mode %s', (mode: StreamMode) => {
test('uses explicit provisioned streamMode', () => {
const stack = new Stack();

new Stream(stack, 'MyStream', {
streamMode: mode,
streamMode: StreamMode.PROVISIONED,
});

expect(stack).toMatchTemplate({
Resources: {
MyStream5C050E93: {
Type: 'AWS::Kinesis::Stream',
Properties: {
RetentionPeriodHours: 24,
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
StreamEncryption: {
'Fn::If': [
'AwsCdkKinesisEncryptedStreamsUnsupportedRegions',
{
Ref: 'AWS::NoValue',
},
{
EncryptionType: 'KMS',
KeyId: 'alias/aws/kinesis',
},
],
},
},
},
},
Conditions: {
AwsCdkKinesisEncryptedStreamsUnsupportedRegions: {
'Fn::Or': [
{
'Fn::Equals': [
{
Ref: 'AWS::Region',
},
'cn-north-1',
],
},
{
'Fn::Equals': [
{
Ref: 'AWS::Region',
},
'cn-northwest-1',
],
},
],
},
},
});
});

test('uses explicit on-demand streamMode', () => {
const stack = new Stack();

new Stream(stack, 'MyStream', {
streamMode: StreamMode.ON_DEMAND,
});

expect(stack).toMatchTemplate({
Resources: {
MyStream5C050E93: {
Type: 'AWS::Kinesis::Stream',
Properties: {
RetentionPeriodHours: 24,
StreamModeDetails: {
StreamMode: StreamMode[mode],
StreamMode: StreamMode.ON_DEMAND,
},
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -431,6 +511,17 @@ describe('Kinesis data streams', () => {
});
});

test('throws when using shardCount with on-demand streamMode', () => {
const stack = new Stack();

expect(() => {
new Stream(stack, 'MyStream', {
shardCount: 2,
streamMode: StreamMode.ON_DEMAND,
});
}).toThrow(`streamMode must be set to ${StreamMode.PROVISIONED} (default) when specifying shardCount`);
});

test('grantRead creates and attaches a policy with read only access to the principal', () => {
const stack = new Stack();
const stream = new Stream(stack, 'MyStream', {
Expand Down Expand Up @@ -536,6 +627,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
Expand Down Expand Up @@ -695,6 +789,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
Expand Down Expand Up @@ -845,8 +942,11 @@ describe('Kinesis data streams', () => {
MyStream5C050E93: {
Type: 'AWS::Kinesis::Stream',
Properties: {
RetentionPeriodHours: 24,
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
EncryptionType: 'KMS',
KeyId: {
Expand Down Expand Up @@ -915,6 +1015,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -1003,6 +1106,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -1083,6 +1189,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -1173,6 +1282,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -1255,6 +1367,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: 24,
StreamEncryption: {
'Fn::If': [
Expand Down Expand Up @@ -1372,6 +1487,9 @@ describe('Kinesis data streams', () => {
Type: 'AWS::Kinesis::Stream',
Properties: {
ShardCount: 1,
StreamModeDetails: {
StreamMode: StreamMode.PROVISIONED,
},
RetentionPeriodHours: {
Ref: 'myretentionperiod',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
"Type": "AWS::Kinesis::Stream",
"Properties": {
"ShardCount": 1,
"StreamModeDetails": {
"StreamMode": "PROVISIONED"
},
"RetentionPeriodHours": 24,
"StreamEncryption": {
"Fn::If": [
Expand Down Expand Up @@ -281,4 +284,4 @@
}
}
}
}
}
Loading

0 comments on commit cac11bb

Please sign in to comment.