Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions examples/prefect/deploy_flow.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example should also be used as a pytest

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Example usage of deploy function"""

from datetime import timedelta
from os import environ

from prefect import flow
from prefect.artifacts import create_table_artifact

from osw.utils.prefect import DeployConfig, DeployParam, deploy, tagsStrToList

# Set environment variables
environ["PREFECT_DEPLOYMENT_NAME"] = "osw-python-deploy-example"
environ["PREFECT_DEPLOYMENT_DESCRIPTION"] = "Deployment of notify_teams.py"
environ["PREFECT_DEPLOYMENT_VERSION"] = "0.0.1"
environ["PREFECT_DEPLOYMENT_TAGS"] = "osw-python,example-deploy-flow"
environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"] = "1"
# environ["PREFECT_DEPLOYMENT_CRON"] = "* * * * *"


@flow(log_prints=True)
def example_flow_to_deploy():
"""Example flow to be deployed"""
print(f"Execution of example: {example_flow_to_deploy.__name__}!")
# set example table artifact
create_table_artifact(
key="example-table",
table=[
{"name": "Alice", "age": 24},
{"name": "Bob", "age": 25},
],
description="Example table artifact",
)


if __name__ == "__main__":
"""Deploy the example flow"""
# Example using environment variables
deploy(
DeployParam(
deployments=[
DeployConfig(
flow=example_flow_to_deploy,
name=environ.get("PREFECT_DEPLOYMENT_NAME"),
description=environ.get("PREFECT_DEPLOYMENT_DESCRIPTION"),
version=environ.get("PREFECT_DEPLOYMENT_VERSION"),
tags=tagsStrToList(environ.get("PREFECT_DEPLOYMENT_TAGS")),
interval=timedelta(
minutes=int(environ.get("PREFECT_DEPLOYMENT_INTERVAL_MIN"))
), # either interval or cron
# cron=environ.get("PREFECT_DEPLOYMENT_CRON"),
)
],
# remove_existing_deployments=True,
)
)

# Clear secret environment variables
del environ["PREFECT_DEPLOYMENT_NAME"]
del environ["PREFECT_DEPLOYMENT_DESCRIPTION"]
del environ["PREFECT_DEPLOYMENT_VERSION"]
del environ["PREFECT_DEPLOYMENT_TAGS"]
del environ["PREFECT_DEPLOYMENT_INTERVAL_MIN"]
# del environ["PREFECT_DEPLOYMENT_CRON"]
43 changes: 43 additions & 0 deletions examples/prefect/notify_teams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Example of sending notifications to MS Teams on prefect flow failures"""

from os import environ

from prefect import flow
from pydantic import SecretStr

from osw.utils.prefect import NotifyTeams, NotifyTeamsParam

# Prerequisite: Set environment variable TEAMS_WEBHOOK_URL
# in CLI: export TEAMS_WEBHOOK_URL="https://prod..."
# in python uncomment below, DO NOT PUSH SECRETS TO GIT

# environ["TEAMS_WEBHOOK_URL"] = "https://prod..."


# Decorator must be configured with on_failure argument
@flow(
# Microsoft Teams notification on failure ->
# on_failure use `notify_teams` function without brackets as list element
on_failure=[
NotifyTeams(
NotifyTeamsParam(
teams_webhook_url=SecretStr(environ.get("TEAMS_WEBHOOK_URL")),
# OPTIONAL, will be empty if no deploment is assigned
deployment_name="osw-python-notify-teams-example",
)
).notify_teams
],
log_prints=True,
)
def example_error_flow():
"""Test flow that always fails"""

raise ValueError(
"oops! LOREM IPSUM DOLOR SIT AMET CONSECTETUR ADIPISICING ELIT " * 1
)


if __name__ == "__main__":
example_error_flow()
# Clear secret environment variable
del environ["TEAMS_WEBHOOK_URL"]
142 changes: 135 additions & 7 deletions src/osw/utils/prefect.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
"""Prefect utils as support for OpenSemanticWorld."""

import asyncio
import re
import sys
from datetime import timedelta
from importlib.metadata import version
from typing import Iterable, List, Optional

from packaging.specifiers import SpecifierSet
from prefect import Flow, get_client, serve
from prefect.blocks.notifications import MicrosoftTeamsWebhook
from prefect.client.schemas.objects import FlowRun
from prefect.settings import PREFECT_API_URL
from prefect.states import State
from pydantic import BaseModel, SecretStr

# from prefect.settings import PREFECT_API_URL


# ------------------------------ NOTIFICATIONS ------------------------------
class NotifyTeamsParam(BaseModel):
"""Parameter set for notifying Microsoft Teams using class NotifyTeams"""

deployment_name: str
teams_webhook_url: SecretStr
"""Microsoft Teams webhook URL containing a secret"""
deployment_name: Optional[str] = None
"""Deployment name to be displayed in the notification"""


