Skip to content

Commit 1d7ec10

Browse files
fix: improve email queue and send email to group admins
1 parent 9c441d1 commit 1d7ec10

File tree

5 files changed

+100
-45
lines changed

5 files changed

+100
-45
lines changed

routers/biocommons_register.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from biocommons.bundles import BUNDLES, BiocommonsBundle
1111
from biocommons.default import DEFAULT_PLATFORMS
1212
from config import Settings, get_settings
13-
from db.models import BiocommonsUser
13+
from db.models import BiocommonsGroup, BiocommonsUser
1414
from db.setup import get_db_session
1515
from routers.errors import RegistrationRoute
1616
from schemas.biocommons import Auth0UserData, BiocommonsRegisterData
@@ -105,15 +105,26 @@ async def register_biocommons_user(
105105
session=db_session,
106106
auth0_client=auth0_client
107107
)
108-
# Queue approval email for manual bundles
108+
# Queue approval email(s) for manual bundles
109109
if bundle is not None and not bundle.group_auto_approve:
110+
admin_emails: set[str] = set()
111+
group_record = BiocommonsGroup.get_by_id(bundle.group_id.value, db_session)
112+
if group_record is not None:
113+
admin_emails = group_record.get_admins(auth0_client=auth0_client)
114+
if not admin_emails:
115+
logger.warning(
116+
"No admins found for bundle %s, notifying ops inbox",
117+
bundle.group_id.value,
118+
)
119+
admin_emails.add("aai-dev@biocommons.org.au")
110120
subject, body_html = compose_biocommons_registration_email(registration, settings)
111-
enqueue_email(
112-
db_session,
113-
to_address="aai-dev@biocommons.org.au",
114-
subject=subject,
115-
body_html=body_html,
116-
)
121+
for email in admin_emails:
122+
enqueue_email(
123+
db_session,
124+
to_address=email,
125+
subject=subject,
126+
body_html=body_html,
127+
)
117128

118129
db_session.commit()
119130

scheduled_tasks/tasks.py

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
from datetime import datetime, timezone
3+
from uuid import UUID
34

45
from loguru import logger
56
from pydantic import BaseModel
@@ -31,6 +32,7 @@
3132

3233
EMAIL_QUEUE_BATCH_SIZE = 25
3334
EMAIL_RETRY_DELAY_SECONDS = 300
35+
EMAIL_JOB_ID_PREFIX = "email_notification_"
3436

3537

