Skip to content

Commit e727497

Browse files
Merge pull request #136 from AustralianBioCommons/fix-robust-email-sending
feat: robust email sending updates
2 parents 0945278 + e213aa8 commit e727497

File tree

5 files changed

+261
-22
lines changed

5 files changed

+261
-22
lines changed

db/models.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import uuid
2-
from datetime import datetime, timedelta, timezone
2+
from datetime import datetime, timezone
33
from logging import getLogger
44
from typing import Optional, Self
55

@@ -910,6 +910,7 @@ class EmailNotification(BaseModel, table=True):
910910
last_error: str | None = Field(default=None, sa_column=Column(String(1024), nullable=True))
911911
send_after: AwareDatetime | None = Field(default=None, sa_type=DateTime(timezone=True))
912912
sent_at: AwareDatetime | None = Field(default=None, sa_type=DateTime(timezone=True))
913+
last_attempt_at: AwareDatetime | None = Field(default=None, sa_type=DateTime(timezone=True))
913914
created_at: AwareDatetime = Field(
914915
default_factory=lambda: datetime.now(timezone.utc),
915916
sa_type=DateTime(timezone=True),
@@ -920,9 +921,12 @@ class EmailNotification(BaseModel, table=True):
920921
)
921922

922923
def mark_sending(self) -> None:
924+
now = datetime.now(timezone.utc)
925+
if self.last_attempt_at is None:
926+
self.last_attempt_at = now
923927
self.status = EmailStatusEnum.SENDING
924928
self.attempts += 1
925-
self.updated_at = datetime.now(timezone.utc)
929+
self.updated_at = now
926930

927931
def mark_sent(self) -> None:
928932
now = datetime.now(timezone.utc)
@@ -931,13 +935,18 @@ def mark_sent(self) -> None:
931935
self.updated_at = now
932936
self.last_error = None
933937

934-
def mark_failed(self, error: str, retry_delay_seconds: int | None = None) -> None:
938+
def mark_failed(self, error: str) -> None:
935939
now = datetime.now(timezone.utc)
936940
self.status = EmailStatusEnum.FAILED
937941
self.last_error = error[:1024]
938942
self.updated_at = now
939-
if retry_delay_seconds:
940-
self.send_after = now + timedelta(seconds=retry_delay_seconds)
943+
self.send_after = None
944+
945+
def schedule_retry(self, error: str, retry_time: datetime) -> None:
946+
self.status = EmailStatusEnum.FAILED
947+
self.last_error = error[:1024]
948+
self.send_after = retry_time
949+
self.updated_at = datetime.now(timezone.utc)
941950

942951