class NotifyTeams(NotifyTeamsParam):
Expand All @@ -29,17 +43,21 @@ def notify_teams(

host_url = str(PREFECT_API_URL.value()).replace("/api", "")

_flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa

if flow_run.deployment_id is not None:
# Assigned deployment found
deployment_url = (
f"{host_url}/deployments/deployment/{flow_run.deployment_id}"
)
if self.deployment_name == "" or self.deployment_name is None:
_deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa
else:
_deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa
else:
deployment_url = ""
_flow_run = f"**🚨Flow Run: [{flow.name} > {flow_run.name}]({host_url}/flow-runs/flow-run/{flow_run.id}) ❗{state.name}❗**\n\n" # noqa
if self.deployment_name == "" or self.deployment_name is None:
_deployment = f"🚀 Deployment: _[{flow_run.deployment_id}]({deployment_url})_\n\n" # noqa
else:
_deployment = f"🚀 Deployment: _[{self.deployment_name}]({deployment_url})_\n\n" # noqa
# No deployment assigned
_deployment = "🚀 Deployment: _Just flow, no deployment_\n\n"

_ts = f"🕑 Timestamp: _{flow_run.state.timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}_\n\n" # noqa
if flow_run.tags != []:
_tags = f"🏷️ Tags: _#{' #'.join(flow_run.tags)}_\n\n"
Expand All @@ -48,6 +66,116 @@ def notify_teams(

_message = f"📜 Message:\n\n_`{state.message}`_"

# # DEBUG
# print(_flow_run + _deployment + _ts + _tags + _message)
# print(f"Teams webhook URL: {self.teams_webhook_url}")
# print(f"Deployment name: {self.deployment_name}")
# print(f"Flow name: {flow.name}")
# print(f"Flow run name: {flow_run.name}")
# print(f"Flow run ID: {flow_run.id}")

MicrosoftTeamsWebhook(url=self.teams_webhook_url.get_secret_value()).notify(
body=(_flow_run + _deployment + _ts + _tags + _message)
)


# ------------------------------- DEPLOYMENTS -------------------------------
def tagsStrToList(tags: str) -> List[str]:
"""Remove tags whitespaces, newlines, tabs, empty strings, split comma"""
return list(filter(None, re.sub(r"\s+", "", tags).split(",")))


class DeployConfig(BaseModel):
"""Prefect deployment configuration"""

flow: Flow
name: str | None = None
description: str | None = None
interval: Iterable[int | float | timedelta] | int | float | timedelta | None = None
cron: Iterable[str] | str | None = None
version: str | None = None
tags: List[str] | None = None

# Parameters that could be added in future, see to_deployment function:
# rrule: Iterable[str] | str | None = None
# paused: bool | None = None
# schedules: List[FlexibleScheduleList] | None = None
# schedule: SCHEDULE_TYPES | None = None
# is_schedule_active: bool | None = None
# parameters: dict | None = None
# triggers: List[DeploymentTriggerTypes | TriggerTypes] | None = None
# enforce_parameter_schema: bool = False
# work_pool_name: str | None = None
# work_queue_name: str | None = None
# job_variables: Dict[str, Any] | None = None
# deployment_id: str | None = None
# prefect_api_url: str = PREFECT_API_URL
class Config:
arbitrary_types_allowed = True


class DeployParam(BaseModel):
"""Parameter set for deploying flows as deployments"""

deployments: List[DeployConfig]
"""List of deployments to be served"""
# TODO: Implement remove_existing_deployments
remove_existing_deployments: Optional[bool] = False
"""Will remove existing deployments of the specified flows/software"""
# TODO: Add parameter for OSW support in next version


async def _deploy(param: DeployParam):
"""programmatic deployment supported in newer prefect versions
This should become part of osw-python
"""

deployments = []

for deploy_config in param.deployments:
flow: Flow = deploy_config.flow
# Set deployment name if not provided
if deploy_config.name is None or deploy_config.name == "":
deploy_config.name = flow.name + "-deployment"
config = await flow.to_deployment(
name=deploy_config.name,
tags=deploy_config.tags,
cron=deploy_config.cron,
interval=deploy_config.interval,
description=deploy_config.description,
version=deploy_config.version,
)
await config.apply() # returns the deployment_uuid

deployments.append(config)

# fetch flow uuid
async with get_client() as client:
response = await client.read_flow_by_name(flow.name)
print(response.json())
flow_uuid = response.id
print("Flow UUID:", flow_uuid)

# prefect_domain = (
# environ.get("PREFECT_API_URL").split("//")[-1].split("/")[0]
# ) # noqa
# print("Prefect domain:", prefect_domain)
# start agent to serve deployment
# await deploy_config.flow.serve(name=deployment_name)
if version("prefect") in SpecifierSet(">=3.0"):
return deployments
else:
await serve(*deployments)


def deploy(param: DeployParam):
"""Function to serve configured flows as deployments by python version."""
if sys.version_info >= (3, 11):
# python >= 3.11
with asyncio.Runner() as runner:
deployments = runner.run(_deploy(param))
else:
# python < 3.11
deployments = asyncio.run(_deploy(param))
if version("prefect") in SpecifierSet(">=3.0"):
serve(*deployments)
Loading