Skip to content

Commit

Permalink
Add support for SQS FIFO queues
Browse files Browse the repository at this point in the history
  • Loading branch information
isimluk committed Sep 8, 2022
1 parent 20a38a9 commit 15c2977
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions fig/backends/aws_sqs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from functools import lru_cache
import boto3
from botocore.exceptions import ClientError
from ...config import config
Expand All @@ -8,18 +9,41 @@
class Submitter():
def __init__(self):
aws_region = config.get('aws_sqs', 'region')
sqs_queue_name = config.get('aws_sqs', 'sqs_queue_name')
log.debug("Attempting to connect to AWS (region %s) SQS: %s", aws_region, sqs_queue_name)
log.debug("Attempting to connect to AWS (region %s) SQS: %s", aws_region, self.sqs_queue_name)
try:
aws_client = boto3.resource('sqs', region_name=aws_region)
self.queue = aws_client.get_queue_by_name(QueueName=sqs_queue_name)
self.queue = aws_client.get_queue_by_name(QueueName=self.sqs_queue_name)
except ClientError: # pylint: disable=W0703
log.exception("Cannot configure AWS SQS Queue: %s in %s", aws_region, sqs_queue_name)
log.exception("Cannot configure AWS SQS Queue: %s in %s", aws_region, self.sqs_queue_name)
self.queue = None
raise

def submit(self, event):
self.queue.send_message(MessageBody=json.dumps(event.original_event))
oe = event.original_event
if self.is_fifo:
feed_id = getattr(oe, 'feed_id') if hasattr(oe, 'feed_id') else 0
self.queue.send_message(
MessageGroupId="fig/%s/%s" % (self.app_id, feed_id),
MessageDeduplicationId=str(oe.offset),
MessageBody=json.dumps(oe)
)
else:
self.queue.send_message(MessageBody=json.dumps(oe))

@property
@lru_cache
def is_fifo(self):
return self.sqs_queue_name.endswith('.fifo')

@property
@lru_cache
def sqs_queue_name(self):
return config.get('aws_sqs', 'sqs_queue_name')

@property
@lru_cache
def app_id(self):
return config.get('falcon', 'application_id')


class Runtime():
Expand Down

0 comments on commit 15c2977

Please sign in to comment.