-
Notifications
You must be signed in to change notification settings - Fork 0
Code Review Bench PR #20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Code Review Bench PR #20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch #6
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -8,14 +8,14 @@ | |||||||||||||||||||||||||||||
| import prefect | ||||||||||||||||||||||||||||||
| from prefect._result_records import ResultRecordMetadata | ||||||||||||||||||||||||||||||
| from prefect.client.schemas import FlowRun, TaskRunResult | ||||||||||||||||||||||||||||||
| from prefect.client.utilities import inject_client | ||||||||||||||||||||||||||||||
| from prefect.client.utilities import get_or_create_client | ||||||||||||||||||||||||||||||
| from prefect.context import FlowRunContext, TaskRunContext | ||||||||||||||||||||||||||||||
| from prefect.logging import get_logger | ||||||||||||||||||||||||||||||
| from prefect.states import Pending, Scheduled | ||||||||||||||||||||||||||||||
| from prefect.tasks import Task | ||||||||||||||||||||||||||||||
| from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry | ||||||||||||||||||||||||||||||
| from prefect.types._datetime import now | ||||||||||||||||||||||||||||||
| from prefect.utilities.asyncutils import sync_compatible | ||||||||||||||||||||||||||||||
| from prefect.utilities._engine import dynamic_key_for_task_run | ||||||||||||||||||||||||||||||
| from prefect.utilities.slugify import slugify | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
@@ -45,9 +45,7 @@ def _is_instrumentation_enabled() -> bool: | |||||||||||||||||||||||||||||
| logger: "logging.Logger" = get_logger(__name__) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| @sync_compatible | ||||||||||||||||||||||||||||||
| @inject_client | ||||||||||||||||||||||||||||||
| async def run_deployment( | ||||||||||||||||||||||||||||||
| async def arun_deployment( | ||||||||||||||||||||||||||||||
| name: Union[str, UUID], | ||||||||||||||||||||||||||||||
| client: Optional["PrefectClient"] = None, | ||||||||||||||||||||||||||||||
| parameters: Optional[dict[str, Any]] = None, | ||||||||||||||||||||||||||||||
|
|
@@ -62,7 +60,7 @@ async def run_deployment( | |||||||||||||||||||||||||||||
| job_variables: Optional[dict[str, Any]] = None, | ||||||||||||||||||||||||||||||
| ) -> "FlowRun": | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| Create a flow run for a deployment and return it after completion or a timeout. | ||||||||||||||||||||||||||||||
| Asynchronously create a flow run for a deployment and return it after completion or a timeout. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| By default, this function blocks until the flow run finishes executing. | ||||||||||||||||||||||||||||||
| Specify a timeout (in seconds) to wait for the flow run to execute before | ||||||||||||||||||||||||||||||
|
|
@@ -79,6 +77,7 @@ async def run_deployment( | |||||||||||||||||||||||||||||
| Args: | ||||||||||||||||||||||||||||||
| name: The deployment id or deployment name in the form: | ||||||||||||||||||||||||||||||
| `"flow name/deployment name"` | ||||||||||||||||||||||||||||||
| client: An optional PrefectClient to use for API requests. | ||||||||||||||||||||||||||||||
| parameters: Parameter overrides for this flow run. Merged with the deployment | ||||||||||||||||||||||||||||||
| defaults. | ||||||||||||||||||||||||||||||
| scheduled_time: The time to schedule the flow run for, defaults to scheduling | ||||||||||||||||||||||||||||||
|
|
@@ -100,6 +99,18 @@ async def run_deployment( | |||||||||||||||||||||||||||||
| job_variables: A dictionary of dot delimited infrastructure overrides that | ||||||||||||||||||||||||||||||
| will be applied at runtime; for example `env.CONFIG_KEY=config_value` or | ||||||||||||||||||||||||||||||
| `namespace='prefect'` | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Example: | ||||||||||||||||||||||||||||||
| ```python | ||||||||||||||||||||||||||||||
| import asyncio | ||||||||||||||||||||||||||||||
| from prefect.deployments import arun_deployment | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| async def main(): | ||||||||||||||||||||||||||||||
| flow_run = await arun_deployment("my-flow/my-deployment") | ||||||||||||||||||||||||||||||
| print(flow_run.state) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| asyncio.run(main()) | ||||||||||||||||||||||||||||||
| ``` | ||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||
| if timeout is not None and timeout < 0: | ||||||||||||||||||||||||||||||
| raise ValueError("`timeout` cannot be negative") | ||||||||||||||||||||||||||||||
|
|
@@ -119,6 +130,8 @@ async def run_deployment( | |||||||||||||||||||||||||||||
| except ValueError: | ||||||||||||||||||||||||||||||
| pass | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| client, _ = get_or_create_client(client) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if deployment_id: | ||||||||||||||||||||||||||||||
| deployment = await client.read_deployment(deployment_id=deployment_id) | ||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||
|
|
@@ -133,7 +146,7 @@ async def run_deployment( | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # This was called from a flow. Link the flow run as a subflow. | ||||||||||||||||||||||||||||||
| task_inputs = { | ||||||||||||||||||||||||||||||
| k: await collect_task_run_inputs(v) for k, v in parameters.items() | ||||||||||||||||||||||||||||||
| k: collect_task_run_inputs(v) for k, v in parameters.items() | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| # Track parent task if this is being called from within a task | ||||||||||||||||||||||||||||||
|
|
@@ -196,7 +209,7 @@ async def run_deployment( | |||||||||||||||||||||||||||||
| trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {} | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| flow_run = await client.create_flow_run_from_deployment( | ||||||||||||||||||||||||||||||
| deployment.id, | ||||||||||||||||||||||||||||||
| deployment_id, | ||||||||||||||||||||||||||||||
|
Comment on lines
211
to
+212
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚨 Bug:
|
||||||||||||||||||||||||||||||
| flow_run = await client.create_flow_run_from_deployment( | |
| deployment.id, | |
| deployment_id, | |
| flow_run = await client.create_flow_run_from_deployment( | |
| deployment.id, |
- Apply suggested fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
⚠️ Edge Case: Polling sleep-first means extra delay before first state check
Moving await anyio.sleep(poll_interval) to the beginning of the polling loop means there's always an unnecessary delay before the first check of the flow run state. If the flow run completes very quickly (e.g., in under 1 second), the caller still waits the full poll_interval (default 5 seconds) before discovering it's done.
The PR context describes this as "prevents unnecessary sleep when timeout is reached," but the original pattern (check first, sleep after) is standard polling behavior. The original pattern already exits immediately on a final state without sleeping — it only sleeps when the run isn't done yet. The new pattern introduces latency in the common case (fast completions) to save time in the rare case (timeout edge).
Consider keeping the original order (check, then sleep) for better responsiveness.
Was this helpful? React with 👍 / 👎
| with anyio.move_on_after(timeout): | |
| while True: | |
| await anyio.sleep(poll_interval) | |
| flow_run = await client.read_flow_run(flow_run_id) | |
| flow_state = flow_run.state | |
| if flow_state and flow_state.is_final(): | |
| return flow_run | |
| with anyio.move_on_after(timeout): | |
| while True: | |
| flow_run = await client.read_flow_run(flow_run_id) | |
| flow_state = flow_run.state | |
| if flow_state and flow_state.is_final(): | |
| return flow_run | |
| await anyio.sleep(poll_interval) |
- Apply suggested fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚨 Bug: Backward-incompatible: run_deployment alias breaks sync callers
Setting run_deployment = arun_deployment makes run_deployment a plain async def function. This breaks backward compatibility in two ways:
-
Sync callers get a coroutine instead of a result: Previously,
run_deploymentwas decorated with@async_dispatch(arun_deployment)which detected sync/async context and dispatched accordingly. Now, callingrun_deployment(...)in a sync context returns an unawaited coroutine that is silently discarded. -
_syncparameter no longer accepted: The old@async_dispatchwrapper handled_sync=True/Falsekwargs to force sync/async execution. The new tests inTestRunDeploymentSyncContextpass_sync=Truetorun_deployment, butarun_deploymentdoesn't accept this parameter, so these tests will raiseTypeError: arun_deployment() got an unexpected keyword argument '_sync'.
To maintain backward compatibility, run_deployment should remain a wrapper that uses @async_dispatch(arun_deployment) to handle sync/async dispatch, rather than being a simple alias to the async function.
Was this helpful? React with 👍 / 👎
- Apply suggested fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚨 Bug: Removing
awaitfrom asynccollect_task_run_inputsstores coroutinescollect_task_run_inputsis defined asasync def collect_task_run_inputs(expr, max_depth=-1)inprefect/utilities/engine.py. The PR removes theawaitkeyword from the dict comprehension, sotask_inputswill contain unawaited coroutine objects instead of resolvedset[TaskRunResult | FlowRunResult]values.This will silently corrupt the
task_inputspassed toclient.create_task_run(), breaking subflow linking whenarun_deploymentis called from within a flow or task. The task run inputs will be coroutine objects rather than the expected sets of task run references.The original code correctly used
await collect_task_run_inputs(v). The PR context claims this is a "bug fix" becausecollect_task_run_inputsis synchronous, but that's incorrect — the function is async.Was this helpful? React with 👍 / 👎