Feature request: parse DynamoDB Stream events via Kinesis Data Stream #3193
Description
Use case
When working with Amazon DynamoDB Streams and Amazon Kinesis Data Streams, with events consumed by an AWS Lambda function, customers want to parse and validate events before processing them.
flowchart LR
A[DynamoDB Table] -->|DynamoDB Stream| B[Kinesis Data Stream]
B -->|triggers| C[Lambda Function]
In these cases, the change event from the DynamoDB Stream gets encoded and wrapped into a Kinesis Data Stream event that acts as an envelope. The current schemas we have available for both DynamoDB Stream and Kinesis Data Stream are not enough to support this use case.
The current DynamoDBStreamSchema assumes that events come in a shape that looks like this:
{
"Records": [
{
"eventID": "1",
"eventVersion": "1.0",
"dynamodb": {
"ApproximateCreationDateTime": 1693997155.0,
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"StreamViewType": "NEW_AND_OLD_IMAGES",
"SequenceNumber": "111",
"SizeBytes": 26
},
"awsRegion": "us-west-2",
"eventName": "INSERT",
"eventSourceARN": "eventsource_arn",
"eventSource": "aws:dynamodb"
}
]
}
On the other hand, when these same events come through a Kinesis Data Stream, they look like this (note that the data field actually arrives base64-encoded; in the example below I present it decoded for easier understanding):
{
Records: [
{
eventSource: 'aws:kinesis',
eventVersion: '1.0',
eventID:
'shardId-000000000000:49656632116218945776331460018176327016585772817654480898',
eventName: 'aws:kinesis:record',
invokeIdentityArn:
'arn:aws:iam::123456789012:role/KinesisddbStack-MyFunctionServiceRole3C357FF2-bxvXci1V8a2G',
eventSourceARN:
'arn:aws:kinesis:eu-west-1:123456789012:stream/KinesisddbStack-MyDataStream2006A1E4-BJi822bWFiFV',
kinesis: {
kinesisSchemaVersion: '1.0',
partitionKey: '508B17441EAB608C8643A4479FCEF4A5',
sequenceNumber:
'49656632116218945776331460018176327016585772817654480898',
approximateArrivalTimestamp: 1728572253.015,
data: {
awsRegion: 'eu-west-1',
eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
eventName: 'INSERT',
userIdentity: null,
recordFormat: 'application/json',
tableName: 'MyTable',
dynamodb: {
ApproximateCreationDateTime: 1728572252034,
Keys: {
id: {
S: 'foo',
},
},
NewImage: {
id: {
S: 'foo',
},
},
SizeBytes: 24,
},
eventSource: 'aws:dynamodb',
},
// data: 'eyJ...ZGIifQ==',
},
},
],
}
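This means it's not possible to use DynamoDBStreamSchema to parse the data attribute, because the two structures are incompatible: in the examples above, the Kinesis-wrapped record adds fields such as userIdentity, recordFormat, and tableName, and lacks eventVersion, eventSourceARN, and dynamodb.SequenceNumber.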
We should work to add support for a new schema specific to this type of integration.
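To make the envelope relationship concrete, here is a minimal sketch of how the base64 data field of a single Kinesis record could be decoded into the DynamoDB change record. The helper name decodeKinesisData and the sample payload are assumptions for illustration, modeled on the decoded example above; this is not how Powertools implements it.

```typescript
// Hypothetical helper: decode the base64 `data` field of one Kinesis record
// into the JSON document that DynamoDB wrote to the stream.
const decodeKinesisData = (data: string): unknown => {
  const json = Buffer.from(data, 'base64').toString('utf-8');
  return JSON.parse(json);
};

// Sample payload modeled on the decoded example in this issue (assumed values).
const sample = {
  awsRegion: 'eu-west-1',
  eventName: 'INSERT',
  tableName: 'MyTable',
  dynamodb: { Keys: { id: { S: 'foo' } }, SizeBytes: 24 },
  eventSource: 'aws:dynamodb',
};

// Encode it the way it would arrive inside `kinesis.data`.
const encoded = Buffer.from(JSON.stringify(sample), 'utf-8').toString('base64');

// The cast is only for this demo; real code should validate the shape first.
const decoded = decodeKinesisData(encoded) as typeof sample;
console.log(decoded.eventName, decoded.tableName);
```

Only after this decoding step does the payload have a shape that a DynamoDB-specific schema could validate, which is why the schema has to be applied inside the Kinesis envelope rather than to the whole event.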
Solution/User Experience
The schema below parsed these events successfully in earlier tests:
import { parser } from '@aws-lambda-powertools/parser/middleware';
import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes';
import middy from '@middy/core';
import { z } from 'zod';
const DynamoDBStreamEvent = z.object({
awsRegion: z.string(),
eventID: z.string(),
eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
userIdentity: z.null(),
recordFormat: z.string(),
tableName: z.string(),
dynamodb: z.object({
ApproximateCreationDateTime: z.number(),
Keys: z.any().optional(),
NewImage: z.any().optional(),
SizeBytes: z.number(),
}),
});
export const handler = middy(async (event: unknown) => {
return {
statusCode: 200,
body: JSON.stringify('Hello, World!'),
};
}).use(
parser({
schema: DynamoDBStreamEvent,
envelope: KinesisEnvelope,
})
);
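For readers who want to see the unwrapping spelled out without the middleware, the following is a dependency-free sketch of the same idea: decode each record's data field, then check a few of the fields named in the schema above. The names ChangeRecord, KinesisLikeEvent, isChangeRecord, and unwrapRecords are hypothetical and not part of Powertools, and the check is deliberately much looser than a real schema would be.

```typescript
// Hypothetical types and helpers for illustration; not part of Powertools.
type ChangeRecord = {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  tableName: string;
  dynamodb: { SizeBytes: number; Keys?: unknown; NewImage?: unknown };
};

// Only the part of the Kinesis event that this sketch reads.
type KinesisLikeEvent = { Records: { kinesis: { data: string } }[] };

// Runtime check for a subset of the fields in the proposed schema.
const isChangeRecord = (value: unknown): value is ChangeRecord => {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  const ddb = v.dynamodb as Record<string, unknown> | null | undefined;
  const knownEvent =
    v.eventName === 'INSERT' ||
    v.eventName === 'MODIFY' ||
    v.eventName === 'REMOVE';
  return (
    knownEvent &&
    typeof v.tableName === 'string' &&
    typeof ddb === 'object' &&
    ddb !== null &&
    typeof ddb.SizeBytes === 'number'
  );
};

// Decode every record's base64 payload and reject anything unexpected.
const unwrapRecords = (event: KinesisLikeEvent): ChangeRecord[] =>
  event.Records.map((record, index) => {
    const payload: unknown = JSON.parse(
      Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
    );
    if (!isChangeRecord(payload)) {
      throw new Error(`Record ${index} is not a DynamoDB change record`);
    }
    return payload;
  });

// Demo helper: build the base64 string a record would carry.
const toBase64 = (value: unknown): string =>
  Buffer.from(JSON.stringify(value), 'utf-8').toString('base64');

const event: KinesisLikeEvent = {
  Records: [
    {
      kinesis: {
        data: toBase64({
          eventName: 'INSERT',
          tableName: 'MyTable',
          dynamodb: { SizeBytes: 24 },
        }),
      },
    },
  ],
};

const changes = unwrapRecords(event);
console.log(changes.map((c) => `${c.eventName} on ${c.tableName}`));
```

A dedicated schema in the parser would replace the hand-written check here and give customers the same result with full validation and inferred types.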
Alternative solutions
No response
Acknowledgment
- This feature request meets Powertools for AWS Lambda (TypeScript) Tenets
- Should this be considered in other Powertools for AWS Lambda languages? i.e. Python, Java, and .NET
Future readers
Please react with 👍 and your use case to help us understand customer demand.