Code Review Bench PR #20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch#6
Conversation
This change follows the intent of issue PrefectHQ#15008 to replace implicit sync/async conversion with explicit, type-safe alternatives. Changes: - Add `arun_deployment` as an explicit async function for running deployments - Replace `@sync_compatible` with `@async_dispatch` on `run_deployment` - `run_deployment` now dispatches to `arun_deployment` in async context - Sync context uses `SyncPrefectClient` directly (no event loop magic) - Export `arun_deployment` from `prefect.deployments` - Add comprehensive tests for both sync and async behavior The `run_deployment.aio` attribute is preserved for backward compatibility. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Code Review 🚫 Blocked 0 resolved / 5 findingsThree critical bugs: (1) removing 🚨 Bug: Removing
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
| task_inputs = { | ||
| k: await collect_task_run_inputs(v) for k, v in parameters.items() | ||
| k: collect_task_run_inputs(v) for k, v in parameters.items() | ||
| } |
There was a problem hiding this comment.
🚨 Bug: Removing await from async collect_task_run_inputs stores coroutines
collect_task_run_inputs is defined as async def collect_task_run_inputs(expr, max_depth=-1) in prefect/utilities/engine.py. The PR removes the await keyword from the dict comprehension, so task_inputs will contain unawaited coroutine objects instead of resolved set[TaskRunResult | FlowRunResult] values.
This will silently corrupt the task_inputs passed to client.create_task_run(), breaking subflow linking when arun_deployment is called from within a flow or task. The task run inputs will be coroutine objects rather than the expected sets of task run references.
The original code correctly used await collect_task_run_inputs(v). The PR context claims this is a "bug fix" because collect_task_run_inputs is synchronous, but that's incorrect — the function is async.
Was this helpful? React with 👍 / 👎
| task_inputs = { | |
| k: await collect_task_run_inputs(v) for k, v in parameters.items() | |
| k: collect_task_run_inputs(v) for k, v in parameters.items() | |
| } | |
| task_inputs = { | |
| k: await collect_task_run_inputs(v) for k, v in parameters.items() | |
| } |
- Apply suggested fix
| flow_run = await client.create_flow_run_from_deployment( | ||
| deployment.id, | ||
| deployment_id, |
There was a problem hiding this comment.
🚨 Bug: deployment_id is None when name string is passed
When a deployment name string (e.g., "my-flow/my-deployment") is passed instead of a UUID, deployment_id remains None (set at line 123, never updated on the name path). The deployment is fetched by name at line 138 via read_deployment_by_name(name), but deployment_id is never set to deployment.id.
At line 211-212, deployment_id (which is None) is passed as the first positional argument to create_flow_run_from_deployment(), which expects deployment_id: UUID. This will cause an error (likely a TypeError or HTTP 422) for all name-based deployment lookups.
The original code correctly used deployment.id here. The fix should revert to deployment.id or set deployment_id = deployment.id after the name-based lookup.
Was this helpful? React with 👍 / 👎
| flow_run = await client.create_flow_run_from_deployment( | |
| deployment.id, | |
| deployment_id, | |
| flow_run = await client.create_flow_run_from_deployment( | |
| deployment.id, |
- Apply suggested fix
| # Alias for backwards compatibility | ||
| run_deployment = arun_deployment |
There was a problem hiding this comment.
🚨 Bug: Backward-incompatible: run_deployment alias breaks sync callers
Setting run_deployment = arun_deployment makes run_deployment a plain async def function. This breaks backward compatibility in two ways:
-
Sync callers get a coroutine instead of a result: Previously,
run_deploymentwas decorated with@async_dispatch(arun_deployment)which detected sync/async context and dispatched accordingly. Now, callingrun_deployment(...)in a sync context returns an unawaited coroutine that is silently discarded. -
_syncparameter no longer accepted: The old@async_dispatchwrapper handled_sync=True/Falsekwargs to force sync/async execution. The new tests inTestRunDeploymentSyncContextpass_sync=Truetorun_deployment, butarun_deploymentdoesn't accept this parameter, so these tests will raiseTypeError: arun_deployment() got an unexpected keyword argument '_sync'.
To maintain backward compatibility, run_deployment should remain a wrapper that uses @async_dispatch(arun_deployment) to handle sync/async dispatch, rather than being a simple alias to the async function.
Was this helpful? React with 👍 / 👎
- Apply suggested fix
| with anyio.move_on_after(timeout): | ||
| while True: | ||
| await anyio.sleep(poll_interval) | ||
| flow_run = await client.read_flow_run(flow_run_id) | ||
| flow_state = flow_run.state | ||
| if flow_state and flow_state.is_final(): | ||
| return flow_run |
There was a problem hiding this comment.
⚠️ Edge Case: Polling sleep-first means extra delay before first state check
Moving await anyio.sleep(poll_interval) to the beginning of the polling loop means there's always an unnecessary delay before the first check of the flow run state. If the flow run completes very quickly (e.g., in under 1 second), the caller still waits the full poll_interval (default 5 seconds) before discovering it's done.
The PR context describes this as "prevents unnecessary sleep when timeout is reached," but the original pattern (check first, sleep after) is standard polling behavior. The original pattern already exits immediately on a final state without sleeping — it only sleeps when the run isn't done yet. The new pattern introduces latency in the common case (fast completions) to save time in the rare case (timeout edge).
Consider keeping the original order (check, then sleep) for better responsiveness.
Was this helpful? React with 👍 / 👎
| with anyio.move_on_after(timeout): | |
| while True: | |
| await anyio.sleep(poll_interval) | |
| flow_run = await client.read_flow_run(flow_run_id) | |
| flow_state = flow_run.state | |
| if flow_state and flow_state.is_final(): | |
| return flow_run | |
| with anyio.move_on_after(timeout): | |
| while True: | |
| flow_run = await client.read_flow_run(flow_run_id) | |
| flow_state = flow_run.state | |
| if flow_state and flow_state.is_final(): | |
| return flow_run | |
| await anyio.sleep(poll_interval) |
- Apply suggested fix
Code Review Bench PR PrefectHQ#20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch
Benchmark PR for Gitar evaluation
Original PR: agentic-review-benchmarks#6
Add arun_deployment and replace @sync_compatible with @async_dispatch
Summary by Gitar
arun_deploymentindeployments/flow_runs.pyprovides explicit async-first deployment execution@sync_compatibledecorator pattern with explicit async implementation, eliminating 215 lines of duplicated sync/async codecollect_task_run_inputs, wrong variable reference increate_flow_run_from_deployment, and polling loop timingrun_deployment = arun_deploymentmaintains 100% API compatibilityThis will update automatically on new commits.