Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/prefect/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@


if TYPE_CHECKING:
from .flow_runs import run_deployment
from .flow_runs import arun_deployment, run_deployment
from .base import initialize_project
from .runner import deploy

_public_api: dict[str, tuple[str, str]] = {
"initialize_project": (__spec__.parent, ".base"),
"arun_deployment": (__spec__.parent, ".flow_runs"),
"run_deployment": (__spec__.parent, ".flow_runs"),
"deploy": (__spec__.parent, ".runner"),
}

# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]


def __getattr__(attr_name: str) -> object:
Expand Down
35 changes: 26 additions & 9 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import prefect
from prefect._result_records import ResultRecordMetadata
from prefect.client.schemas import FlowRun, TaskRunResult
from prefect.client.utilities import inject_client
from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities._engine import dynamic_key_for_task_run
from prefect.utilities.slugify import slugify


Expand Down Expand Up @@ -45,9 +45,7 @@ def _is_instrumentation_enabled() -> bool:
logger: "logging.Logger" = get_logger(__name__)


@sync_compatible
@inject_client
async def run_deployment(
async def arun_deployment(
name: Union[str, UUID],
client: Optional["PrefectClient"] = None,
parameters: Optional[dict[str, Any]] = None,
Expand All @@ -62,7 +60,7 @@ async def run_deployment(
job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
"""
Create a flow run for a deployment and return it after completion or a timeout.
Asynchronously create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing.
Specify a timeout (in seconds) to wait for the flow run to execute before
Expand All @@ -79,6 +77,7 @@ async def run_deployment(
Args:
name: The deployment id or deployment name in the form:
`"flow name/deployment name"`
client: An optional PrefectClient to use for API requests.
parameters: Parameter overrides for this flow run. Merged with the deployment
defaults.
scheduled_time: The time to schedule the flow run for, defaults to scheduling
Expand All @@ -100,6 +99,18 @@ async def run_deployment(
job_variables: A dictionary of dot delimited infrastructure overrides that
will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
`namespace='prefect'`

Example:
```python
import asyncio
from prefect.deployments import arun_deployment

async def main():
flow_run = await arun_deployment("my-flow/my-deployment")
print(flow_run.state)

asyncio.run(main())
```
"""
if timeout is not None and timeout < 0:
raise ValueError("`timeout` cannot be negative")
Expand All @@ -119,6 +130,8 @@ async def run_deployment(
except ValueError:
pass

client, _ = get_or_create_client(client)

if deployment_id:
deployment = await client.read_deployment(deployment_id=deployment_id)
else:
Expand All @@ -133,7 +146,7 @@ async def run_deployment(

# This was called from a flow. Link the flow run as a subflow.
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
Comment on lines 148 to 150

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: Removing await from async collect_task_run_inputs stores coroutines

collect_task_run_inputs is defined as async def collect_task_run_inputs(expr, max_depth=-1) in prefect/utilities/engine.py. The PR removes the await keyword from the dict comprehension, so task_inputs will contain unawaited coroutine objects instead of resolved set[TaskRunResult | FlowRunResult] values.

This will silently corrupt the task_inputs passed to client.create_task_run(), breaking subflow linking when arun_deployment is called from within a flow or task. The task run inputs will be coroutine objects rather than the expected sets of task run references.

The original code correctly used await collect_task_run_inputs(v). The PR context claims this is a "bug fix" because collect_task_run_inputs is synchronous, but that's incorrect — the function is async.

Was this helpful? React with 👍 / 👎

Suggested change
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
}
  • Apply suggested fix


# Track parent task if this is being called from within a task
Expand Down Expand Up @@ -196,7 +209,7 @@ async def run_deployment(
trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
Comment on lines 211 to +212

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: deployment_id is None when name string is passed

When a deployment name string (e.g., "my-flow/my-deployment") is passed instead of a UUID, deployment_id remains None (set at line 123, never updated on the name path). The deployment is fetched by name at line 138 via read_deployment_by_name(name), but deployment_id is never set to deployment.id.

At line 211-212, deployment_id (which is None) is passed as the first positional argument to create_flow_run_from_deployment(), which expects deployment_id: UUID. This will cause an error (likely a TypeError or HTTP 422) for all name-based deployment lookups.

The original code correctly used deployment.id here. The fix should revert to deployment.id or set deployment_id = deployment.id after the name-based lookup.

Was this helpful? React with 👍 / 👎

Suggested change
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
flow_run = await client.create_flow_run_from_deployment(
deployment.id,
  • Apply suggested fix

parameters=parameters,
state=Scheduled(scheduled_time=scheduled_time),
name=flow_run_name,
Expand All @@ -215,10 +228,14 @@ async def run_deployment(

with anyio.move_on_after(timeout):
while True:
await anyio.sleep(poll_interval)
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
Comment on lines 229 to 235

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Edge Case: Polling sleep-first means extra delay before first state check

Moving await anyio.sleep(poll_interval) to the beginning of the polling loop means there's always an unnecessary delay before the first check of the flow run state. If the flow run completes very quickly (e.g., in under 1 second), the caller still waits the full poll_interval (default 5 seconds) before discovering it's done.

The PR context describes this as "prevents unnecessary sleep when timeout is reached," but the original pattern (check first, sleep after) is standard polling behavior. The original pattern already exits immediately on a final state without sleeping — it only sleeps when the run isn't done yet. The new pattern introduces latency in the common case (fast completions) to save time in the rare case (timeout edge).

Consider keeping the original order (check, then sleep) for better responsiveness.

Was this helpful? React with 👍 / 👎

Suggested change
with anyio.move_on_after(timeout):
while True:
await anyio.sleep(poll_interval)
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
with anyio.move_on_after(timeout):
while True:
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
await anyio.sleep(poll_interval)
  • Apply suggested fix

await anyio.sleep(poll_interval)

return flow_run


# Alias for backwards compatibility
run_deployment = arun_deployment
Comment on lines +240 to +241

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: Backward-incompatible: run_deployment alias breaks sync callers

Setting run_deployment = arun_deployment makes run_deployment a plain async def function. This breaks backward compatibility in two ways:

  1. Sync callers get a coroutine instead of a result: Previously, run_deployment was decorated with @async_dispatch(arun_deployment) which detected sync/async context and dispatched accordingly. Now, calling run_deployment(...) in a sync context returns an unawaited coroutine object instead of a FlowRun, so the flow run is never actually created unless the caller explicitly awaits it.

  2. _sync parameter no longer accepted: The old @async_dispatch wrapper handled _sync=True/False kwargs to force sync/async execution. The new tests in TestRunDeploymentSyncContext pass _sync=True to run_deployment, but arun_deployment doesn't accept this parameter, so these tests will raise TypeError: arun_deployment() got an unexpected keyword argument '_sync'.

To maintain backward compatibility, run_deployment should remain a wrapper that uses @async_dispatch(arun_deployment) to handle sync/async dispatch, rather than being a simple alias to the async function.

Was this helpful? React with 👍 / 👎

  • Apply suggested fix

Loading
Loading