
Commit

add logs and error handling for importer
zganger committed Jul 12, 2024
1 parent b643ee9 commit b373720
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions backend/importer/loop.py
@@ -31,41 +31,49 @@ def __init__(self, queue_name: str, region: str = "us-east-1"):
         self.loader_map: dict[str, Loader] = {}
 
     def run(self):
+        self.logger.info("Starting import loop")
         while True:
-            resp = self.sqs_client.receive_message(
-                QueueUrl=self.sqs_queue_url,
-                # retrieve one message at a time - we could up this
-                # and parallelize but no point until way more files.
-                MaxNumberOfMessages=1,
-                # 10 minutes to process message before it becomes
-                # visible for another consumer.
-                VisibilityTimeout=600,
-            )
-            # if no messages found, wait 5m for next poll
-            if len(resp["Messages"]) == 0:
-                sleep(600)
-                continue
+            try:
+                self.logger.info("Polling for scraper event messages")
+                resp = self.sqs_client.receive_message(
+                    QueueUrl=self.sqs_queue_url,
+                    # retrieve one message at a time - we could up this
+                    # and parallelize but no point until way more files.
+                    MaxNumberOfMessages=1,
+                    # 10 minutes to process message before it becomes
+                    # visible for another consumer.
+                    VisibilityTimeout=600,
+                )
+                # if no messages found, wait 5m for next poll
+                if len(resp["Messages"]) == 0:
+                    sleep(600)
+                    continue
 
-            for message in resp["Messages"]:
-                sqs_body = ujson.loads(message["Body"])
-                # this comes through as a list, but we expect one object
-                for record in sqs_body["Records"]:
-                    bucket_name = record["s3"]["bucket"]["name"]
-                    key = record["s3"]["object"]["key"]
-                    with BytesIO() as fileobj:
-                        self.s3_client.download_fileobj(
-                            bucket_name, key, fileobj)
-                        fileobj.seek(0)
-                        content = fileobj.read()
-                        _ = content  # for linting.
+                for message in resp["Messages"]:
+                    sqs_body = ujson.loads(message["Body"])
+                    # this comes through as a list, but we expect one object
+                    for record in sqs_body["Records"]:
+                        bucket_name = record["s3"]["bucket"]["name"]
+                        key = record["s3"]["object"]["key"]
+                        with BytesIO() as fileobj:
+                            self.s3_client.download_fileobj(
+                                bucket_name, key, fileobj)
+                            fileobj.seek(0)
+                            content = fileobj.read()
+                            _ = content  # for linting.
 
-                    # TODO: we now have an in-memory copy of s3 file content
-                    # This is where we would run the importer.
-                    # we want a standardized importer class; use like:
-                    # loader = self.get_loader_for_content_type(key)
-                    # loader(content).load()
+                        # TODO: we now have an in-memory copy of s3 file content
+                        # This is where we would run the importer.
+                        # we want a standardized importer class; use like:
+                        # loader = self.get_loader_for_content_type(key)
+                        # loader(content).load()
 
-                    self.logger.info(f"Imported s3://{bucket_name}/{key}")
+                        self.logger.info(f"Imported s3://{bucket_name}/{key}")
+            except Exception as e:
+                self.logger.error(
+                    f"Failed to process scraper events sqs queue: {e}")
+                sleep(600)
+                pass
 
     def get_loader_for_content_type(self, s3_key: str) -> Loader:
         # s3 keys should be of format /subject/source/time.jsonl
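Note on the new error handling: with boto3, a receive_message response typically omits the "Messages" key entirely when no messages are available, so len(resp["Messages"]) can raise a KeyError on an empty poll; the try/except added here turns that (and any other processing failure) into a logged error followed by a sleep instead of a crashed loop. Inside the polling loop the guard could also be written defensively (an illustrative variant, not part of this commit):

messages = resp.get("Messages", [])
if not messages:
    # nothing to import yet; sleep before the next poll
    sleep(600)
    continue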

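The TODO block above sketches the intended importer interface: pick a Loader based on the s3 key, construct it with the downloaded bytes, and call load(). A minimal sketch of that shape, assuming a hypothetical JsonlLoader and a suffix-based dispatch (the class layout, method bodies, and example key below are illustrative assumptions, not code from this repository):

import ujson


class Loader:
    """Base interface for importers: construct with raw bytes, then call load()."""

    def __init__(self, content: bytes):
        self.content = content

    def load(self) -> None:
        raise NotImplementedError


class JsonlLoader(Loader):
    """Hypothetical loader for .jsonl files: one JSON object per line."""

    def load(self) -> None:
        for line in self.content.decode("utf-8").splitlines():
            if line.strip():
                record = ujson.loads(line)
                # Persist the record here (database write, etc.).
                print(record)


def get_loader_for_content_type(s3_key: str) -> type[Loader]:
    # s3 keys should be of format /subject/source/time.jsonl
    if s3_key.endswith(".jsonl"):
        return JsonlLoader
    raise ValueError(f"no loader registered for key {s3_key}")


# Mirrors the commented-out call site in run():
loader = get_loader_for_content_type("/subject/source/2024-07-12.jsonl")
loader(b'{"id": 1}\n{"id": 2}\n').load()

Keeping loaders as classes selected from the key format lets loop.py stay a thin poller while each source format gets its own load() implementation.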