|
1 | | -import random |
2 | 1 | import re |
3 | 2 | from datetime import datetime, timedelta, timezone |
4 | 3 | from uuid import UUID |
5 | 4 |
|
6 | | -from botocore.exceptions import BotoCoreError, ClientError |
7 | 5 | from loguru import logger |
8 | 6 | from pydantic import BaseModel |
9 | 7 | from sqlalchemy import and_, or_ |
|
25 | 23 | ) |
26 | 24 | from db.setup import get_db_session |
27 | 25 | from db.types import GROUP_NAMES, ApprovalStatusEnum, EmailStatusEnum, GroupEnum |
| 26 | +from scheduled_tasks.email_retry import ( |
| 27 | + EMAIL_JOB_ID_PREFIX, |
| 28 | + EMAIL_MAX_ATTEMPTS, |
| 29 | + EMAIL_QUEUE_BATCH_SIZE, |
| 30 | + EMAIL_RETRY_WINDOW_SECONDS, |
| 31 | + can_schedule_notification, |
| 32 | + is_retryable_email_error, |
| 33 | + next_retry_delay_seconds, |
| 34 | + retry_deadline, |
| 35 | +) |
28 | 36 | from scheduled_tasks.scheduler import SCHEDULER |
29 | 37 | from schemas.auth0 import ( |
30 | 38 | GROUP_ROLE_PATTERN, |
|
33 | 41 | ) |
34 | 42 | from schemas.biocommons import Auth0UserData |
35 | 43 |
|
36 | | -EMAIL_QUEUE_BATCH_SIZE = 25 |
37 | | -EMAIL_MAX_ATTEMPTS = 2 |
38 | | -EMAIL_RETRY_DELAY_MIN_SECONDS = 15 * 60 |
39 | | -EMAIL_RETRY_DELAY_MAX_SECONDS = 30 * 60 |
40 | | -EMAIL_RETRY_WINDOW_SECONDS = 60 * 60 |
41 | | -EMAIL_JOB_ID_PREFIX = "email_notification_" |
42 | | - |
43 | | -TRANSIENT_CLIENT_ERROR_CODES = { |
44 | | - "Throttling", |
45 | | - "ThrottlingException", |
46 | | - "TooManyRequestsException", |
47 | | - "ServiceUnavailable", |
48 | | - "InternalFailure", |
49 | | - "InternalError", |
50 | | - "RequestThrottled", |
51 | | -} |
52 | | - |
53 | | - |
54 | | -def _ensure_aware(dt: datetime | None) -> datetime | None: |
55 | | - if dt is None: |
56 | | - return None |
57 | | - if dt.tzinfo is None: |
58 | | - return dt.replace(tzinfo=timezone.utc) |
59 | | - return dt |
60 | | - |
61 | | - |
62 | | -def _retry_deadline(notification: EmailNotification) -> datetime | None: |
63 | | - first_attempt = _ensure_aware(notification.last_attempt_at) |
64 | | - if first_attempt is None: |
65 | | - return None |
66 | | - return first_attempt + timedelta(seconds=EMAIL_RETRY_WINDOW_SECONDS) |
67 | | - |
68 | | - |
69 | | -def _can_schedule_now(notification: EmailNotification, now: datetime) -> bool: |
70 | | - if notification.attempts >= EMAIL_MAX_ATTEMPTS: |
71 | | - return False |
72 | | - deadline = _retry_deadline(notification) |
73 | | - return deadline is None or now < deadline |
74 | | - |
75 | | - |
76 | | -def _is_retryable_email_error(exc: Exception) -> bool: |
77 | | - if isinstance(exc, ClientError): |
78 | | - code = exc.response.get("Error", {}).get("Code") |
79 | | - return code in TRANSIENT_CLIENT_ERROR_CODES |
80 | | - if isinstance(exc, BotoCoreError): |
81 | | - return True |
82 | | - if isinstance(exc, OSError): |
83 | | - return True |
84 | | - return False |
85 | | - |
86 | | - |
87 | | -def _next_retry_delay_seconds() -> int: |
88 | | - return random.randint(EMAIL_RETRY_DELAY_MIN_SECONDS, EMAIL_RETRY_DELAY_MAX_SECONDS) |
89 | | - |
90 | | - |
91 | 44 | def _ensure_user_from_auth0(session: Session, user_data: Auth0UserData) -> tuple[BiocommonsUser, bool, bool]: |
92 | 45 | """ |
93 | 46 | Ensure the Auth0 user exists in the database, creating or restoring if required. |
@@ -156,7 +109,7 @@ async def process_email_queue( |
156 | 109 | return 0 |
157 | 110 | scheduled = 0 |
158 | 111 | for notification in notifications: |
159 | | - if not _can_schedule_now(notification, now): |
| 112 | + if not can_schedule_notification(notification, now): |
160 | 113 | logger.info( |
161 | 114 | "Skipping email %s: retry window exhausted or max attempts reached", |
162 | 115 | notification.id, |
@@ -205,19 +158,17 @@ async def send_email_notification( |
205 | 158 | except Exception as exc: # noqa: BLE001 |
206 | 159 | logger.warning("Failed to send email %s: %s", notification.id, exc) |
207 | 160 | now = datetime.now(timezone.utc) |
208 | | - should_retry = _is_retryable_email_error(exc) |
209 | | - deadline = _retry_deadline(notification) |
| 161 | + should_retry = is_retryable_email_error(exc) |
| 162 | + deadline = retry_deadline(notification) |
210 | 163 | if deadline is None: |
211 | 164 | deadline = now + timedelta(seconds=EMAIL_RETRY_WINDOW_SECONDS) |
212 | | - else: |
213 | | - deadline = _ensure_aware(deadline) |
214 | 165 | attempts_remaining = notification.attempts < EMAIL_MAX_ATTEMPTS |
215 | 166 | if ( |
216 | 167 | should_retry |
217 | 168 | and attempts_remaining |
218 | 169 | and now < deadline |
219 | 170 | ): |
220 | | - delay_seconds = _next_retry_delay_seconds() |
| 171 | + delay_seconds = next_retry_delay_seconds() |
221 | 172 | retry_time = now + timedelta(seconds=delay_seconds) |
222 | 173 | if retry_time <= deadline: |
223 | 174 | notification.schedule_retry(str(exc), retry_time) |
|
0 commit comments