943952
class EmailChangeOtp(BaseModel, table=True):
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""email_scheduler_robust
2+
3+
Revision ID: d6a5578732bc
4+
Revises: 9f2d8c1b5d4e
5+
Create Date: 2025-11-24 15:37:19.537530
6+
7+
"""
8+
from typing import Sequence, Union
9+
10+
from alembic import op
11+
import sqlalchemy as sa
12+
import sqlmodel
13+
14+
15+
# revision identifiers, used by Alembic.
16+
revision: str = 'd6a5578732bc'
17+
down_revision: Union[str, None] = '9f2d8c1b5d4e'
18+
branch_labels: Union[str, Sequence[str], None] = None
19+
depends_on: Union[str, Sequence[str], None] = None
20+
21+
22+
def upgrade() -> None:
23+
# ### commands auto generated by Alembic - please adjust! ###
24+
op.add_column('emailnotification', sa.Column('last_attempt_at', sa.DateTime(timezone=True), nullable=True))
25+
# ### end Alembic commands ###
26+
27+
28+
def downgrade() -> None:
29+
# ### commands auto generated by Alembic - please adjust! ###
30+
op.drop_column('emailnotification', 'last_attempt_at')
31+
# ### end Alembic commands ###

scheduled_tasks/email_retry.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import random
2+
from datetime import datetime, timedelta, timezone
3+
4+
from botocore.exceptions import BotoCoreError, ClientError
5+
6+
from db.models import EmailNotification
7+
8+
EMAIL_QUEUE_BATCH_SIZE = 25
9+
EMAIL_MAX_ATTEMPTS = 2
10+
EMAIL_RETRY_DELAY_MIN_SECONDS = 15 * 60
11+
EMAIL_RETRY_DELAY_MAX_SECONDS = 30 * 60
12+
EMAIL_RETRY_WINDOW_SECONDS = 60 * 60
13+
EMAIL_JOB_ID_PREFIX = "email_notification_"
14+
15+
TRANSIENT_CLIENT_ERROR_CODES = {
16+
"Throttling",
17+
"ThrottlingException",
18+
"TooManyRequestsException",
19+
"ServiceUnavailable",
20+
"InternalFailure",
21+
"InternalError",
22+
"RequestThrottled",
23+
}
24+
25+
26+
def _ensure_aware(dt: datetime | None) -> datetime | None:
27+
if dt is None:
28+
return None
29+
if dt.tzinfo is None:
30+
return dt.replace(tzinfo=timezone.utc)
31+
return dt
32+
33+
34+
def retry_deadline(notification: EmailNotification) -> datetime | None:
35+
first_attempt = _ensure_aware(notification.last_attempt_at)
36+
if first_attempt is None:
37+
return None
38+
return first_attempt + timedelta(seconds=EMAIL_RETRY_WINDOW_SECONDS)
39+
40+
41+
def can_schedule_notification(notification: EmailNotification, now: datetime) -> bool:
42+
if notification.attempts >= EMAIL_MAX_ATTEMPTS:
43+
return False
44+
deadline = retry_deadline(notification)
45+
return deadline is None or now < deadline
46+
47+
48+
def is_retryable_email_error(exc: Exception) -> bool:
49+
if isinstance(exc, ClientError):
50+
code = exc.response.get("Error", {}).get("Code")
51+
return code in TRANSIENT_CLIENT_ERROR_CODES
52+
if isinstance(exc, BotoCoreError):
53+
return True
54+
if isinstance(exc, OSError):
55+
return True
56+
return False
57+
58+
59+
def next_retry_delay_seconds() -> int:
60+
return random.randint(EMAIL_RETRY_DELAY_MIN_SECONDS, EMAIL_RETRY_DELAY_MAX_SECONDS)

scheduled_tasks/tasks.py

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import re
2-
from datetime import datetime, timezone
2+
from datetime import datetime, timedelta, timezone
33
from uuid import UUID
44

55
from loguru import logger
66
from pydantic import BaseModel
7-
from sqlalchemy import or_
7+
from sqlalchemy import and_, or_
88
from sqlmodel import Session, select
99

1010
from auth.management import get_management_token
@@ -23,6 +23,16 @@
2323
)
2424
from db.setup import get_db_session
2525
from db.types import GROUP_NAMES, ApprovalStatusEnum, EmailStatusEnum, GroupEnum
26+
from scheduled_tasks.email_retry import (
27+
EMAIL_JOB_ID_PREFIX,
28+
EMAIL_MAX_ATTEMPTS,
29+
EMAIL_QUEUE_BATCH_SIZE,
30+
EMAIL_RETRY_WINDOW_SECONDS,
31+
can_schedule_notification,
32+
is_retryable_email_error,
33+
next_retry_delay_seconds,
34+
retry_deadline,
35+
)
2636
from scheduled_tasks.scheduler import SCHEDULER
2737
from schemas.auth0 import (
2838
GROUP_ROLE_PATTERN,
@@ -31,10 +41,6 @@
3141
)
3242
from schemas.biocommons import Auth0UserData
3343

34-
EMAIL_QUEUE_BATCH_SIZE = 25
35-
EMAIL_RETRY_DELAY_SECONDS = 300
36-
EMAIL_JOB_ID_PREFIX = "email_notification_"
37-
3844

3945
def _ensure_user_from_auth0(session: Session, user_data: Auth0UserData) -> tuple[BiocommonsUser, bool, bool]:
4046
"""
@@ -72,7 +78,6 @@ def _get_group_membership_including_deleted(session: Session, user_id: str, grou
7278

7379
async def process_email_queue(
7480
batch_size: int = EMAIL_QUEUE_BATCH_SIZE,
75-
retry_delay_seconds: int = EMAIL_RETRY_DELAY_SECONDS,
7681
) -> int:
7782
"""
7883
Schedule pending email notifications for delivery.
@@ -84,8 +89,12 @@ async def process_email_queue(
8489
stmt = (
8590
select(EmailNotification)
8691
.where(
87-
EmailNotification.status.in_(
88-
[EmailStatusEnum.PENDING, EmailStatusEnum.FAILED]
92+
or_(
93+
EmailNotification.status == EmailStatusEnum.PENDING,
94+
and_(
95+
EmailNotification.status == EmailStatusEnum.FAILED,
96+
EmailNotification.send_after.is_not(None),
97+
),
8998
),
9099
or_(
91100
EmailNotification.send_after.is_(None),
@@ -101,6 +110,15 @@ async def process_email_queue(
101110
return 0
102111
scheduled = 0
103112
for notification in notifications:
113+
if not can_schedule_notification(notification, now):
114+
logger.info(
115+
"Skipping email %s: retry window exhausted or max attempts reached",
116+
notification.id,
117+
)
118+
notification.status = EmailStatusEnum.FAILED
119+
notification.send_after = None
120+
session.add(notification)
121+
continue
104122
notification.mark_sending()
105123
session.add(notification)
106124
session.flush()
@@ -110,7 +128,6 @@ async def process_email_queue(
110128
args=[notification.id],
111129
id=job_id,
112130
replace_existing=True,
113-
kwargs={"retry_delay_seconds": retry_delay_seconds},
114131
)
115132
scheduled += 1
116133
session.commit()
@@ -122,7 +139,6 @@ async def process_email_queue(
122139

123140
async def send_email_notification(
124141
notification_id: UUID,
125-
retry_delay_seconds: int = EMAIL_RETRY_DELAY_SECONDS,
126142
) -> bool:
127143
"""
128144
Deliver a single queued email notification.
@@ -141,10 +157,26 @@ async def send_email_notification(
141157
notification.body_html,
142158
)
143159
except Exception as exc: # noqa: BLE001
144-
logger.warning(
145-
"Failed to send email %s: %s", notification.id, exc
146-
)
147-
notification.mark_failed(str(exc), retry_delay_seconds)
160+
logger.warning("Failed to send email %s: %s", notification.id, exc)
161+
now = datetime.now(timezone.utc)
162+
should_retry = is_retryable_email_error(exc)
163+
deadline = retry_deadline(notification)
164+
if deadline is None:
165+
deadline = now + timedelta(seconds=EMAIL_RETRY_WINDOW_SECONDS)
166+
attempts_remaining = notification.attempts < EMAIL_MAX_ATTEMPTS
167+
if (
168+
should_retry
169+
and attempts_remaining
170+
and now < deadline
171+
):
172+
delay_seconds = next_retry_delay_seconds()
173+
retry_time = now + timedelta(seconds=delay_seconds)
174+
if retry_time <= deadline:
175+
notification.schedule_retry(str(exc), retry_time)
176+
session.add(notification)
177+
session.commit()
178+
return False
179+
notification.mark_failed(str(exc))
148180
session.add(notification)
149181
session.commit()
150182
return False

0 commit comments

Comments
 (0)