From b4a0d9838d1fb6e884eb4e29044886b2cdc1e196 Mon Sep 17 00:00:00 2001 From: Becca-Saka Date: Fri, 23 Aug 2024 14:47:08 +0100 Subject: [PATCH] optimized modal cron --- backend/main.py | 5 ++--- backend/utils/notifications.py | 32 +++++++++++++++++++++++++++- backend/utils/other/notifications.py | 24 +++++++++------------ 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/backend/main.py b/backend/main.py index 744310a03a..4a6a64dba9 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,4 +1,3 @@ -import asyncio import json import os @@ -64,6 +63,6 @@ def api(): os.makedirs(path) -@modal_app.function(image=image,schedule=Cron('15 * * * *')) +@modal_app.function(image=image,schedule=Cron('* * * * *')) async def start_job(): - asyncio.run(start_cron_job()) + await start_cron_job() diff --git a/backend/utils/notifications.py b/backend/utils/notifications.py index eaf39b103f..d8e3662246 100644 --- a/backend/utils/notifications.py +++ b/backend/utils/notifications.py @@ -1,4 +1,6 @@ +import asyncio from firebase_admin import messaging +import math def send_notification(token: str, title: str, body: str, data: dict = None): print('send_notification', token, title, body, data) @@ -8,8 +10,36 @@ def send_notification(token: str, title: str, body: str, data: dict = None): if data: message.data = data - try: + try: response = messaging.send(message) print("Successfully sent message:", response) except Exception as e: print("Error sending message:", e) + + +async def send_bulk_notification(user_tokens: list, title: str, body: str): + try: + batch_size = 500 + num_batches = math.ceil(len(user_tokens) / batch_size) + + def send_batch(batch_users): + messages = [ + messaging.Message( + notification=messaging.Notification(title=title, body=body), + token=token + ) for token in batch_users + ] + return messaging.send_all(messages) + + tasks = [] + for i in range(num_batches): + start = i * batch_size + end = start + batch_size + batch_users = user_tokens[start:end] + task = asyncio.to_thread(send_batch, batch_users) + tasks.append(task) + + await asyncio.gather(*tasks) + + except Exception as e: + print("Error sending message:", e) \ No newline at end of file diff --git a/backend/utils/other/notifications.py b/backend/utils/other/notifications.py index ea8936da7e..821ac97ff0 100644 --- a/backend/utils/other/notifications.py +++ b/backend/utils/other/notifications.py @@ -5,7 +5,7 @@ import pytz import database.notifications as notification_db -from utils.notifications import send_notification +from utils.notifications import send_bulk_notification async def start_cron_job(): @@ -23,8 +23,14 @@ async def send_daily_notification(): morning_target_time = "08:00" daily_summary_target_time = "22:00" - await _send_notification_for_time(morning_target_time, morning_alert_title, morning_alert_body) - await _send_notification_for_time(daily_summary_target_time, daily_summary_title, daily_summary_body) + morning_task = asyncio.create_task( + _send_notification_for_time(morning_target_time, morning_alert_title, morning_alert_body) + ) + evening_task = asyncio.create_task( + _send_notification_for_time(daily_summary_target_time, daily_summary_title, daily_summary_body) + ) + + await asyncio.gather(morning_task, evening_task) except Exception as e: print(e) @@ -34,7 +40,7 @@ async def send_daily_notification(): async def _send_notification_for_time(target_time: str, title: str, body: str): user_in_time_zone = await _get_users_in_timezone(target_time) - await _send_bulk_notification(user_in_time_zone, title, body) + await send_bulk_notification(user_in_time_zone, title, body) return user_in_time_zone @@ -51,13 +57,3 @@ def _get_timezones_at_time(target_time): if current_time == target_time: target_timezones.append(tz_name) return target_timezones - - -async def _send_bulk_notification(users: list, title: str, body: str): - loop = asyncio.get_running_loop() - with concurrent.futures.ThreadPoolExecutor() as pool: - tasks = [ - loop.run_in_executor(pool, send_notification, token, title, body) - for token in users - ] - await asyncio.gather(*tasks)