|  | 
|  | 1 | +"""Prefect utils as support for OpenSemanticWorld.""" | 
|  | 2 | + | 
|  | 3 | +from prefect.blocks.notifications import MicrosoftTeamsWebhook | 
|  | 4 | +from prefect.client.schemas.objects import FlowRun | 
|  | 5 | +from prefect.settings import PREFECT_API_URL | 
|  | 6 | +from prefect.states import State | 
|  | 7 | +from pydantic import BaseModel, SecretStr | 
|  | 8 | + | 
|  | 9 | + | 
|  | 10 | +class NotifyTeamsParam(BaseModel): | 
|  | 11 | +    """Parameter set for notifying Microsoft Teams using class NotifyTeams""" | 
|  | 12 | + | 
|  | 13 | +    deployment_name: str | 
|  | 14 | +    teams_webhook_url: SecretStr | 
|  | 15 | + | 
|  | 16 | + | 
|  | 17 | +class NotifyTeams(NotifyTeamsParam): | 
|  | 18 | +    """Notify Microsoft Teams channel using a webhook""" | 
|  | 19 | + | 
|  | 20 | +    def __init__(self, notify_teams_param: NotifyTeamsParam): | 
|  | 21 | +        super().__init__(**notify_teams_param.model_dump()) | 
|  | 22 | + | 
|  | 23 | +    def notify_teams( | 
|  | 24 | +        self, | 
|  | 25 | +        flow, | 
|  | 26 | +        flow_run: FlowRun, | 
|  | 27 | +        state: State, | 
|  | 28 | +    ): | 
|  | 29 | + | 
|  | 30 | +        host_url = str(PREFECT_API_URL.value()).replace("/api", "") | 
|  | 31 | + | 
|  | 32 | +        if flow_run.deployment_id is not None: | 
|  | 33 | +            deployment_url = ( | 
|  | 34 | +                f"{host_url}/deployments/deployment/{flow_run.deployment_id}" | 
|  | 35 | +            ) | 
|  | 36 | +        else: | 
|  | 37 | +            deployment_url = "" | 
|  | 38 | +        _flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n"  # noqa | 
|  | 39 | +        if self.deployment_name == "" or self.deployment_name is None: | 
|  | 40 | +            _deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n"  # noqa | 
|  | 41 | +        else: | 
|  | 42 | +            _deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n"  # noqa | 
|  | 43 | +        _ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n"  # noqa | 
|  | 44 | +        if flow_run.tags != []: | 
|  | 45 | +            _tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n" | 
|  | 46 | +        else: | 
|  | 47 | +            _tags = "" | 
|  | 48 | + | 
|  | 49 | +        _message = f"📜 Message:\n\n_`{state.message}`_" | 
|  | 50 | + | 
|  | 51 | +        MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify( | 
|  | 52 | +            body=(_flow_run + _deployment + _ts + _tags + _message) | 
|  | 53 | +        ) | 
0 commit comments