3638
def _ensure_user_from_auth0(session: Session, user_data: Auth0UserData) -> tuple[BiocommonsUser, bool, bool]:
@@ -72,7 +74,7 @@ async def process_email_queue(
7274
retry_delay_seconds: int = EMAIL_RETRY_DELAY_SECONDS,
7375
) -> int:
7476
"""
75-
Send pending email notifications stored in the database.
77+
Schedule pending email notifications for delivery.
7678
"""
7779
logger.info("Processing email notification queue")
7880
session = next(get_db_session())
@@ -96,34 +98,60 @@ async def process_email_queue(
9698
if not notifications:
9799
logger.info("No email notifications ready for delivery")
98100
return 0
101+
scheduled = 0
99102
for notification in notifications:
100103
notification.mark_sending()
101104
session.add(notification)
105+
session.flush()
106+
job_id = f"{EMAIL_JOB_ID_PREFIX}{notification.id}"
107+
SCHEDULER.add_job(
108+
send_email_notification,
109+
args=[notification.id],
110+
id=job_id,
111+
replace_existing=True,
112+
kwargs={"retry_delay_seconds": retry_delay_seconds},
113+
)
114+
scheduled += 1
102115
session.commit()
116+
logger.info("Queued %d email notifications for delivery", scheduled)
117+
return scheduled
118+
finally:
119+
session.close()
120+
121+
122+
async def send_email_notification(
123+
notification_id: UUID,
124+
retry_delay_seconds: int = EMAIL_RETRY_DELAY_SECONDS,
125+
) -> bool:
126+
"""
127+
Deliver a single queued email notification.
128+
"""
129+
session = next(get_db_session())
130+
try:
131+
notification = session.get(EmailNotification, notification_id)
132+
if notification is None:
133+
logger.warning("Email notification %s not found", notification_id)
134+
return False
103135
email_service = get_email_service()
104-
sent_count = 0
105-
for notification in notifications:
106-
try:
107-
email_service.send(
108-
notification.to_address,
109-
notification.subject,
110-
notification.body_html,
111-
)
112-
except Exception as exc: # noqa: BLE001
113-
logger.warning(
114-
"Failed to send email %s: %s", notification.id, exc
115-
)
116-
session.refresh(notification)
117-
notification.mark_failed(str(exc), retry_delay_seconds)
118-
session.add(notification)
119-
session.commit()
120-
else:
121-
session.refresh(notification)
122-
notification.mark_sent()
123-
session.add(notification)
124-
session.commit()
125-
sent_count += 1
126-
return sent_count
136+
try:
137+
email_service.send(
138+
notification.to_address,
139+
notification.subject,
140+
notification.body_html,
141+
)
142+
except Exception as exc: # noqa: BLE001
143+
logger.warning(
144+
"Failed to send email %s: %s", notification.id, exc
145+
)
146+
notification.mark_failed(str(exc), retry_delay_seconds)
147+
session.add(notification)
148+
session.commit()
149+
return False
150+
else:
151+
notification.mark_sent()
152+
session.add(notification)
153+
session.commit()
154+
return True
127155
finally:
128156
session.close()
129157

services/email_queue.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ def enqueue_email(
1515
) -> EmailNotification:
1616
"""
1717
Persist an outbound email so the scheduler can deliver it later.
18+
19+
Note: the caller is responsible for committing the session after enqueueing.
1820
"""
1921
notification = EmailNotification(
2022
to_address=to_address,

tests/scheduled_tasks/test_tasks.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
_get_group_membership_including_deleted,
2222
populate_db_groups,
2323
process_email_queue,
24+
send_email_notification,
2425
sync_auth0_roles,
2526
sync_auth0_users,
2627
sync_group_user_roles,
@@ -94,7 +95,7 @@ async def test_sync_auth0_users_creates_and_soft_deletes(mocker, test_db_session
9495
mocker.patch.object(test_db_session, "close", return_value=None)
9596
mocker.patch(
9697
"scheduled_tasks.tasks.get_db_session",
97-
return_value=(test_db_session for _ in range(1)),
98+
return_value=iter(lambda: test_db_session, object()),
9899
)
99100

100101
await sync_auth0_users()
@@ -127,7 +128,7 @@ async def test_update_auth0_user_updates_existing(test_db_session, mocker, persi
127128
mocker.patch.object(test_db_session, "close", return_value=None)
128129
mocker.patch(
129130
"scheduled_tasks.tasks.get_db_session",
130-
return_value=(test_db_session for _ in range(1)),
131+
return_value=iter(lambda: test_db_session, object()),
131132
)
132133

133134
await update_auth0_user(user_data=user_data)
@@ -145,7 +146,7 @@ async def test_update_auth0_user_creates_when_missing(test_db_session, mocker):
145146
mocker.patch.object(test_db_session, "close", return_value=None)
146147
mocker.patch(
147148
"scheduled_tasks.tasks.get_db_session",
148-
return_value=(test_db_session for _ in range(1)),
149+
return_value=iter(lambda: test_db_session, object()),
149150
)
150151

151152
result = await update_auth0_user(user_data=user_data)
@@ -263,7 +264,7 @@ async def test_sync_auth0_roles_updates_and_soft_deletes(mocker, test_db_session
263264
mocker.patch.object(test_db_session, "close", return_value=None)
264265
mocker.patch(
265266
"scheduled_tasks.tasks.get_db_session",
266-
return_value=(test_db_session for _ in range(1)),
267+
return_value=iter(lambda: test_db_session, object()),
267268
)
268269

269270
await sync_auth0_roles()
@@ -356,7 +357,7 @@ async def test_sync_auth0_group_roles_syncs_assignments(mocker, test_db_session,
356357
mocker.patch.object(test_db_session, "close", return_value=None)
357358
mocker.patch(
358359
"scheduled_tasks.tasks.get_db_session",
359-
return_value=(test_db_session for _ in range(1)),
360+
return_value=iter(lambda: test_db_session, None),
360361
)
361362

362363
await sync_group_user_roles()
@@ -413,7 +414,7 @@ class TestGroups(Enum):
413414
mocker.patch("scheduled_tasks.tasks.get_settings", return_value=mock_settings)
414415
mocker.patch(
415416
"scheduled_tasks.tasks.get_db_session",
416-
return_value=(test_db_session for _ in range(1)),
417+
return_value=iter(lambda: test_db_session, None),
417418
)
418419

419420
BiocommonsGroupFactory.create_sync(group_id=TestGroups.TSI.value)
@@ -560,12 +561,15 @@ async def test_process_email_queue_sends_notifications(test_db_session, mocker):
560561
mocker.patch("scheduled_tasks.tasks.get_email_service", return_value=mock_service)
561562
mocker.patch(
562563
"scheduled_tasks.tasks.get_db_session",
563-
return_value=(test_db_session for _ in range(1)),
564+
return_value=iter(lambda: test_db_session, None),
564565
)
566+
mock_scheduler = mocker.patch("scheduled_tasks.tasks.SCHEDULER.add_job")
565567

566-
sent = await process_email_queue()
568+
scheduled = await process_email_queue()
567569

568-
assert sent == 1
570+
assert scheduled == 1
571+
mock_scheduler.assert_called_once()
572+
await send_email_notification(notification_id)
569573
updated = test_db_session.get(EmailNotification, notification_id)
570574
assert updated.status == EmailStatusEnum.SENT
571575
assert mock_service.send.called
@@ -587,12 +591,15 @@ async def test_process_email_queue_retries_failed_notifications(test_db_session,
587591
mocker.patch("scheduled_tasks.tasks.get_email_service", return_value=mock_service)
588592
mocker.patch(
589593
"scheduled_tasks.tasks.get_db_session",
590-
return_value=(test_db_session for _ in range(1)),
594+
return_value=iter(lambda: test_db_session, None),
591595
)
596+
mock_scheduler = mocker.patch("scheduled_tasks.tasks.SCHEDULER.add_job")
592597

593-
sent = await process_email_queue()
598+
scheduled = await process_email_queue()
594599

595-
assert sent == 0
600+
assert scheduled == 1
601+
mock_scheduler.assert_called_once()
602+
await send_email_notification(notification_id)
596603
updated = test_db_session.get(EmailNotification, notification_id)
597604
assert updated.status == EmailStatusEnum.FAILED
598605
assert updated.last_error is not None

tests/test_biocommons_register.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from tests.datagen import (
1313
Auth0UserDataFactory,
1414
BiocommonsRegistrationRequestFactory,
15+
RoleUserDataFactory,
1516
)
1617
from tests.db.datagen import Auth0RoleFactory, BiocommonsGroupFactory, PlatformFactory
1718

@@ -258,6 +259,12 @@ def test_successful_biocommons_registration_endpoint(
258259
"""Test successful biocommons registration via HTTP endpoint"""
259260
auth0_data = Auth0UserDataFactory.build()
260261
mock_auth0_client.create_user.return_value = auth0_data
262+
admin_stub = RoleUserDataFactory.build(email="tsi.admin@example.com")
263+
mock_auth0_client.get_all_role_users.return_value = [admin_stub]
264+
mock_auth0_client.get_user.return_value = Auth0UserDataFactory.build(
265+
user_id=admin_stub.user_id,
266+
email=admin_stub.email,
267+
)
261268

262269
registration_data = {
263270
"first_name": "Test",
@@ -291,7 +298,7 @@ def test_successful_biocommons_registration_endpoint(
291298
queued_emails = test_db_session.exec(select(EmailNotification)).all()
292299
assert len(queued_emails) == 1
293300
email = queued_emails[0]
294-
assert email.to_address == "aai-dev@biocommons.org.au"
301+
assert email.to_address == admin_stub.email
295302
assert email.status == EmailStatusEnum.PENDING
296303

297304

0 commit comments

Comments
 (0)