Code Review Bench PR #20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch #6

Closed
ketkarameya wants to merge 3 commits into base_pr_20160_20260125_5936 from corrupted_pr_20160_20260125_5936

Conversation

@ketkarameya ketkarameya commented Feb 10, 2026

Code Review Bench PR PrefectHQ#20160 - Add arun_deployment and replace @sync_compatible with @async_dispatch

Benchmark PR for Gitar evaluation

Original PR: agentic-review-benchmarks#6

Add arun_deployment and replace @sync_compatible with @async_dispatch


Summary by Gitar

  • New async function:
    • arun_deployment in deployments/flow_runs.py provides explicit async-first deployment execution
  • Architecture refactor:
    • Replaced @sync_compatible decorator pattern with explicit async implementation, eliminating 215 lines of duplicated sync/async code
  • Fixed bugs:
    • Corrected await on synchronous collect_task_run_inputs, wrong variable reference in create_flow_run_from_deployment, and polling loop timing
  • Backward compatibility:
    • Alias run_deployment = arun_deployment maintains 100% API compatibility

This will update automatically on new commits.

desertaxle and others added 3 commits January 25, 2026 12:10
This change follows the intent of issue PrefectHQ#15008 to replace implicit
sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
gitar-bot-staging bot commented Feb 10, 2026

Code Review 🚫 Blocked 0 resolved / 5 findings

Three critical bugs: (1) removing await from async collect_task_run_inputs stores coroutines in task_inputs, (2) deployment_id is None for name-based lookups causing API errors, and (3) simple alias run_deployment = arun_deployment breaks sync callers and _sync parameter support. The "bug fixes" in this PR actually introduce regressions — the original code was correct.

🚨 Bug: Removing await from async collect_task_run_inputs stores coroutines

📄 src/prefect/deployments/flow_runs.py:148-150

collect_task_run_inputs is defined as async def collect_task_run_inputs(expr, max_depth=-1) in prefect/utilities/engine.py. The PR removes the await keyword from the dict comprehension, so task_inputs will contain unawaited coroutine objects instead of resolved set[TaskRunResult | FlowRunResult] values.

This will silently corrupt the task_inputs passed to client.create_task_run(), breaking subflow linking when arun_deployment is called from within a flow or task. The task run inputs will be coroutine objects rather than the expected sets of task run references.

The original code correctly used await collect_task_run_inputs(v). The PR context claims this is a "bug fix" because collect_task_run_inputs is synchronous, but that's incorrect — the function is async.

Suggested fix
        task_inputs = {
            k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }
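For concreteness, the failure mode can be reproduced with a minimal, self-contained sketch. `collect_inputs` below is a stand-in for the real `collect_task_run_inputs`, not Prefect code; it just shows that dropping `await` leaves coroutine objects in the dict.

```python
import asyncio
import inspect

async def collect_inputs(expr):
    # Stand-in: the real function resolves task-run references.
    return {f"ref:{expr}"}

async def main():
    parameters = {"x": 1, "y": 2}

    # Buggy version (await removed): the dict holds coroutine objects.
    buggy = {k: collect_inputs(v) for k, v in parameters.items()}
    assert all(inspect.iscoroutine(v) for v in buggy.values())
    for coro in buggy.values():
        coro.close()  # silence "coroutine was never awaited" warnings

    # Correct version: the dict holds resolved sets.
    fixed = {k: await collect_inputs(v) for k, v in parameters.items()}
    assert fixed == {"x": {"ref:1"}, "y": {"ref:2"}}
    return fixed

print(asyncio.run(main()))
```

Nothing raises in the buggy version; the corruption is silent until something downstream tries to use the values, which matches the subflow-linking breakage described above.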
🚨 Bug: deployment_id is None when name string is passed

📄 src/prefect/deployments/flow_runs.py:211-212

When a deployment name string (e.g., "my-flow/my-deployment") is passed instead of a UUID, deployment_id remains None (set at line 123, never updated on the name path). The deployment is fetched by name at line 138 via read_deployment_by_name(name), but deployment_id is never set to deployment.id.

At line 211-212, deployment_id (which is None) is passed as the first positional argument to create_flow_run_from_deployment(), which expects deployment_id: UUID. This will cause an error (likely a TypeError or HTTP 422) for all name-based deployment lookups.

The original code correctly used deployment.id here. The fix should revert to deployment.id or set deployment_id = deployment.id after the name-based lookup.

Suggested fix
    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
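The invariant behind the fix can be sketched with hypothetical stubs (a toy `Deployment` and `resolve_deployment_id`, not the real Prefect client): on the name path, the id must be taken from the fetched deployment object rather than left at its initial `None`.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4

@dataclass
class Deployment:
    id: UUID

def resolve_deployment_id(name_or_id, lookup_by_name):
    # UUID path: the caller already supplied the id.
    if isinstance(name_or_id, UUID):
        return name_or_id
    # Name path: the id must come from the fetched deployment,
    # otherwise it stays None (the bug described above).
    deployment = lookup_by_name(name_or_id)
    return deployment.id

known_id = uuid4()
assert resolve_deployment_id(known_id, None) == known_id
assert resolve_deployment_id(
    "my-flow/my-deployment", lambda name: Deployment(id=known_id)
) == known_id
```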
🚨 Bug: Backward-incompatible: run_deployment alias breaks sync callers

📄 src/prefect/deployments/flow_runs.py:240-241

Setting run_deployment = arun_deployment makes run_deployment a plain async def function. This breaks backward compatibility in two ways:

  1. Sync callers get a coroutine instead of a result: Previously, run_deployment was decorated with @async_dispatch(arun_deployment) which detected sync/async context and dispatched accordingly. Now, calling run_deployment(...) in a sync context returns an unawaited coroutine that is silently discarded.

  2. _sync parameter no longer accepted: The old @async_dispatch wrapper handled _sync=True/False kwargs to force sync/async execution. The new tests in TestRunDeploymentSyncContext pass _sync=True to run_deployment, but arun_deployment doesn't accept this parameter, so these tests will raise TypeError: arun_deployment() got an unexpected keyword argument '_sync'.

To maintain backward compatibility, run_deployment should remain a wrapper that uses @async_dispatch(arun_deployment) to handle sync/async dispatch, rather than being a simple alias to the async function.
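The dispatch pattern being asked for can be sketched with a homemade decorator. This is NOT Prefect's actual `@async_dispatch` implementation; it only illustrates why a wrapper, unlike a bare alias, keeps sync callers working and honors a `_sync` override.

```python
import asyncio
import functools

def async_dispatch(async_impl):
    """Toy dispatcher: pick sync or async impl based on context or _sync."""
    def decorator(sync_impl):
        @functools.wraps(sync_impl)
        def wrapper(*args, _sync=None, **kwargs):
            if _sync is True:
                return sync_impl(*args, **kwargs)
            if _sync is False:
                return async_impl(*args, **kwargs)  # caller awaits this
            # Auto-detect: inside a running event loop, hand back the coroutine.
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return sync_impl(*args, **kwargs)
            return async_impl(*args, **kwargs)
        return wrapper
    return decorator

async def arun_deployment(name):
    return f"async:{name}"

@async_dispatch(arun_deployment)
def run_deployment(name):
    return f"sync:{name}"

# Sync context: returns a result, not a coroutine.
assert run_deployment("demo") == "sync:demo"
# Forced sync still works, unlike a bare alias to an async def.
assert run_deployment("demo", _sync=True) == "sync:demo"
# Forced async: dispatches to the coroutine implementation.
assert asyncio.run(run_deployment("demo", _sync=False)) == "async:demo"
```

With the plain alias, the first two calls would instead return (and silently discard) coroutine objects or raise `TypeError` on `_sync`.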

⚠️ Edge Case: Polling sleep-first means extra delay before first state check

📄 src/prefect/deployments/flow_runs.py:229-235

Moving await anyio.sleep(poll_interval) to the beginning of the polling loop means there's always an unnecessary delay before the first check of the flow run state. If the flow run completes very quickly (e.g., in under 1 second), the caller still waits the full poll_interval (default 5 seconds) before discovering it's done.

The PR context describes this as "prevents unnecessary sleep when timeout is reached," but the original pattern (check first, sleep after) is standard polling behavior. The original pattern already exits immediately on a final state without sleeping — it only sleeps when the run isn't done yet. The new pattern introduces latency in the common case (fast completions) to save time in the rare case (timeout edge).

Consider keeping the original order (check, then sleep) for better responsiveness.

Suggested fix
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)
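The latency difference is easy to demonstrate with an illustrative sketch (a toy `poll` helper, not Prefect code) that counts how many sleep intervals each ordering pays when the run is already final at the first check:

```python
import asyncio

async def poll(order, is_final, poll_interval=0.01):
    """Toy polling loop; returns how many sleeps occurred before exit."""
    sleeps = 0
    while True:
        if order == "sleep_first":
            await asyncio.sleep(poll_interval)
            sleeps += 1
        if is_final():
            return sleeps
        if order == "check_first":
            await asyncio.sleep(poll_interval)
            sleeps += 1

done = lambda: True  # the run finished before the first check

assert asyncio.run(poll("check_first", done)) == 0  # returns immediately
assert asyncio.run(poll("sleep_first", done)) == 1  # pays one full interval
```

With the default 5-second interval, that one extra sleep is pure added latency for every fast-completing run.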
💡 Quality: Redundant import of dynamic_key_for_task_run at top and inside fn

📄 src/prefect/deployments/flow_runs.py:144

dynamic_key_for_task_run is imported at the top of the file (line 18) and again inside the function body (line 144). The top-level import already makes the name available, so the local import at line 144 should be removed.

Suggested fix
        from prefect.utilities.engine import collect_task_run_inputs
Rules 🎸 1 action taken

Gitar Rules

🎸 Summary Enhancement: Appended technical summary to PR description covering new async function, architecture refactor (215 lines eliminated), bug fixes, and backward compatibility

5 rules not applicable. Show all rules by commenting gitar display:verbose.

Options

Auto-apply is off → Gitar will not commit updates to this branch.
Display: compact → Showing less information.

Comment with these commands to change:

Auto-apply: gitar auto-apply:on
Display: gitar display:verbose


Comment on lines 148 to 150
        task_inputs = {
-           k: await collect_task_run_inputs(v) for k, v in parameters.items()
+           k: collect_task_run_inputs(v) for k, v in parameters.items()
        }

🚨 Bug: Removing await from async collect_task_run_inputs stores coroutines

Suggested change

        task_inputs = {
-           k: collect_task_run_inputs(v) for k, v in parameters.items()
+           k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }

Comment on lines 211 to +212
    flow_run = await client.create_flow_run_from_deployment(
-       deployment.id,
+       deployment_id,
🚨 Bug: deployment_id is None when name string is passed

Suggested change

    flow_run = await client.create_flow_run_from_deployment(
-       deployment_id,
+       deployment.id,

Comment on lines +240 to +241
    # Alias for backwards compatibility
    run_deployment = arun_deployment

🚨 Bug: Backward-incompatible: run_deployment alias breaks sync callers


Comment on lines 229 to 235
    with anyio.move_on_after(timeout):
        while True:
            await anyio.sleep(poll_interval)
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run


⚠️ Edge Case: Polling sleep-first means extra delay before first state check


Suggested change

    with anyio.move_on_after(timeout):
        while True:
-           await anyio.sleep(poll_interval)
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
+           await anyio.sleep(poll_interval)

@kayagokalp kayagokalp closed this Feb 25, 2026
@kayagokalp kayagokalp deleted the corrupted_pr_20160_20260125_5936 branch February 25, 2026 02:08
@kayagokalp kayagokalp restored the corrupted_pr_20160_20260125_5936 branch February 25, 2026 02:15