Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task for kicking off a flow run #1871

Merged
merged 13 commits into from
Dec 23, 2019
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Released on Dec 17, 2019.

### Task Library

- None
- Add task for scheduling a flow run - [#1871](https://github.com/PrefectHQ/prefect/pull/1871)

### Fixes

Expand Down
5 changes: 5 additions & 0 deletions src/prefect/tasks/cloud/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
This Task class is "Cloud" in the sense that it relies on Cloud to do anything meaningful.
"""

import prefect.tasks.cloud.flow_run
79 changes: 79 additions & 0 deletions src/prefect/tasks/cloud/flow_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from typing import Any
from prefect import Task

from prefect.client import Client
from prefect.utilities.tasks import defaults_from_attrs
from prefect.utilities.graphql import with_args


class FlowRunTask(Task):
"""
Task used to kick off a Flow Run in Cloud.
"""

def __init__(
self,
flow_name: str = None,
project_name: str = None,
parameters: dict = None,
**kwargs: Any
):
self.flow_name = flow_name
self.project_name = project_name
self.parameters = parameters
self.client = Client()
super().__init__(**kwargs)

@defaults_from_attrs("project_name", "flow_name", "parameters")
def run(
self, project_name: str = None, flow_name: str = None, parameters: dict = None
) -> None:
"""
Run method for the task; responsible for scheduling the specified flow run.

Args:
- project_name (str, optional): the project in which the flow is located; if not provided, this method
will use the project provided at initialization
- flow_name (str, optional): the name of the flow to schedule; if not provided, this method will
use the project provided at initialization
- parameters (dict, optional): the parameters to pass to the flow run being scheduled; if not provided,
this method will use the parameters provided at initialization

Returns:
- None
"""
# verify that flow and project names were passed in some capacity or another
if project_name is None:
raise ValueError("Must provide a project name.")
if flow_name is None:
raise ValueError("Must provide a flow name.")

# find the flow ID to schedule
query = {
"query": {
with_args(
"flow",
{
"where": {
"name": {"_eq": flow_name},
"project": {"name": {"_eq": project_name}},
}
},
): {"id"}
}
}
flow = self.client.graphql(query).data.flow
# verify that an ID was returned
if not flow:
raise ValueError(
"No flow {} found in project {}.".format(flow_name, project_name)
)
# and that there was only one ID returned
if len(flow) > 1:
raise ValueError(
"More than one flow named {} found in project {}.".format(
flow_name, project_name
)
)
flow_id = flow[0].id
self.client.create_flow_run(flow_id=flow_id, parameters=parameters)
Empty file added tests/tasks/cloud/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions tests/tasks/cloud/test_flow_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pytest
from unittest.mock import MagicMock

from prefect.tasks.cloud.flow_run import FlowRunTask


@pytest.fixture()
def client(monkeypatch):
cloud_client = MagicMock(
graphql=MagicMock(
return_value=MagicMock(data=MagicMock(flow=[MagicMock(id="abc123")]))
),
create_flow_run=MagicMock(return_value="def456"),
)
monkeypatch.setattr(
"prefect.tasks.cloud.flow_run.Client", MagicMock(return_value=cloud_client),
)
yield cloud_client


class TestFlowRunTask:
def test_initialization(self):
# verify that the task is initialized as expected
task = FlowRunTask(
project_name="Test Project",
flow_name="Test Flow",
parameters={"test": "ing"},
)
assert task.project_name == "Test Project"
assert task.flow_name == "Test Flow"
assert task.parameters == {"test": "ing"}

def test_flow_run_task(self, client):
# verify that create_flow_run was called
task = FlowRunTask(
project_name="Test Project",
flow_name="Test Flow",
parameters={"test": "ing"},
)
task.run()
client.graphql.assert_called_once_with(
{
"query": {
'flow(where: { name: { _eq: "Test Flow" }, project: { name: { _eq: "Test Project" } } })': {
"id"
}
}
}
)
client.create_flow_run.assert_called_once_with(
flow_id="abc123", parameters={"test": "ing"}
)

def test_flow_run_task_without_flow_name(self):
# verify that a ValueError is raised without a flow name
task = FlowRunTask(project_name="Test Project")
with pytest.raises(ValueError, match="Must provide a flow name."):
task.run()

def test_flow_run_task_without_project_name(self):
# verify that a ValueError is raised without a project name
task = FlowRunTask(flow_name="Test Flow")
with pytest.raises(ValueError, match="Must provide a project name."):
task.run()

def test_flow_run_task_with_more_than_one_matching_flow(self, client):
# verify a ValueError is raised if the client returns more than one flow
task = FlowRunTask(flow_name="Test Flow", project_name="Test Project")
client.graphql = MagicMock(
return_value=MagicMock(
data=MagicMock(flow=[MagicMock(id="abc123"), MagicMock(id="xyz789")])
)
)
with pytest.raises(ValueError, match="More than one flow"):
task.run()

def test_flow_run_task_with_no_matching_flow(self, client):
# verify a ValueError is raised if the client returns no flows
task = FlowRunTask(flow_name="Test Flow", project_name="Test Project")
client.graphql = MagicMock(return_value=MagicMock(data=MagicMock(flow=[])))
with pytest.raises(ValueError, match="No flow"):
task.run()