Description
Is this related to an existing feature request or issue?
No response
Which AWS Lambda Powertools utility does this relate to?
Batch processing
Summary
A common friction point when developing event-driven architectures on AWS serverless managed services is the message size limit, which is generally 256 KB for messages passing through any of the messaging services: SQS, SNS, EventBridge, and others.
In an enterprise integration landscape, 256 KB is a challenging limit. It forces teams to build store-and-forward systems in which the inbound message is stored in an S3 bucket (or another storage mechanism) and metadata is then pushed through the messaging service, so that every consumer has to retrieve the actual message using that metadata before processing it.
Use case
A system integration use case where data is submitted to an integration service in AWS that uses EventBridge to route event data to different destinations. An API Gateway to Lambda proxy integration allows payloads of up to 6 MB to be processed, which is far larger than the 256 KB limit of EventBridge.
In this scenario the Lambda behind the API is required to store the large payload first in S3 and then push meta-data for routing through EventBridge. Consumers then need to read the original large message and process it.
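As a rough illustration of the first decision the producer Lambda has to make in this scenario, the sketch below checks whether a payload would fit under the limit once serialised. The names `MAX_EVENT_BYTES`, `serialized_size` and `needs_offload` are hypothetical, and the 256 KB figure is simply the one quoted in this issue; each service measures message size in its own way, so a real implementation would probably leave some headroom.

```python
import json

# Assumed limit, taken from the 256 KB figure quoted in this issue.
MAX_EVENT_BYTES = 256 * 1024


def serialized_size(payload: dict) -> int:
    """Return the size in bytes of the payload once JSON-encoded as UTF-8."""
    return len(json.dumps(payload, separators=(",", ":")).encode("utf-8"))


def needs_offload(payload: dict, limit: int = MAX_EVENT_BYTES) -> bool:
    """Return True when the encoded payload would not fit within the limit."""
    return serialized_size(payload) > limit
```

Only payloads for which `needs_offload` returns `True` would need to take the S3 detour; smaller ones could be sent through the messaging service unchanged.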
Proposal
Similar to the Idempotency utility, an abstract persistence class should be created to allow a message to be stored in and retrieved from an AWS storage service (defaulting to S3 seems sensible).
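A minimal sketch of what such an abstract persistence class might look like is shown here. The class and method names (`BasePersistenceLayer`, `store`, `retrieve`) are assumptions for illustration and not an existing Powertools API. An in-memory implementation stands in for S3 so the example runs without AWS access; an S3-backed layer would presumably map the same two methods onto `put_object` and `get_object`.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Dict


class BasePersistenceLayer(ABC):
    """Hypothetical interface: store a payload and get a key back, fetch by key."""

    @abstractmethod
    def store(self, data: bytes) -> str:
        """Persist the payload and return the key needed to retrieve it."""

    @abstractmethod
    def retrieve(self, key: str) -> bytes:
        """Return the payload previously stored under the given key."""


class InMemoryPersistenceLayer(BasePersistenceLayer):
    """Stand-in for an S3-backed layer, used here only to keep the sketch runnable."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def store(self, data: bytes) -> str:
        key = str(uuid.uuid4())  # random key; a real layer might derive it from the message
        self._objects[key] = data
        return key

    def retrieve(self, key: str) -> bytes:
        return self._objects[key]
```

Keeping the interface to two methods would let producers and consumers swap storage back ends without touching the messaging code.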
Build out a message client handler like sqs-extended-client-lib (which is based on the AWS Java implementation for sending large messages to SQS and appears abandoned on GitHub) that stores the message and creates the metadata to forward through the messaging service.
Like Idempotency, this would need to consider JMESPath for extracting metadata, or some other mechanism for building the message structure required by the AWS messaging service (SQS, SNS, EventBridge, etc.).
This feels like a nice utility to cover more than just SQS. Ideally it should start with the most common use case, and SQS seems logical in this regard: well-known implementations already exist, and matching the existing user experience of the Java utility seems a reasonable starting point.
It would also integrate detection of large message events into the existing Powertools batch processing utilities, so that the large message is retrieved as part of batch processing when those utilities are used.
A stand-alone mechanism for retrieving large messages is also needed for customers who wish to only partially adopt Lambda Powertools, so that there is a pathway for everyone to benefit from this feature.
Producer:
import os

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.event_handler import APIGatewayRestResolver
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.utilities.message_handler import (
    MessageClientConfig,
    S3MessagePersistenceLayer,
    SQSMessageClient,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

tracer = Tracer()
logger = Logger()
app = APIGatewayRestResolver()

config = MessageClientConfig(
    message_key_jmespath="messageId",
    use_local_cache=True,
)
# Proposed API: large payloads are stored in the bucket named by the MessageBucket variable
persistence_layer = S3MessagePersistenceLayer(bucket=os.environ.get("MessageBucket"))
sqs_client = SQSMessageClient(persistence_store=persistence_layer, config=config)


@app.post("/event")
@tracer.capture_method
def post_event():
    payload = app.current_event.json_body
    return sqs_client.send_message(queue=os.environ.get("MyMessageQueue"), message=payload)


# You can continue to use other utilities just as before
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)
Consumer:
import json

from aws_lambda_powertools import Logger, Tracer
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)
tracer = Tracer()
logger = Logger()


@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.body
    if payload:
        item: dict = json.loads(payload)
    ...


@logger.inject_lambda_context
@tracer.capture_lambda_handler
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context: LambdaContext):
    return processor.response()
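For customers who do not use the batch utility, the stand-alone retrieval mentioned in the proposal could be sketched roughly as follows. Everything here is an assumption for illustration: the marker key `__large_message_pointer__`, the pointer layout, and the helper names are hypothetical, and `fetch` is any callable that loads a payload by key (for example, a thin wrapper around an S3 read).

```python
import json
from typing import Callable, Dict, Iterator

# Hypothetical marker key identifying a message that only points at the real payload.
POINTER_KEY = "__large_message_pointer__"


def resolve_body(body: str, fetch: Callable[[str], str]) -> str:
    """Return the real payload: fetch it if the body is a pointer, else the body itself."""
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        return body  # not JSON, so it cannot be a pointer
    if isinstance(parsed, dict) and POINTER_KEY in parsed:
        return fetch(parsed[POINTER_KEY]["key"])
    return body


def iter_resolved_bodies(event: Dict, fetch: Callable[[str], str]) -> Iterator[str]:
    """Yield the resolved body of every record in an SQS-shaped event."""
    for record in event.get("Records", []):
        yield resolve_body(record["body"], fetch)
```

Because ordinary messages pass through untouched, the same helper could be called on every record regardless of whether the producer offloaded it.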
Out of scope
Initially a single messaging service should be targeted, but other messaging services (SNS, EventBridge, Kinesis, etc.) should be considered during design so that this feature can be enabled on all batch processing utilities in the future.
Potential challenges
A customisable way of turning the large message into metadata for the messaging service, including details of how to retrieve the large message, is needed. JMESPath or JSONPath can be considered for simple use cases, but a custom function hook should also be provided since not every use case will be so straightforward.
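One possible shape for this, offered only as a sketch: a builder that handles the simple case with a path expression and defers to a user-supplied function for anything more involved. The function names and the metadata fields (`storage_key`, `message_key`) are hypothetical, and `extract_path` is a deliberately simplified dotted-path lookup standing in for JMESPath, not a JMESPath implementation.

```python
from typing import Any, Callable, Dict, Optional

CustomBuilder = Callable[[Dict[str, Any], str], Dict[str, Any]]


def extract_path(document: Dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'detail.orderId' (simplified stand-in for JMESPath)."""
    current: Any = document
    for part in path.split("."):
        current = current[part]
    return current


def build_metadata(
    payload: Dict[str, Any],
    storage_key: str,
    key_path: Optional[str] = None,
    custom_builder: Optional[CustomBuilder] = None,
) -> Dict[str, Any]:
    """Build the small message that is forwarded in place of the large payload."""
    if custom_builder is not None:
        # Complex cases: the caller decides the whole metadata structure.
        return custom_builder(payload, storage_key)
    metadata: Dict[str, Any] = {"storage_key": storage_key}
    if key_path is not None:
        # Simple cases: copy one identifying field out of the payload.
        metadata["message_key"] = extract_path(payload, key_path)
    return metadata
```

A caller with routing needs beyond a single key could pass something like `custom_builder=lambda payload, key: {"ref": key, "source": payload["source"]}` and bypass the path-based default entirely.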
We also need to consider how the consumer would know where to find the large message data, given that it runs in a different Lambda function and potentially has no knowledge of the storage mechanism. If this could be embedded in the metadata for Powertools to interpret on the consumer side, it would reduce boilerplate.
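To make the idea of embedding the storage details in the metadata more concrete, here is a small sketch of a self-describing envelope. The envelope layout (`large_message`, `store`, `location`) and the resolver registry are assumptions for illustration; the point is only that the consumer chooses how to load the payload from what the message itself says, rather than from its own configuration.

```python
from typing import Callable, Dict

# Hypothetical registry: storage type -> function that loads a payload from a location.
Resolver = Callable[[Dict[str, str]], str]
RESOLVERS: Dict[str, Resolver] = {}


def register_resolver(store_type: str, resolver: Resolver) -> None:
    """Consumer side: declare how payloads of a given storage type are loaded."""
    RESOLVERS[store_type] = resolver


def make_envelope(store_type: str, location: Dict[str, str]) -> Dict:
    """Producer side: describe where the payload lives."""
    return {"large_message": {"store": store_type, "location": location}}


def load_from_envelope(envelope: Dict) -> str:
    """Consumer side: pick the resolver named in the envelope and load the payload."""
    pointer = envelope["large_message"]
    store_type = pointer["store"]
    if store_type not in RESOLVERS:
        raise LookupError(f"No resolver registered for store type {store_type!r}")
    return RESOLVERS[store_type](pointer["location"])
```

With an envelope such as `make_envelope("s3", {"bucket": "my-bucket", "key": "a/b.json"})`, a consumer that has registered an `"s3"` resolver would not need to be told the bucket name separately.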
Dependencies and Integrations
Batch Utilities would need to understand how to retrieve large messages when this utility is in use. Having the retrieval as an automated mechanism removes a lot of boilerplate code from existing solutions.
Alternative solutions
[sqs-extended-client-lib](https://github.com/timothymugayi/boto3-sqs-extended-client-lib) is a consideration for SQS only and appears to be abandoned (no changes in over 14 months).
Acknowledgment
- This feature request meets Lambda Powertools Tenets
- Should this be considered in other Lambda Powertools languages? i.e. Java, TypeScript