|
1 | 1 | import logging |
2 | | -from datetime import datetime, timedelta |
| 2 | +from datetime import datetime |
3 | 3 |
|
4 | 4 | import requests |
5 | | -from airflow.sdk import Asset, AssetWatcher, Context, dag, task |
| 5 | +import tenacity |
| 6 | +from airflow.providers.http.hooks.http import HttpHook |
| 7 | +from airflow.sdk import Asset, AssetWatcher, Context, Variable, dag, task |
6 | 8 |
|
7 | 9 | from triggers.finance_report import FinanceReportTrigger |
8 | 10 |
|
9 | | -logger = logging.getLogger(__name__) |
| 11 | +# get the airflow.task logger |
| 12 | +task_logger = logging.getLogger("airflow.task") |
| 13 | + |
10 | 14 |
|
11 | 15 | finance_report_asset = Asset( |
12 | 16 | name="finance_report", |
13 | 17 | watchers=[ |
14 | 18 | AssetWatcher( |
15 | 19 | name="finance_report_watcher", |
16 | 20 | trigger=FinanceReportTrigger( |
17 | | - # poke_interval=86400, # 60*60*24 |
18 | | - poke_interval=5, # 60*60*24 |
| 21 | + poke_interval=86400, # 60*60*24 |
19 | 22 | ), |
20 | 23 | ) |
21 | 24 | ], |
|
37 | 40 | }, |
38 | 41 | ) |
39 | 42 | def discord_message_notification(): |
40 | | - """Send Discord Message""" |
| 43 | + """Send Discord Message.""" |
41 | 44 |
|
42 | | - @task( |
43 | | - retries=10, |
44 | | - retry_delay=timedelta(seconds=10), |
45 | | - ) |
| 45 | + @task |
46 | 46 | def send_discord_message(**context: Context) -> None: |
47 | 47 | triggering_asset_events = context["triggering_asset_events"] |
48 | | - session = requests.session() |
49 | | - logger.info(f"Receive asset events {triggering_asset_events}") |
50 | 48 | for asset_uri, asset_events in triggering_asset_events.items(): |
51 | | - logger.info(f"Receive asset event from Asset uri={asset_uri}") |
| 49 | + task_logger.info(f"Receive asset event from Asset uri={asset_uri}") |
| 50 | + |
| 51 | + http_hook = HttpHook(method="POST", http_conn_id="discord_webhook") |
52 | 52 | for asset_event in asset_events: # type: ignore[attr-defined] |
53 | 53 | if asset_event.extra.get("from_trigger", False): |
54 | 54 | details = asset_event.extra["payload"] |
55 | 55 | else: |
56 | 56 | details = asset_event.extra |
57 | 57 |
|
58 | | - session.post( |
59 | | - details.get("webhook_url"), |
60 | | - json={ |
| 58 | + if not details: |
| 59 | + task_logger.error( |
| 60 | + f"Detail {details} cannot be empty. It's required to send discord message." |
| 61 | + ) |
| 62 | + continue |
| 63 | + |
| 64 | + task_logger.info("Start sending discord message") |
| 65 | + endpoint = Variable.get(details.get("webhook_endpoint_key")) |
| 66 | + http_hook.run_with_advanced_retry( |
| 67 | + endpoint=endpoint, |
| 68 | + data={ |
61 | 69 | "username": details.get("username"), |
62 | 70 | "content": details.get("content"), |
63 | 71 | }, |
| 72 | + _retry_args=dict( |
| 73 | + wait=tenacity.wait_random(min=1, max=10), |
| 74 | + stop=tenacity.stop_after_attempt(10), |
| 75 | + retry=tenacity.retry_if_exception_type( |
| 76 | + requests.exceptions.ConnectionError |
| 77 | + ), |
| 78 | + ), |
64 | 79 | ) |
| 80 | + task_logger.info("Discord message sent") |
65 | 81 |
|
66 | 82 | send_discord_message() |
67 | 83 |
|
|
0 commit comments