Skip to content

feat(nested-event-sources): Generic unwrapping of event source data #4069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 43 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
541e9cc
testing poc
seshubaws Jan 24, 2024
630bac9
Merge branch 'aws-powertools:develop' into nested_event_sources
seshubaws Jan 24, 2024
58ffa89
Merge branch 'aws-powertools:develop' into nested_event_sources
seshubaws Feb 1, 2024
0e76edb
Added sample nested test events, parent class for unwrapping, and cus…
seshubaws Mar 26, 2024
c46c544
Extended EventWrapper class in some events
seshubaws Mar 26, 2024
a0f8bc7
Added more test events, tried to utilize schemas but wasn't working
seshubaws Apr 3, 2024
d406f43
Merging upstream develop into local branch
seshubaws Apr 4, 2024
20d55c3
Merge branch 'develop' into nested_event_sources
seshubaws Apr 4, 2024
11ee8d6
Removed code for schemas
seshubaws Apr 4, 2024
5fa798d
Merge branch 'develop' into nested_event_sources
seshubaws Apr 4, 2024
26188b2
Merge branch 'develop' into nested_event_sources
seshubaws Apr 9, 2024
980041f
Added decode for first event and error handling
seshubaws Apr 9, 2024
5376073
Merge branch 'develop' into nested_event_sources
seshubaws Apr 9, 2024
7b09126
Merge branch 'develop' into nested_event_sources
seshubaws Apr 12, 2024
432d8a6
Fixed EB event unwrapping
seshubaws Apr 12, 2024
31a7a09
Merge branch 'develop' into nested_event_sources
seshubaws Apr 12, 2024
02935e0
Fixed triple nested to not use iterator
seshubaws Apr 12, 2024
a6c7a34
Merge branch 'develop' into nested_event_sources
seshubaws Apr 17, 2024
2fb67c3
Merge branch 'develop' into nested_event_sources
seshubaws Apr 19, 2024
ab594f1
Adding unit tests
seshubaws Apr 20, 2024
fb0be61
Added unit tests
seshubaws Apr 20, 2024
ee16fc6
fix linting
seshubaws Apr 20, 2024
58dd2ea
Merge branch 'develop' into nested_event_sources
seshubaws Apr 22, 2024
6a0676c
Fixing unit tests
seshubaws Apr 23, 2024
c6b6a10
Merge branch 'develop' into nested_event_sources
seshubaws Apr 26, 2024
7fb170e
Updating tests
seshubaws Apr 26, 2024
1066438
Fix some linting
seshubaws Apr 26, 2024
659f989
Merge branch 'develop' into nested_event_sources
seshubaws Apr 29, 2024
ca50ac0
Merge branch 'develop' into nested_event_sources
seshubaws May 2, 2024
162be0b
Merge branch 'develop' into nested_event_sources
seshubaws May 2, 2024
d67f1f1
Merge branch 'develop' into nested_event_sources
seshubaws May 3, 2024
96cc71e
Merge branch 'develop' into nested_event_sources
seshubaws May 6, 2024
98998b2
Merge branch 'develop' into nested_event_sources
seshubaws May 8, 2024
4236ca2
Merge branch 'develop' into nested_event_sources
seshubaws May 8, 2024
2e7839d
Merge branch 'develop' into nested_event_sources
seshubaws May 9, 2024
3f5292d
Merge branch 'develop' into nested_event_sources
seshubaws May 9, 2024
ef35f02
Merge branch 'develop' into nested_event_sources
seshubaws May 13, 2024
64bbe9f
Merge branch 'develop' into nested_event_sources
seshubaws May 13, 2024
47ba533
Merge branch 'develop' into nested_event_sources
seshubaws May 17, 2024
0fa8f4f
Merge branch 'develop' into nested_event_sources
seshubaws May 21, 2024
2ef3aec
Merge branch 'develop' into nested_event_sources
seshubaws May 30, 2024
123e31f
Merge branch 'develop' into nested_event_sources
seshubaws Jun 25, 2024
6c5095b
Merge branch 'develop' into nested_event_sources
seshubaws Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Added sample nested test events, parent class for unwrapping, and cus…
…tom logic for unwrapping some events
  • Loading branch information
seshubaws committed Mar 26, 2024
commit 0e76edbb99bd91bb9b97f96c426841e71c40bcf2
31 changes: 19 additions & 12 deletions aws_lambda_powertools/utilities/data_classes/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,28 +94,35 @@ def raw_event(self) -> Dict[str, Any]:
return self._data

class EventWrapper(DictWrapper):

NestedEvent = TypeVar("NestedEvent", bound=DictWrapper)
@property

