
Feature request: parse DynamoDB Stream events via Kinesis Data Stream #3193

Closed
@dreamorosi

Description

Use case

When working with Amazon DynamoDB Streams and Amazon Kinesis Data Streams, with events consumed by an AWS Lambda function, customers want to parse and validate events before processing them.

```mermaid
flowchart LR
    A[DynamoDB Table] -->|DynamoDB Stream| B[Kinesis Data Stream]
    B -->|triggers| C[Lambda Function]
```

In these cases, the change event from the DynamoDB Stream is encoded and wrapped in a Kinesis Data Stream event, which acts as an envelope. The existing DynamoDB Stream and Kinesis Data Stream schemas are not enough, on their own, to support this use case.
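Because the wrapped change event arrives base64 encoded, a consumer has to undo that encoding before any DynamoDB-shaped schema can apply. The following is a minimal sketch under stated assumptions: a Node.js runtime where Buffer is a global, and a JSON payload (the Kinesis sample in this issue reports recordFormat: 'application/json'). The names decodeKinesisData and KinesisRecordLike are hypothetical and are not Powertools APIs.

```typescript
// Hypothetical minimal record shape: only the field this sketch reads.
interface KinesisRecordLike {
  kinesis: { data: string };
}

// Assumes Node.js (global Buffer) and a JSON-encoded change record.
function decodeKinesisData(record: KinesisRecordLike): unknown {
  // Kinesis delivers the payload as base64-encoded bytes.
  const json = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
  // The decoded bytes are a JSON document describing one DynamoDB change.
  return JSON.parse(json);
}

// Usage: encode a change record the way it would appear in kinesis.data,
// then decode it again.
const change = { eventName: 'INSERT', tableName: 'MyTable', eventSource: 'aws:dynamodb' };
const record: KinesisRecordLike = {
  kinesis: { data: Buffer.from(JSON.stringify(change)).toString('base64') },
};
const decoded = decodeKinesisData(record) as { tableName: string };
console.log(decoded.tableName); // MyTable
```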

The current DynamoDBStreamSchema assumes that events come in a shape that looks like this:

```json
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "ApproximateCreationDateTime": 1693997155.0,
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "eventsource_arn",
      "eventSource": "aws:dynamodb"
    }
  ]
}
```

On the other hand, when the same events arrive through a Kinesis Data Stream, they look like this (the data field actually arrives base64 encoded; in the example below it is shown decoded for readability):

```js
{
  Records: [
    {
      eventSource: 'aws:kinesis',
      eventVersion: '1.0',
      eventID:
        'shardId-000000000000:49656632116218945776331460018176327016585772817654480898',
      eventName: 'aws:kinesis:record',
      invokeIdentityArn:
        'arn:aws:iam::123456789012:role/KinesisddbStack-MyFunctionServiceRole3C357FF2-bxvXci1V8a2G',
      eventSourceARN:
        'arn:aws:kinesis:eu-west-1:123456789012:stream/KinesisddbStack-MyDataStream2006A1E4-BJi822bWFiFV',
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: '508B17441EAB608C8643A4479FCEF4A5',
        sequenceNumber:
          '49656632116218945776331460018176327016585772817654480898',
        approximateArrivalTimestamp: 1728572253.015,
        data: {
          awsRegion: 'eu-west-1',
          eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
          eventName: 'INSERT',
          userIdentity: null,
          recordFormat: 'application/json',
          tableName: 'MyTable',
          dynamodb: {
            ApproximateCreationDateTime: 1728572252034,
            Keys: {
              id: {
                S: 'foo',
              },
            },
            NewImage: {
              id: {
                S: 'foo',
              },
            },
            SizeBytes: 24,
          },
          eventSource: 'aws:dynamodb',
        },
        // data: 'eyJ...ZGIifQ==',
      },
    },
  ],
}
```

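This means DynamoDBStreamSchema cannot be used to parse the data attribute, because the two structures are incompatible. In the examples above, each decoded data payload is a single change record rather than a Records array; it adds fields such as tableName, recordFormat, and userIdentity; it lacks eventVersion and eventSourceARN (and StreamViewType and SequenceNumber inside dynamodb); and its ApproximateCreationDateTime is expressed in milliseconds rather than seconds.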
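The structural mismatch can be checked directly against the decoded sample payload. The sketch below uses hand-written checks instead of a schema library so it runs without dependencies; hasRecordsArray, looksLikeKinesisWrappedChange, and missingFields are hypothetical helpers for illustration only, not part of Powertools.

```typescript
// Hypothetical helpers (illustration only, not part of Powertools) that
// contrast the direct DynamoDB Streams event with the decoded Kinesis payload.

// A direct DynamoDB Streams event wraps its records in a Records array.
function hasRecordsArray(value: unknown): boolean {
  return (
    typeof value === 'object' &&
    value !== null &&
    Array.isArray((value as { Records?: unknown }).Records)
  );
}

// Via Kinesis, each decoded data payload is a single change record that
// carries extra fields such as tableName and recordFormat.
function looksLikeKinesisWrappedChange(value: unknown): boolean {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.tableName === 'string' &&
    typeof v.recordFormat === 'string' &&
    typeof v.dynamodb === 'object' &&
    v.dynamodb !== null
  );
}

// Returns the given top-level fields that are absent from obj.
function missingFields(obj: Record<string, unknown>, fields: string[]): string[] {
  return fields.filter((field) => !(field in obj));
}

// The decoded data payload from the Kinesis example in this issue.
const decodedPayload: Record<string, unknown> = {
  awsRegion: 'eu-west-1',
  eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
  eventName: 'INSERT',
  userIdentity: null,
  recordFormat: 'application/json',
  tableName: 'MyTable',
  dynamodb: {
    ApproximateCreationDateTime: 1728572252034,
    Keys: { id: { S: 'foo' } },
    NewImage: { id: { S: 'foo' } },
    SizeBytes: 24,
  },
  eventSource: 'aws:dynamodb',
};

console.log(hasRecordsArray(decodedPayload)); // false
console.log(looksLikeKinesisWrappedChange(decodedPayload)); // true
console.log(missingFields(decodedPayload, ['eventVersion', 'eventSourceARN'])); // [ 'eventVersion', 'eventSourceARN' ]
```

A schema written for the Records-wrapped shape therefore rejects this payload at the very first field, which is why a dedicated per-record schema is needed.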

We should work to add support for a new schema specific to this type of integration.

Solution/User Experience

The schema below worked in earlier tests:

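```typescript
import { parser } from '@aws-lambda-powertools/parser/middleware';
import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes';
import middy from '@middy/core';
import { z } from 'zod';

// Shape of one DynamoDB change record after the envelope has
// base64-decoded and JSON-parsed the kinesis.data field.
const DynamoDBStreamEvent = z.object({
  awsRegion: z.string(),
  eventID: z.string(),
  eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
  userIdentity: z.null(), // null in the sample payload above
  recordFormat: z.string(),
  tableName: z.string(),
  dynamodb: z.object({
    ApproximateCreationDateTime: z.number(), // milliseconds in the sample payload
    // Item attributes stay in DynamoDB attribute-value format, e.g. { id: { S: 'foo' } }
    Keys: z.any().optional(),
    NewImage: z.any().optional(),
    SizeBytes: z.number(),
  }),
});

type DynamoDBStreamEvent = z.infer<typeof DynamoDBStreamEvent>;

// The Kinesis envelope unwraps every record in the batch, so the
// handler receives an array of parsed change records.
export const handler = middy(async (events: DynamoDBStreamEvent[]) => {
  for (const change of events) {
    // Process each parsed change record here.
    console.log(change.tableName, change.eventName);
  }
  return {
    statusCode: 200,
    body: JSON.stringify('Hello, World!'),
  };
}).use(
  parser({
    schema: DynamoDBStreamEvent,
    envelope: KinesisEnvelope,
  })
);
```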
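To make the envelope's role concrete, here is a dependency-free sketch of roughly what pairing a per-record schema with a Kinesis envelope achieves: decode every record's data, validate it, and return one parsed item per Kinesis record. This is not the Powertools implementation; unwrapKinesisBatch, isDynamoDBChangeRecord, and the interfaces are hypothetical, a hand-written type guard stands in for the Zod schema, and Node.js (global Buffer) is assumed.

```typescript
// Hypothetical approximation, NOT the Powertools KinesisEnvelope.
interface KinesisStreamEventLike {
  Records: Array<{ kinesis: { data: string } }>;
}

interface DynamoDBChangeRecord {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE';
  tableName: string;
  dynamodb: { ApproximateCreationDateTime: number; SizeBytes: number };
}

// Hand-written stand-in for the Zod schema (checks a subset of fields).
function isDynamoDBChangeRecord(value: unknown): value is DynamoDBChangeRecord {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  const ddb = v.dynamodb as Record<string, unknown> | null | undefined;
  return (
    (v.eventName === 'INSERT' || v.eventName === 'MODIFY' || v.eventName === 'REMOVE') &&
    typeof v.tableName === 'string' &&
    typeof ddb === 'object' &&
    ddb !== null &&
    typeof ddb.ApproximateCreationDateTime === 'number' &&
    typeof ddb.SizeBytes === 'number'
  );
}

// Decode and validate every record; throw on the first invalid one.
function unwrapKinesisBatch(batch: KinesisStreamEventLike): DynamoDBChangeRecord[] {
  return batch.Records.map((record, index) => {
    const payload: unknown = JSON.parse(
      Buffer.from(record.kinesis.data, 'base64').toString('utf-8'),
    );
    if (!isDynamoDBChangeRecord(payload)) {
      throw new Error(`Record ${index} is not a DynamoDB change record`);
    }
    return payload;
  });
}

// Usage: a one-record batch built from the sample payload's fields.
const encode = (obj: object): string => Buffer.from(JSON.stringify(obj)).toString('base64');
const batch: KinesisStreamEventLike = {
  Records: [
    {
      kinesis: {
        data: encode({
          eventName: 'INSERT',
          tableName: 'MyTable',
          dynamodb: { ApproximateCreationDateTime: 1728572252034, SizeBytes: 24 },
        }),
      },
    },
  ],
};
const changes = unwrapKinesisBatch(batch);
console.log(changes.length, changes[0]?.tableName); // 1 MyTable
```

Returning an array keeps the per-record schema simple: it only ever describes a single change record, while iterating the batch stays the envelope's job.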

Alternative solutions

No response

Acknowledgment

Future readers

Please react with 👍 and your use case to help us understand customer demand.


Metadata

Labels

completed, feature-request, parser
