Skip to content

feat(event_source): deserialize dynamodb new_image, old_image as dict #1580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5d5b4b8
chore(deps-dev): bump flake8-builtins from 1.5.3 to 2.0.0 (#1582)
dependabot[bot] Oct 11, 2022
154df7c
docs(homepage): include .NET powertools
heitorlessa Oct 11, 2022
73865b3
update changelog with latest changes
Oct 11, 2022
ea28f04
docs(logger): fix typo. (#1587)
digitalisx Oct 11, 2022
263ba7b
update changelog with latest changes
Oct 11, 2022
5d755a7
docs(governance): new form to allow customers self-nominate as public…
heitorlessa Oct 12, 2022
99d53a7
docs(idempotency) - Update invalid link target. (#1588)
eldritchideen Oct 12, 2022
0b9e2d1
update changelog with latest changes
Oct 12, 2022
c9fd2f1
chore(deps-dev): bump mypy-boto3-ssm from 1.24.81 to 1.24.90 (#1594)
dependabot[bot] Oct 13, 2022
74cf2cc
docs(idempotency): "persisntence" typo (#1596)
senmm Oct 14, 2022
7dbc6da
update changelog with latest changes
Oct 14, 2022
e2d4744
docs(governance): allow community to suggest feature content (#1593)
heitorlessa Oct 14, 2022
abb8043
fix(parser): loose validation on SNS fields to support FIFO (#1606)
heitorlessa Oct 14, 2022
fb59763
update changelog with latest changes
Oct 14, 2022
c98e03b
chore(layer): bump to 1.31.1 (v39)
heitorlessa Oct 14, 2022
ec709c3
update changelog with latest changes
Oct 14, 2022
416ab1b
feat(data_classes): add KinesisFirehoseEvent (#1540)
ryandeivert Oct 17, 2022
fa2260a
update changelog with latest changes
Oct 17, 2022
97bc096
Fix spacing
shanab Oct 10, 2022
68704d6
feat(data-classes): add DynamoDBImageDeserializer
shanab Oct 10, 2022
d28ed9f
fix(typing): fix mypy errors
shanab Oct 10, 2022
ddfd50e
chore(deps): bump release-drafter/release-drafter from 5.21.0 to 5.21…
dependabot[bot] Oct 18, 2022
95700dc
feat(data-classes): Replace AttributeValue with values deserialized b…
shanab Oct 18, 2022
0e198fe
Merge branch 'v2' into feature/dynamodb-image-deserializer
shanab Oct 18, 2022
e055948
Merge branch 'awslabs:develop' into feature/dynamodb-image-deserializer
shanab Oct 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(data_classes): add KinesisFirehoseEvent (#1540)
Co-authored-by: Rúben Fonseca <fonseka@gmail.com>
Co-authored-by: Leandro Damascena <leandro.damascena@gmail.com>
  • Loading branch information
3 people authored Oct 17, 2022
commit 416ab1b32ebf17a427ac428051acd11432f7cc95
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
from .kinesis_firehose_event import KinesisFirehoseEvent
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event
Expand All @@ -32,6 +33,7 @@
"DynamoDBStreamEvent",
"EventBridgeEvent",
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
"LambdaFunctionUrlEvent",
"S3Event",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import base64
import json
from typing import Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class KinesisFirehoseRecordMetadata(DictWrapper):
    """Kinesis Stream metadata attached to a Firehose record.

    Only present when the delivery stream's source is a Kinesis Stream.
    NOTE(review): this wrapper receives the *whole* record dict and looks up
    the "kinesisRecordMetadata" key itself — see `KinesisFirehoseRecord.metadata`.
    """

    @property
    def _metadata(self) -> dict:
        """Raw metadata mapping; raises KeyError when Kinesis Stream is not the source"""
        return self["kinesisRecordMetadata"]

    @property
    def shard_id(self) -> str:
        """Shard ID of the source Kinesis stream"""
        metadata = self._metadata
        return metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Partition key of the source Kinesis stream record"""
        metadata = self._metadata
        return metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Approximate arrival time of the record in the Kinesis stream (epoch timestamp in milliseconds)"""
        metadata = self._metadata
        return metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Sequence number of the source Kinesis stream record"""
        metadata = self._metadata
        return metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> str:
        """Sub-sequence number of the source Kinesis stream record

        Note: only populated for Kinesis streams using record aggregation
        """
        metadata = self._metadata
        return metadata["subsequenceNumber"]


class KinesisFirehoseRecord(DictWrapper):
    """A single record delivered to Lambda by a Kinesis Data Firehose delivery stream."""

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Approximate time the record was inserted into the delivery stream (epoch timestamp in milliseconds)"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Unique identifier of this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The base64-encoded data blob, exactly as received"""
        return self["data"]

    @property
    def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
        """Kinesis Stream metadata, or None when the source is not a Kinesis Stream"""
        # truthiness check: a missing or empty "kinesisRecordMetadata" means no metadata
        if not self.get("kinesisRecordMetadata"):
            return None
        # the wrapper receives the whole record dict and resolves the metadata key itself
        return KinesisFirehoseRecordMetadata(self._data)

    @property
    def data_as_bytes(self) -> bytes:
        """Base64-decoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Base64-decoded data as UTF-8 text"""
        return self.data_as_bytes.decode("utf-8")

    @property
    def data_as_json(self) -> dict:
        """Base64-decoded data parsed as JSON; parsed once and cached on the instance"""
        cached = self._json_data
        if cached is None:
            cached = self._json_data = json.loads(self.data_as_text)
        return cached


class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event sent to a Lambda data-transformation function

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID of this Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose delivery stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> Optional[str]:
        """ARN of the source Kinesis Stream; None unless Kinesis Stream is the source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated, e.g. us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Lazily yield each raw record wrapped as a KinesisFirehoseRecord"""
        return (KinesisFirehoseRecord(raw_record) for raw_record in self["records"])
15 changes: 15 additions & 0 deletions docs/utilities/data_classes.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Event Source | Data_class
[EventBridge](#eventbridge) | `EventBridgeEvent`
[Kafka](#kafka) | `KafkaEvent`
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent`
[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
[S3](#s3) | `S3Event`
Expand Down Expand Up @@ -892,6 +893,20 @@ or plain text, depending on the original payload.
do_something_with(data)
```

### Kinesis Firehose Delivery Stream

Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
inline, and re-emit them back to the Delivery Stream.

Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
function to access the data either as json or plain text, depending on the original payload.

=== "app.py"

```python
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```

### Lambda Function URL

=== "app.py"
Expand Down
25 changes: 25 additions & 0 deletions examples/event_sources/src/kinesis_firehose_delivery_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import base64
import json

from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source
from aws_lambda_powertools.utilities.typing import LambdaContext


@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    """Firehose data-transformation handler: decode each record, then re-emit it unchanged.

    Returns the response shape Firehose requires: {"records": [{"recordId", "result", "data"}, ...]}
    """
    result = []

    for record in event.records:
        # if data was delivered as json; caches loaded value
        data = record.data_as_json

        processed_record = {
            "recordId": record.record_id,
            # Firehose expects "data" to be a base64-encoded *string*; b64encode returns
            # bytes, which Lambda cannot JSON-serialize — decode to str explicitly
            "data": base64.b64encode(json.dumps(data).encode("utf-8")).decode("utf-8"),
            "result": "Ok",
        }

        result.append(processed_record)

    # return transformed records
    return {"records": result}
14 changes: 7 additions & 7 deletions tests/events/kinesisFirehosePutEvent.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records":[
"records": [
{
"recordId":"record1",
"approximateArrivalTimestamp":1664029185290,
"data":"SGVsbG8gV29ybGQ="
"recordId": "record1",
"approximateArrivalTimestamp": 1664029185290,
"data": "SGVsbG8gV29ybGQ="
},
{
"recordId":"record2",
"approximateArrivalTimestamp":1664029186945,
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
"recordId": "record2",
"approximateArrivalTimestamp": 1664029186945,
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9"
}
]
}
67 changes: 67 additions & 0 deletions tests/functional/test_data_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
CodePipelineJobEvent,
EventBridgeEvent,
KafkaEvent,
KinesisFirehoseEvent,
KinesisStreamEvent,
S3Event,
SESEvent,
Expand Down Expand Up @@ -1239,6 +1240,72 @@ def test_kafka_self_managed_event():
assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"


def test_kinesis_firehose_kinesis_event():
    """Event sourced from a Kinesis Stream: stream ARN and per-record metadata are present."""
    event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json"))

    assert event.region == "us-east-2"
    assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
    assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
    assert event.source_kinesis_stream_arn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"

    # materialize the lazy iterator; unpacking also asserts exactly two records
    record_01, record_02 = list(event.records)

    assert record_01.approximate_arrival_timestamp == 1664028820148
    assert record_01.record_id == "record1"
    assert record_01.data == "SGVsbG8gV29ybGQ="
    assert record_01.data_as_bytes == b"Hello World"
    assert record_01.data_as_text == "Hello World"

    metadata_01 = record_01.metadata
    assert metadata_01.shard_id == "shardId-000000000000"
    assert metadata_01.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
    assert metadata_01.approximate_arrival_timestamp == 1664028820148
    assert metadata_01.sequence_number == "49546986683135544286507457936321625675700192471156785154"
    assert metadata_01.subsequence_number == ""

    assert record_02.approximate_arrival_timestamp == 1664028793294
    assert record_02.record_id == "record2"
    assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
    assert record_02.data_as_bytes == b'{"Hello": "World"}'
    assert record_02.data_as_text == '{"Hello": "World"}'
    assert record_02.data_as_json == {"Hello": "World"}

    metadata_02 = record_02.metadata
    assert metadata_02.shard_id == "shardId-000000000001"
    assert metadata_02.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
    assert metadata_02.approximate_arrival_timestamp == 1664028793294
    assert metadata_02.sequence_number == "49546986683135544286507457936321625675700192471156785155"
    assert metadata_02.subsequence_number == ""


def test_kinesis_firehose_put_event():
    """Event from a direct PUT: no source stream ARN and no per-record metadata."""
    event = KinesisFirehoseEvent(load_event("kinesisFirehosePutEvent.json"))

    assert event.region == "us-east-2"
    assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
    assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
    assert event.source_kinesis_stream_arn is None

    # materialize the lazy iterator; unpacking also asserts exactly two records
    record_01, record_02 = list(event.records)

    assert record_01.record_id == "record1"
    assert record_01.approximate_arrival_timestamp == 1664029185290
    assert record_01.data == "SGVsbG8gV29ybGQ="
    assert record_01.data_as_bytes == b"Hello World"
    assert record_01.data_as_text == "Hello World"
    assert record_01.metadata is None

    assert record_02.record_id == "record2"
    assert record_02.approximate_arrival_timestamp == 1664029186945
    assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
    assert record_02.data_as_bytes == b'{"Hello": "World"}'
    assert record_02.data_as_text == '{"Hello": "World"}'
    assert record_02.data_as_json == {"Hello": "World"}
    assert record_02.metadata is None


def test_kinesis_stream_event():
event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))

Expand Down