diff --git a/fig/backends/aws_sqs/__init__.py b/fig/backends/aws_sqs/__init__.py index e755ca1..c8cf5e9 100644 --- a/fig/backends/aws_sqs/__init__.py +++ b/fig/backends/aws_sqs/__init__.py @@ -1,4 +1,5 @@ import json +from functools import lru_cache import boto3 from botocore.exceptions import ClientError from ...config import config @@ -8,18 +9,41 @@ class Submitter(): def __init__(self): aws_region = config.get('aws_sqs', 'region') - sqs_queue_name = config.get('aws_sqs', 'sqs_queue_name') - log.debug("Attempting to connect to AWS (region %s) SQS: %s", aws_region, sqs_queue_name) + log.debug("Attempting to connect to AWS (region %s) SQS: %s", aws_region, self.sqs_queue_name) try: aws_client = boto3.resource('sqs', region_name=aws_region) - self.queue = aws_client.get_queue_by_name(QueueName=sqs_queue_name) + self.queue = aws_client.get_queue_by_name(QueueName=self.sqs_queue_name) except ClientError: # pylint: disable=W0703 - log.exception("Cannot configure AWS SQS Queue: %s in %s", aws_region, sqs_queue_name) + log.exception("Cannot configure AWS SQS Queue: %s in %s", aws_region, self.sqs_queue_name) self.queue = None raise def submit(self, event): - self.queue.send_message(MessageBody=json.dumps(event.original_event)) + oe = event.original_event + if self.is_fifo: + feed_id = getattr(oe, 'feed_id') if hasattr(oe, 'feed_id') else 0 + self.queue.send_message( + MessageGroupId="fig/%s/%s" % (self.app_id, feed_id), + MessageDeduplicationId=str(oe.offset), + MessageBody=json.dumps(oe) + ) + else: + self.queue.send_message(MessageBody=json.dumps(oe)) + + @property + @lru_cache + def is_fifo(self): + return self.sqs_queue_name.endswith('.fifo') + + @property + @lru_cache + def sqs_queue_name(self): + return config.get('aws_sqs', 'sqs_queue_name') + + @property + @lru_cache + def app_id(self): + return config.get('falcon', 'application_id') class Runtime():