Configuring different task runner for dev and prod deployments #5560
An example workaround:

```python
import os

from prefect import flow
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import UniversalFlowRunner
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...


# Edit: Note flow runners are now infrastructure blocks

prod = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "prod"})
)
dev = DeploymentSpec(
    flow=my_flow, flow_runner=UniversalFlowRunner(env={"MY_ENV": "dev"})
)
```

We are likely to support setting the task runner from the deployment directly in the future.
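A quick way to see which runner the conditional picked (a sketch, assuming the flow above lives in a hypothetical `my_flows` module; Prefect 2.x `Flow` objects carry the runner on a `task_runner` attribute):

```python
import os

# Simulate the env var the UniversalFlowRunner would inject at runtime.
# It must be set *before* the flow module is imported, since the
# task_runner conditional is evaluated at import time.
os.environ["MY_ENV"] = "prod"

from my_flows import my_flow  # hypothetical module containing the flow above

print(type(my_flow.task_runner).__name__)  # expected: "DaskTaskRunner"
```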
Hi, I have the same issue and ended up going with a similar workaround, though mine quickly became verbose because my dask and dask_kubernetes configurations don't fit into 1-3 lines. Is there any ETA on this being supported?
No ETA so far; following this issue is the best way to stay up to date on the progress here.
Hi 👋
Hi @john-jam, the pattern @madkinsz showed above should still be valid, changing out the `DeploymentSpec`:

```python
@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...
```

Then you can set env vars when initializing the deployment:

```python
from <some_module> import my_flow

deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow",
    infra_overrides={"env": {"MY_ENV": "prod"}},
    work_queue_name="test",
)

if __name__ == "__main__":
    deployment.apply()
```

Hopefully that helps.
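For completeness, the dev counterpart would presumably be a second deployment with the override flipped (a sketch under the same assumptions as the snippet above; the `my-flow-dev` name is illustrative):

```python
from prefect.deployments import Deployment

# Hypothetical dev counterpart: same flow, env override flipped.
dev_deployment = Deployment.build_from_flow(
    flow=my_flow,
    name="my-flow-dev",
    infra_overrides={"env": {"MY_ENV": "dev"}},
    work_queue_name="test",
)
dev_deployment.apply()
```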
Thank you for your response @bunchesofdonald! Btw, if someone else is interested, we have to define the flow's body:

```python
@flow(
    task_runner=DaskTaskRunner()
    if os.environ.get("MY_ENV") == "prod"
    else ConcurrentTaskRunner(),
)
def my_flow():
    ...
```

If not, a serialization error occurs.
Thanks! I've updated my example. I make that mistake often when writing flows that don't actually do anything :)
Seems like this pattern satisfies the original request, so closing as complete |
It does work, but it runs at import time, which is a bit awkward. If we could avoid import-time logic, that would be cleaner I think, e.g. a hook that runs after import but before the flow starts.
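One way to push the selection past import time without any new Prefect hook is a factory that builds the flow on demand; a sketch, assuming the same Prefect 2.x imports used earlier in the thread:

```python
import os

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


def build_flow():
    # Pick the runner when the factory runs, not when the module is imported.
    runner = (
        DaskTaskRunner()
        if os.environ.get("MY_ENV") == "prod"
        else ConcurrentTaskRunner()
    )

    @flow(task_runner=runner)
    def my_flow():
        ...

    return my_flow


# Callers build the flow at use time, after the environment is in place:
my_flow = build_flow()
```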
@tekumara would you mind opening a new issue with that as an enhancement request?
Opened from the Prefect Public Slack Community
davzucky: How can I set up the task runner at deployment time in Orion? I want to be able to change the mode between local dev and prod. I can only see how to do that for flow runners at the deployment level.
anna: It's not possible to do that directly atm since currently the task runner must be specified on the `flow` decorator rather than on the `DeploymentSpec`. And the more I think about it, the more it actually makes sense. The problem you are describing is not "How can I override the task runner used by a flow on a `DeploymentSpec`" - that is one possible solution. The actual problem is: "How can I use a different task runner for development and production deployments?" And to solve that problem, the intended solution is to have two different deployments: one for dev and one for prod. The entire dev vs. prod story is not yet fully established - we are working on adding e.g. GitHub storage. What would likely be a good solution to your issue (though it is not possible yet) is to end up with two different branches on the same repo:
• the "`dev`" branch may have flow code with, say, `DaskTaskRunner`,
• the "`main`" or "`prod`" branch may have flow code with a different task runner, e.g. `ConcurrentTaskRunner`,
and then you can have two different deployments - one for dev and one for prod - each of which references the flow code on its branch (sketched below). This would give a clear separation of code and environments and would make building CI/CD pipelines much easier.
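A rough sketch of what those two per-branch deployments might look like with the `DeploymentSpec` API from this era (the checkout paths are hypothetical stand-ins for per-branch storage, which was not yet available):

```python
# Sketch: one DeploymentSpec per environment, each pointing at the copy of the
# flow code from the corresponding branch (paths are hypothetical).
from prefect.deployments import DeploymentSpec

dev = DeploymentSpec(
    name="my-flow-dev",
    flow_location="./checkouts/dev/my_flow.py",   # flow defined with DaskTaskRunner
)

prod = DeploymentSpec(
    name="my-flow-prod",
    flow_location="./checkouts/prod/my_flow.py",  # flow defined with ConcurrentTaskRunner
)
```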
Having said that, it could be worth exploring adding some sort of override on the `DeploymentSpec` (exactly as you mentioned) - I will open an issue to open this up for discussion. The only problem I currently see with that is that it goes a bit against the runtime discoverability in Orion. The way I understand deployments, you should be able to create a single deployment for `your_flow.py` - then you can run your deployed flow first using, say, `DaskTaskRunner`. But then you can modify the flow code in `your_flow.py` (referenced as `flow_location` on the `DeploymentSpec`), change the task runner to e.g. `ConcurrentTaskRunner`, and create a flow run of this deployed flow with the new task runner without having to recreate the deployment, since Orion allows for runtime discoverability and doesn't force you to preregister any DAG metadata.
Frankly, even in Prefect 1.0, Prefect doesn't store the executor (effectively the same as a task runner in 2.0) in the backend, for privacy reasons: it may contain private information such as your Dask/Ray cluster address. Instead, Prefect retrieves this information at runtime from storage - this is another reason I would lean towards two different versions of this flow in dev and prod branches as the solution to this problem.
For now, you could certainly introduce a hack by setting some custom parameter value (that can be set on your `DeploymentSpec`) which determines which subflow to call.
<@ULVA73B9P> open "Orion: as a user, how can I use a different task runner for development and production deployments?"
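A minimal sketch of that parameter-based hack (the subflow names are hypothetical; it assumes the early Prefect 2.x imports used elsewhere in the thread):

```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner, DaskTaskRunner


# Hypothetical subflows, one per task runner.
@flow(task_runner=DaskTaskRunner())
def heavy_subflow():
    ...


@flow(task_runner=ConcurrentTaskRunner())
def light_subflow():
    ...


# The `env` parameter can be given a default on each DeploymentSpec
# (e.g. parameters={"env": "prod"}), so each deployment picks its subflow
# -- and therefore its task runner -- at run time.
@flow
def parent_flow(env: str = "dev"):
    if env == "prod":
        heavy_subflow()
    else:
        light_subflow()
```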
Original thread can be found here.