Skip to content

Commit

Permalink
Prefect Cloud Handoff (PrefectHQ#313)
Browse files Browse the repository at this point in the history
* Configure file read toml

* Intermittent commit before merge

* Flow runnin in container!

* Minor changes

* Removed config var

* Logging

* Env config changes

* Intermittent commit before merge

* Flows CLI touchup

* CLI and GQL work

* Black formatting

* Prefect version tag in docker image

* Flow execution local loading and env changes

* Flow Run ID

* Black formatting;

* Branch in dockerfile

* Added task and flow running callbacks

* Black formatting

* Config access

* Env config holding

* Black formatting

* Moved state setting

* State result temporarily set to none

* Debug

* More cloud debug work

* Task id handle

* Comment

* Merge master

* Readd timeout handler

* Parameters adjustment

* Path update

* Flow run config loading

* Flows cli update

* Black formatting

* Local execution

* Black formatting

* Paramater tuning

* Temp empty param handling until new flow run create functionality

* Black formatting

* Removed loading non-existent params

* Removed client initialization from task_runner

* Test fixing

* CLI refactor

* Adjustments for server changes

* CLI tweaks

* Address comments

* Black formatting

* Removed running of flow due to scheduler existance

* Scheduled flow runs

* Black formatting

* Added refresh_token into client.py

* Handle nonetype for schedule

* Task runner and flow runner callouts to prefect cloud

* Remove config data in cloud context

* Removed client self reference

* Remove comment

* Address comments

* Testing option

* Client call to delete flow from serialized

* Adjusted to query then delete

* Debugging

* Debugging

* Result handling

* Client deleteflow change

* Minor refactor

* Raised on exception remove

* Debugging

* Revert changes
  • Loading branch information
joshmeek authored and cicdw committed Nov 2, 2018
1 parent e42fce4 commit b786199
Show file tree
Hide file tree
Showing 13 changed files with 678 additions and 167 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ local_settings.py

# Dask stuff:
*dask-worker-space/*
*.dirlock
*.lock

# Flask stuff:
instance/
Expand Down
88 changes: 41 additions & 47 deletions src/prefect/cli/configure.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,89 @@
# Licensed under LICENSE.md; also available at https://www.prefect.io/licenses/alpha-eula

import json
import os
from pathlib import Path
import sys

import click
import toml

from prefect.client import Client
from prefect.utilities.cli import load_prefect_config, PATH


@click.group()
def configure():
"""
Configure communication with Prefect Cloud
Configure communication with Prefect Cloud.
"""
pass


@configure.command()
@click.argument("path", required=False)
def init(path):
def init():
"""
Initialize cloud communication config options
Initialize cloud communication config options.
"""
config_data = load_prefect_config()

if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))

if Path(path).is_file():
config_data = toml.load(path)
else:
config_data = {}

# Do under .server block
config_data["REGISTRY_URL"] = click.prompt(
"Registry URL", default=config_data.get("REGISTRY_URL")
)
config_data["API_URL"] = click.prompt("API URL", default=config_data.get("API_URL"))
config_data["API_ACCESS_KEY"] = click.prompt(
"API Access Key", default=config_data.get("API_ACCESS_KEY")
)

toml.dump(config_data, path)
with open(PATH, "w") as config_file:
toml.dump(config_data, config_file)


@configure.command()
@click.argument("variable")
@click.argument("path", required=False)
def set_variable(variable, path):
def set_variable(variable):
"""
Sets a specific configuration variable
Sets a specific configuration variable.
"""
if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))

if Path(path).is_file():
config_data = toml.load(path)
else:
config_data = {}
config_data = load_prefect_config()

config_data[variable] = click.prompt(
"{}".format(variable), default=config_data.get(variable)
)

toml.dump(config_data, path)
with open(PATH, "w") as config_file:
toml.dump(config_data, config_file)


@configure.command()
@click.argument("path", required=False)
def list_config(path):
def list_config():
"""
List all configuration variables
List all configuration variables.
"""
if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))

if Path(path).is_file():
config_data = toml.load(path)
else:
config_data = {}
config_data = load_prefect_config()

click.echo(config_data)


@configure.command()
@click.argument("path", required=False)
def open_config(path):
def open_config():
"""
Opens the configuration file.
"""
click.launch(PATH)


@configure.command()
def login():
"""
Opens the configuration file
Login to Prefect Cloud.
"""
if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))
config_data = load_prefect_config()

config_data["EMAIL"] = click.prompt("email", default=config_data.get("EMAIL"))
config_data["PASSWORD"] = click.prompt(
"password", hide_input=True, confirmation_prompt=True
)

client = Client(
config_data["API_URL"], os.path.join(config_data["API_URL"], "graphql/")
)

client.login(email=config_data["EMAIL"], password=config_data["PASSWORD"])

click.launch(path)
with open(PATH, "w") as config_file:
toml.dump(config_data, config_file)
192 changes: 142 additions & 50 deletions src/prefect/cli/flows.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
# Licensed under LICENSE.md; also available at https://www.prefect.io/licenses/alpha-eula

import json
import os
from pathlib import Path
import sys

import click
import docker
import requests
import toml

import prefect
from prefect.client import RunFlow
from prefect.client import Client, Flows, FlowRuns
from prefect import config
from prefect.core import registry
from prefect.environments import ContainerEnvironment
from prefect.utilities import json as prefect_json
from prefect.utilities.cli import load_prefect_config


def load_flow(project, name, version, file):
if file:
# Load the registry from the file into the current process's environment
exec(open(file).read(), locals())

# Load the user specified flow
flow = None
for flow_id, registry_flow in registry.REGISTRY.items():
if (
registry_flow.project == project
and registry_flow.name == name
and registry_flow.version == version
):
flow = prefect.core.registry.load_flow(flow_id)

if not flow:
raise click.ClickException("{} not found in {}".format(name, file))

return flow


@click.group()
def flows():
"""
Interact with Prefect flows
Interact with Prefect flows.
"""
pass

Expand All @@ -29,7 +47,7 @@ def info():
"""
Prints a JSON string of information about all registered flows.
"""
print(prefect_json.dumps([f.serialize() for f in registry.REGISTRY.values()]))
click.echo(prefect_json.dumps([f.serialize() for f in registry.REGISTRY.values()]))


@flows.command()
Expand All @@ -38,7 +56,7 @@ def ids():
Prints all the flows in the registry.
"""
output = {id: f.key() for id, f in registry.REGISTRY.items()}
print(prefect_json.dumps(output, sort_keys=True))
click.echo(prefect_json.dumps(output, sort_keys=True))


@flows.command()
Expand All @@ -49,37 +67,69 @@ def run(id):
"""
flow = prefect.core.registry.load_flow(id)
flow_runner = prefect.engine.FlowRunner(flow=flow)
return flow_runner.run()

# Load optional parameters
parameters = None
flow_run_id = config.get("flow_run_id", None)

if flow_run_id:
client = Client(config.API_URL, os.path.join(config.API_URL, "graphql/"))
client.login(email=config.EMAIL, password=config.PASSWORD)

flow_runs_gql = FlowRuns(client=client)
stored_parameters = flow_runs_gql.query(flow_run_id=flow_run_id)

parameters = stored_parameters.flowRuns[0].parameters

return flow_runner.run(parameters=parameters)


@flows.command()
@click.argument("id")
def build(id):
@click.argument("project")
@click.argument("name")
@click.argument("version")
@click.option(
"--file",
required=False,
help="Path to a file which contains the flow.",
type=click.Path(exists=True),
)
def build(project, name, version, file):
"""
Build a flow's environment
Build a flow's environment.
"""
flow = prefect.core.registry.load_flow(id)
return flow.environment.build(flow=flow)
flow = load_flow(project, name, version, file)

# Store output from building environment
# Use metadata instead of environment object to avoid storing client secrets
environment_metadata = {
type(flow.environment).__name__: flow.environment.build(flow=flow)
}

return environment_metadata


@flows.command()
@click.argument("id")
@click.argument("path", required=False)
def push(id, path):
@click.argument("project")
@click.argument("name")
@click.argument("version")
@click.option(
"--file",
required=False,
help="Path to a file which contains the flow.",
type=click.Path(exists=True),
)
def push(project, name, version, file):
"""
Push a flow's container environment to a registry
Push a flow's container environment to a registry.
"""
if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))

if Path(path).is_file():
config_data = toml.load(path)

if not config_data:
click.echo("CLI not configured. Run 'prefect configure init'")
return
config_data = load_prefect_config()
flow = load_flow(project, name, version, file)

flow = prefect.core.registry.load_flow(id)
if not isinstance(flow.environment, ContainerEnvironment):
raise click.ClickException(
"{} does not have a ContainerEnvironment".format(name)
)

# Check if login access was provided for registry
if config_data.get("REGISTRY_USERNAME", None) and config_data.get(
Expand All @@ -98,30 +148,72 @@ def push(id, path):


@flows.command()
@click.argument("id")
@click.argument("path", required=False)
@click.option("--run", "-r", multiple=True)
def exec_command(id, path, run):
@click.argument("project")
@click.argument("name")
@click.argument("version")
@click.option(
"--file",
required=False,
help="Path to a file which contains the flow.",
type=click.Path(exists=True),
)
@click.option(
"--testing", required=False, is_flag=True, help="Deploy flow in testing mode."
)
@click.argument("parameters", required=False)
def deploy(project, name, version, file, testing, parameters):
"""
Send flow command
Deploy a flow to Prefect Cloud.
"""
if not path:
path = "{}/.prefect/config.toml".format(os.getenv("HOME"))
config_data = load_prefect_config()
flow = load_flow(project, name, version, file)

if Path(path).is_file():
config_data = toml.load(path)
client = Client(
config_data["API_URL"], os.path.join(config_data["API_URL"], "graphql/")
)
client.login(email=config_data["EMAIL"], password=config_data["PASSWORD"])

if not config_data:
click.echo("CLI not configured. Run 'prefect configure init'")
return
# Store output from building environment
# Use metadata instead of environment object to avoid storing client secrets
environment_metadata = {
type(flow.environment).__name__: flow.environment.build(flow=flow)
}
serialized_flow = flow.serialize()
serialized_flow["environment"] = prefect_json.dumps(environment_metadata)

flow = prefect.core.registry.load_flow(id)
flows_gql = Flows(client=client)

if run:
RunFlow().run_flow(
image_name=flow.environment.image,
image_tag=flow.environment.tag,
flow_id=id,
if testing:
click.echo(
"Warning: Testing mode overwrites flows with similar project/name/version."
)
flow_id = flows_gql.query(
project_name=project, flow_name=name, flow_version=version
)
else:
click.echo("No command specified")

if flow_id.flows:
flows_gql.delete(flow_id=flow_id.flows[0].id)

# Create the flow in the database
try:
flow_create_output = flows_gql.create(serialized_flow=serialized_flow)
except ValueError as value_error:
if "No project found for" in str(value_error):
raise click.ClickException("No project found for {}".format(project))
else:
raise click.ClickException(str(value_error))

flow_db_id = flow_create_output.createFlow.flow.id

next_scheduled_run = None
if flow.schedule.next(1):
next_scheduled_run = flow.schedule.next(1)[0]
next_scheduled_run = next_scheduled_run.isoformat()

# Create Flow Run
flow_runs_gql = FlowRuns(client=client)
flow_runs_gql.create(
flow_id=flow_db_id, parameters=parameters, start_time=next_scheduled_run
)

click.echo("{} deployed.".format(name))
Loading

0 comments on commit b786199

Please sign in to comment.