def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
"""
Parameters
----------
data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize `str`, `bytes`, `bytearray`
containing a JSON document to a Python `obj`,
by default json.loads
"""
super().__init__(data, json_deserializer)

def nested_event_contents(self):
for record in self["Records"]:
yield record["body"]
body = record['body']
yield body


# @property
def decode_nested_events(self, nested_event_class: Type[NestedEvent], nested_event_content_deserializer = None):
if nested_event_content_deserializer is None:
nested_event_content_deserializer = self._json_deserializer

for content in self.nested_event_contents:
for content in self.nested_event_contents():
yield nested_event_class(nested_event_content_deserializer(content))

# @property
def decode_nested_event(self, nested_event_class, nested_event_content_deserializer = None):
if nested_event_content_deserializer is None:
nested_event_content_deserializer = self._json_deserializer

for content in self.nested_event_contents:
return nested_event_class(nested_event_content_deserializer(content))

class BaseProxyEvent(DictWrapper):
@property
def headers(self) -> Dict[str, str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,9 @@ def detail(self) -> Dict[str, Any]:
def replay_name(self) -> Optional[str]:
"""Identifies whether the event is being replayed and what is the name of the replay."""
return self["replay-name"]

def nested_event_contents(self):
for record in self["detail"]:
print('record', record, type(record))
# print('body:', body, type(body))
yield record
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper


@dataclass(repr=False, order=False, frozen=True)
Expand Down Expand Up @@ -274,7 +274,7 @@ def build_data_transformation_response(
)


class KinesisFirehoseEvent(DictWrapper):
class KinesisFirehoseEvent(EventWrapper):
"""Kinesis Data Firehose event

Documentation:
Expand Down Expand Up @@ -306,3 +306,10 @@ def region(self) -> str:
def records(self) -> Iterator[KinesisFirehoseRecord]:
for record in self["records"]:
yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)

def nested_event_contents(self):
for record in self["records"]:
# print('record', record, type(record))
# body = record[]
# print('body:', body, type(body))
yield record
116 changes: 69 additions & 47 deletions aws_lambda_powertools/utilities/data_classes/poc.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,71 @@
from typing import Iterator
import json
from aws_lambda_powertools.utilities.data_classes import S3Event, SQSEvent, SNSEvent, SESEvent, EventBridgeEvent
import test_events

from aws_lambda_powertools.utilities.data_classes import S3Event, SQSEvent
# from aws_lambda_powertools.utilities.data_classes.sns_event import SNSMessage
from aws_lambda_powertools.utilities.data_classes import event_source


# @event_source(data_class=SQSEvent)
def lambda_handler(event: SQSEvent, context):
event = SQSEvent(event)
nesteds3event = event.decode_nested_events(S3Event)
for record in event.records: #then how would a for loop work..
nested_event = record.decode_nested_event(S3Event) #for loop must be for same events inside one event
print(nested_event, nested_event)

# event = SQSEvent()
# sns_events = event.decode_nested_events(Iterator[SNSEvent])
# for sns_event in sns_events:
# s3_events = sns_event.decode_nested_events(S3Event)
# for s3_event in s3_events:
# print(s3_event.bucket.name)




event = SQSEvent({
"Records": [
{
"messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78",
"receiptHandle": "MessageReceiptHandle",
"body": {
"Message": "{\"Records\":[{\"eventVersion\":\"2.1\",\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventTime\":\"2023-01-01T00:00:00.000Z\",\"eventName\":\"ObjectCreated:Put\",\"userIdentity\":{\"principalId\":\"AWS:123456789012:example-user\"},\"requestParameters\":{\"sourceIPAddress\":\"127.0.0.1\"},\"responseElements\":{\"x-amz-request-id\":\"example-request-id\",\"x-amz-id-2\":\"example-id\"},\"s3\":{\"s3SchemaVersion\":\"1.0\",\"configurationId\":\"testConfigRule\",\"bucket\":{\"name\":\"example-bucket\",\"ownerIdentity\":{\"principalId\":\"EXAMPLE\"},\"arn\":\"arn:aws:s3:::example-bucket\"},\"object\":{\"key\":\"example-object.txt\",\"size\":1024,\"eTag\":\"example-tag\",\"versionId\":\"1\",\"sequencer\":\"example-sequencer\"}}}]}"
},
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1523232000000",
"SenderId": "123456789012",
"ApproximateFirstReceiveTimestamp": "1523232000001"
},
"messageAttributes": {},
"md5OfBody": "7b270e59b47ff90a553787216d55d91d",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:MyQueue",
"awsRegion": "us-east-1"
}
]
})

lambda_handler(event, {})

def lambda_handler_sqs_s3(event: SQSEvent = test_events.sqs_s3_event): # sqs(s3)
sqs_event = SQSEvent(event)
s3_event = sqs_event.decode_nested_events(S3Event)
for rec in s3_event:
print('rec:', rec.bucket_name)
# for sqs_record in sqs_event.records:
# # print('body in main:', sqs_record.body)
# # s3_event = sqs_event.decode_nested_events(Iterator[S3Event]) # is this correct format?
# s3_event = sqs_event.decode_nested_events(S3Event)
# for rec in s3_event:
# print('rec:', rec.bucket_name)
# print(next(s3_event).bucket_name) # use this if not inside a for loop


def lambda_handler_sqs_sns(event: SQSEvent = test_events.sqs_sns_event): # sqs(sns)
sqs_event = SQSEvent(event)
sns_event = sqs_event.decode_nested_events(SNSEvent)
for rec in sns_event:
print('rec:', type(rec), rec.sns_message)


def lambda_handler_sns_s3(event: SNSEvent = test_events.sns_s3_event): # sns(s3)
sns_event = SNSEvent(event)
s3_event = sns_event.decode_nested_events(S3Event)
for rec in s3_event:
print(type(rec))
print('rec:', rec.bucket_name)


def lambda_handler_sqs_s3_multi(event: SQSEvent = test_events.sqs_s3_multi_event): # sqs(s3, s3)
sqs_event = SQSEvent(event)
s3_event = sqs_event.decode_nested_events(S3Event)
for rec in s3_event:
print('rec:', rec.bucket_name)


def lambda_handler_sqs_sns_s3(event: SQSEvent = test_events.sqs_sns_s3_event): # sqs(sns(s3))
sqs_event = SQSEvent(event)
sns_event = sqs_event.decode_nested_events(SNSEvent)
for rec in sns_event:
print('rec:', type(rec), rec.sns_message)
# s3_event = sns_event.decode_nested_events(S3Event)


def lambda_handler_sns_ses(event: SNSEvent = test_events.sns_ses_event): # sns(ses)
sns_event = SNSEvent(event)
ses_event = sns_event.decode_nested_events(SESEvent)
for rec in ses_event:
print(type(rec))
print('rec:', rec.get("mail").get('source')) #but can't do rec.mail bc no "Records" key..

def lambda_handler_eb_s3(event: EventBridgeEvent = test_events.eb_s3_event): # eventbridge(s3)
eb_event = EventBridgeEvent(event)
s3_event = eb_event.decode_nested_events(S3Event)
for rec in s3_event:
print(type(rec))
print('rec:', rec)

lambda_handler_sqs_s3(test_events.sqs_s3_event)
# lambda_handler_sqs_sns(test_events.sqs_sns_event) #not working bc sns doesn't have Records key
lambda_handler_sns_s3(test_events.sns_s3_event)
lambda_handler_sqs_s3_multi(test_events.sqs_s3_multi_event)
# lambda_handler_sqs_sns_s3(test_events.sqs_sns_s3_event) #not working bc sns doesn't have Records key
lambda_handler_sns_ses(test_events.sns_ses_event)
# lambda_handler_eb_s3(test_events.eb_s3_event) #EB returning a str, not dict
11 changes: 9 additions & 2 deletions aws_lambda_powertools/utilities/data_classes/ses_event.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Iterator, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper


class SESMailHeader(DictWrapper):
Expand Down Expand Up @@ -233,7 +233,7 @@ def ses(self) -> SESMessage:
return SESMessage(self._data)


class SESEvent(DictWrapper):
class SESEvent(EventWrapper):
"""Amazon SES to receive message event trigger

NOTE: There is a 30-second timeout on RequestResponse invocations.
Expand All @@ -260,3 +260,10 @@ def mail(self) -> SESMail:
@property
def receipt(self) -> SESReceipt:
return self.record.ses.receipt

def nested_event_contents(self):
for record in self["Records"]:
# print('record', record, type(record))
body = record['ses']
# print('body:', body, type(body))
yield body
7 changes: 7 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/sns_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,10 @@ def record(self) -> SNSEventRecord:
def sns_message(self) -> str:
"""Return the message for the first sns event record"""
return self.record.sns.message

def nested_event_contents(self):
for record in self["Records"]:
# print('record', record, type(record))
body = record['Sns']['Message']
# print('body:', body, type(body))
yield body
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,5 @@ class SQSEvent(EventWrapper):

@property
def records(self) -> Iterator[SQSRecord]:
print("in here!!")
for record in self["Records"]:
yield SQSRecord(data=record, json_deserializer=self._json_deserializer)
Loading