Skip to content

Commit c4cac9e

Browse files
committed
init utils prefect ms teams notification
1 parent a3bfed0 commit c4cac9e

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,5 @@ site
6161
# Specific to this project
6262
src/osw/model/*.json
6363
playground
64+
65+
.env

setup.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ dataimport =
9292
openpyxl
9393
UI =
9494
pysimplegui
95+
workflow =
96+
prefect==2.20.0
9597
tutorial =
9698
%(dataimport)s
9799
all =
@@ -101,6 +103,7 @@ all =
101103
%(S3)s
102104
%(wikitext)s
103105

106+
104107
# Add here dev requirements (semicolon/line-separated)
105108
dev =
106109
pre-commit>=3.2.0

src/osw/utils/prefect.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Prefect utils as support for OpenSemanticWorld."""
2+
3+
from prefect.blocks.notifications import MicrosoftTeamsWebhook
4+
from prefect.client.schemas.objects import FlowRun
5+
from prefect.settings import PREFECT_API_URL
6+
from prefect.states import State
7+
from pydantic import BaseModel, SecretStr
8+
9+
10+
class NotifyTeamsParam(BaseModel):
11+
"""Parameter set for notifying Microsoft Teams using class NotifyTeams"""
12+
13+
deployment_name: str
14+
teams_webhook_url: SecretStr
15+
16+
17+
class NotifyTeams(NotifyTeamsParam):
18+
"""Notify Microsoft Teams channel using a webhook"""
19+
20+
def __init__(self, notify_teams_param: NotifyTeamsParam):
21+
super().__init__(**notify_teams_param.model_dump())
22+
23+
def notify_teams(
24+
self,
25+
flow,
26+
flow_run: FlowRun,
27+
state: State,
28+
):
29+
30+
host_url = str(PREFECT_API_URL.value()).replace("/api", "")
31+
32+
if flow_run.deployment_id is not None:
33+
deployment_url = (
34+
f"{host_url}/deployments/deployment/{flow_run.deployment_id}"
35+
)
36+
else:
37+
deployment_url = ""
38+
_flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa
39+
if self.deployment_name == "" or self.deployment_name is None:
40+
_deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa
41+
else:
42+
_deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa
43+
_ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa
44+
if flow_run.tags != []:
45+
_tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n"
46+
else:
47+
_tags = ""
48+
49+
_message = f"📜 Message:\n\n_`{state.message}`_"
50+
51+
MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify(
52+
body=(_flow_run + _deployment + _ts + _tags + _message)
53+
)

0 commit comments

Comments
 (0)