Skip to content

Add arun_deployment and replace @sync_compatible with @async_dispatch#32

Open
tomerqodo wants to merge 3 commits intocoderabbit_full_base_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6from
coderabbit_full_head_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6
Open

Add arun_deployment and replace @sync_compatible with @async_dispatch#32
tomerqodo wants to merge 3 commits intocoderabbit_full_base_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6from
coderabbit_full_head_add_arun_deployment_and_replace_sync_compatible_with_async_dispatch_pr6

Conversation

@tomerqodo
Copy link

@tomerqodo tomerqodo commented Jan 29, 2026

Benchmark PR from agentic-review-benchmarks#6

Summary by CodeRabbit

  • New Features
    • Introduced arun_deployment, a new public async function for executing deployments with improved timeout semantics and job variable handling.
    • Existing run_deployment remains fully functional and supported for backward compatibility.

✏️ Tip: You can customize this high-level summary in your review settings.

desertaxle and others added 3 commits January 25, 2026 12:10
This change follows the intent of issue PrefectHQ#15008 to replace implicit
sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@coderabbitai
Copy link

coderabbitai bot commented Jan 29, 2026

Walkthrough

The pull request introduces arun_deployment as the primary async function for running deployments, replacing the decorator-dependent run_deployment. The refactoring removes @sync_compatible and @inject_client decorators in favor of explicit get_or_create_client usage. A backward-compatible alias maintains existing import paths. Comprehensive tests verify the new implementation.

Changes

Cohort / File(s) Summary
Public API Export
src/prefect/deployments/__init__.py
Added arun_deployment to TYPE_CHECKING imports and public __all__ export list; updated _public_api mapping to route to .flow_runs module.
Core Implementation Refactor
src/prefect/deployments/flow_runs.py
Renamed run_deployment to arun_deployment (primary async function); replaced decorator-based client injection with explicit get_or_create_client calls; refactored parameter handling to skip awaiting individual task inputs; restructured parent task run construction with dynamic key derivation; adjusted deployment lookup and flow run creation calls; modified polling loop with initial sleep; maintained run_deployment as backward-compatible alias.
Comprehensive Test Coverage
tests/deployment/test_flow_runs.py
Added TestArunDeployment class with extensive tests covering direct invocation, deployment ID handling, parameter passing, tags, custom flow run names, job variables, timeout validation, idempotency keys, and parent-flow linking; verified run_deployment.aio backward compatibility; confirmed async context behavior.

Poem

🐰 A deployment renamed, from sync ties now freed,
Through explicit clients and fresh async seed,
No decorators binding, just clarity's call,
With tests standing guard over changes in all,
Backward compat whispers, "I'm still here, not tall!" 🌟

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The description is minimal and only references an external benchmark PR without providing substantive detail about the changes, missing key checklist items and overview information. Expand the description to include an overview of changes, verify checklist items (related issues, unit tests, docstrings), and provide context beyond the external PR reference.
Docstring Coverage ⚠️ Warning Docstring coverage is 78.26% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main changes: adding arun_deployment and replacing @sync_compatible with @async_dispatch as shown in the code changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/prefect/deployments/flow_runs.py (1)

123-139: Ensure deployment_id is set after lookup by name.

When name is not a UUID, deployment_id stays None, but you pass it into create_flow_run_from_deployment, which likely expects a UUID. Set it from the fetched deployment to avoid passing None.

✅ Suggested fix
-    if deployment_id:
-        deployment = await client.read_deployment(deployment_id=deployment_id)
-    else:
-        deployment = await client.read_deployment_by_name(name)
+    if deployment_id:
+        deployment = await client.read_deployment(deployment_id=deployment_id)
+    else:
+        deployment = await client.read_deployment_by_name(name)
+        deployment_id = deployment.id

Also applies to: 211-212

🤖 Fix all issues with AI agents
In `@src/prefect/deployments/__init__.py`:
- Line 18: The __all__ export list is not alphabetically sorted which triggers
Ruff RUF022; update the __all__ variable in this module so the names are sorted
lexicographically (e.g., arrange "arun_deployment", "deploy",
"initialize_project", "run_deployment") to satisfy the linter for the __all__
declaration.

In `@src/prefect/deployments/flow_runs.py`:
- Around line 240-241: Replace the simple alias run_deployment = arun_deployment
with a synchronous wrapper function decorated using
`@async_dispatch`(arun_deployment): define def run_deployment(...same params as
arun_deployment..., _sync: bool = False, *args, **kwargs) that calls into the
async dispatcher (keeping the signature same as the async version so sync
callers work), and then set run_deployment.aio = arun_deployment; this restores
sync execution via _sync=True and exposes the .aio attribute while using
async_dispatch to delegate to arun_deployment.
- Around line 148-150: The dict comprehension creating task_inputs is calling
the async helper collect_task_run_inputs without awaiting it, which stores
coroutine objects instead of results; update the comprehension in
arun_deployment so each value uses await collect_task_run_inputs(v) (e.g.,
task_inputs = {k: await collect_task_run_inputs(v) for k, v in
parameters.items()}) so task_inputs contains resolved values suitable for
serialization and dependency tracking.
- Around line 133-135: The call to get_or_create_client in flow_runs.py
currently discards the second return value and may leak connections; update the
code to capture the tuple (e.g., client, inferred =
get_or_create_client(client)) and then create a context variable that is client
when not inferred and asyncnullcontext(client) when inferred, then use "async
with context as client:" to ensure newly created PrefectClient instances are
closed (follow the pattern used in the inject_client decorator and use the
asyncnullcontext utility for already-provided clients).

In `@tests/deployment/test_flow_runs.py`:
- Around line 758-763: The test function
test_arun_deployment_links_to_parent_flow has an unused fixture parameter
use_hosted_api_server which triggers Ruff ARG002; either remove the
use_hosted_api_server parameter from the function signature, change the
decorator to `@pytest.mark.usefixtures`("use_hosted_api_server") to keep the
fixture without a param, or rename the parameter to _use_hosted_api_server to
indicate it is intentionally unused.
- Around line 922-926: The test function test_run_deployment_aio_attribute
currently declares unused parameters prefect_client and test_deployment which
triggers lint ARG002; remove these parameters or rename them to _prefect_client
and _test_deployment (or prefix with an underscore) so they are recognized as
intentionally unused. Update the function signature in the
test_run_deployment_aio_attribute definition accordingly and run tests/linter to
confirm the warning is resolved.
- Around line 782-880: The test methods in TestRunDeploymentSyncContext declare
an unused parameter sync_prefect_client which triggers Ruff ARG002; remove that
parameter from the method signatures or rename it to _sync_prefect_client for
each test method (e.g., test_run_deployment_sync_basic,
test_run_deployment_sync_with_parameters, test_run_deployment_sync_with_tags,
test_run_deployment_sync_negative_timeout_raises,
test_run_deployment_sync_with_deployment_id) while leaving the
test_deployment_sync fixture (which legitimately uses sync_prefect_client)
unchanged so test discovery and fixture injection still work.
🧹 Nitpick comments (1)
src/prefect/deployments/flow_runs.py (1)

144-145: Remove redundant import inside the function.

dynamic_key_for_task_run is already imported at module scope, so the inner import is unnecessary.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b84502 and 1ff84aa.

📒 Files selected for processing (3)
  • src/prefect/deployments/__init__.py
  • src/prefect/deployments/flow_runs.py
  • tests/deployment/test_flow_runs.py
🧰 Additional context used
🧬 Code graph analysis (3)
tests/deployment/test_flow_runs.py (1)
src/prefect/deployments/flow_runs.py (1)
  • arun_deployment (48-237)
src/prefect/deployments/__init__.py (1)
src/prefect/deployments/flow_runs.py (1)
  • arun_deployment (48-237)
src/prefect/deployments/flow_runs.py (3)
src/prefect/tasks.py (1)
  • Task (301-1822)
src/prefect/types/_datetime.py (1)
  • now (113-124)
src/prefect/utilities/engine.py (1)
  • collect_task_run_inputs (66-114)
🪛 GitHub Actions: Static analysis
src/prefect/deployments/flow_runs.py

[error] 1-1: ruff-check: F401 (unused-import) and other lint issues detected. 1 error fixed by ruff.


[error] 1-1: ruff-format: 1 file reformatted by ruff. Re-run pre-commit to apply formatting changes.

🪛 Ruff (0.14.14)
tests/deployment/test_flow_runs.py

761-761: Unused method argument: use_hosted_api_server

(ARG002)


800-800: Unused method argument: sync_prefect_client

(ARG002)


817-817: Unused method argument: sync_prefect_client

(ARG002)


835-835: Unused method argument: sync_prefect_client

(ARG002)


852-852: Unused method argument: sync_prefect_client

(ARG002)


867-867: Unused method argument: sync_prefect_client

(ARG002)


924-924: Unused method argument: prefect_client

(ARG002)


925-925: Unused method argument: test_deployment

(ARG002)

src/prefect/deployments/__init__.py

18-18: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (37)
  • GitHub Check: Client Tests - python:3.14, postgres:14
  • GitHub Check: Client Tests - python:3.11, postgres:14
  • GitHub Check: Server Tests - python:3.13, sqlite
  • GitHub Check: Server Tests - python:3.11, sqlite
  • GitHub Check: Database and Orchestration Tests - python:3.14, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.13, sqlite
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.11, postgres:14
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.12, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.11, postgres:14
  • GitHub Check: Client Tests - python:3.12, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.11, sqlite
  • GitHub Check: Database and Orchestration Tests - python:3.13, postgres:14
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.10, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.12, sqlite
  • GitHub Check: Server Tests - python:3.14, sqlite
  • GitHub Check: Database and Orchestration Tests - python:3.10, sqlite
  • GitHub Check: Client Tests - python:3.13, postgres:14
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.13, postgres:14
  • GitHub Check: Client Tests - python:3.10, postgres:14
  • GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.14, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.14, sqlite
  • GitHub Check: Database and Orchestration Tests - python:3.12, postgres:14
  • GitHub Check: Database and Orchestration Tests - python:3.10, postgres:14
  • GitHub Check: Server Tests - python:3.12, postgres:14
  • GitHub Check: Server Tests - python:3.13, postgres:14
  • GitHub Check: Server Tests - python:3.12, sqlite
  • GitHub Check: Server Tests - python:3.10, postgres:14
  • GitHub Check: Server Tests - python:3.14, postgres:14
  • GitHub Check: Server Tests - python:3.10, sqlite
  • GitHub Check: Server Tests - python:3.11, postgres:14
  • GitHub Check: typesafety
  • GitHub Check: Benchmark
  • GitHub Check: Benchmark
  • GitHub Check: Prefect Client Compatibility Tests
  • GitHub Check: Test SQLite 3.24.0 Compatibility
  • GitHub Check: Integration tests @main
  • GitHub Check: Test SQLite 3.31.1 Compatibility
🔇 Additional comments (3)
src/prefect/deployments/__init__.py (1)

5-13: Public API export update looks good.

The TYPE_CHECKING import and _public_api entry cleanly expose arun_deployment.

tests/deployment/test_flow_runs.py (2)

1-18: Imports for async-dispatch testing look good.

inspect and arun_deployment are used by the new async-dispatch tests.


601-757: Nice coverage for the explicit arun_deployment entry point.

The new suite exercises parameters, tags, idempotency, job variables, and parent linkage.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.


# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Sort __all__ to satisfy Ruff RUF022.

This will avoid the lint failure flagged by static analysis.

🧹 Suggested fix
-__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
+__all__ = ["arun_deployment", "deploy", "initialize_project", "run_deployment"]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
__all__ = ["arun_deployment", "deploy", "initialize_project", "run_deployment"]
🧰 Tools
🪛 Ruff (0.14.14)

18-18: __all__ is not sorted

Apply an isort-style sorting to __all__

(RUF022)

🤖 Prompt for AI Agents
In `@src/prefect/deployments/__init__.py` at line 18, The __all__ export list is
not alphabetically sorted which triggers Ruff RUF022; update the __all__
variable in this module so the names are sorted lexicographically (e.g., arrange
"arun_deployment", "deploy", "initialize_project", "run_deployment") to satisfy
the linter for the __all__ declaration.

Comment on lines +133 to 135
client, _ = get_or_create_client(client)

if deployment_id:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the definition of get_or_create_client
rg "def get_or_create_client|async def get_or_create_client" -B 2 -A 15 -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1221


🏁 Script executed:

#!/bin/bash
# Look at the context around lines 133-135 in flow_runs.py
sed -n '120,150p' src/prefect/deployments/flow_runs.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1285


🏁 Script executed:

#!/bin/bash
# Search for usage patterns of get_or_create_client to understand the contract
rg "get_or_create_client" -B 3 -A 3 -S --max-count 10

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 5214


🏁 Script executed:

#!/bin/bash
# Get the full implementation of get_or_create_client
sed -n '31,60p' src/prefect/client/utilities.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1346


🏁 Script executed:

#!/bin/bash
# Let's see more lines to capture the complete function
wc -l src/prefect/client/utilities.py

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 121


🏁 Script executed:

#!/bin/bash
# Read a larger section to ensure we get the full function
sed -n '25,70p' src/prefect/client/utilities.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1996


🏁 Script executed:

#!/bin/bash
# Look at the function containing line 133-135 to understand the context
sed -n '100,145p' src/prefect/deployments/flow_runs.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1792


🏁 Script executed:

#!/bin/bash
# Check if PrefectClient is an async context manager
rg "class PrefectClient" -A 20 -S | head -60

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1582


🏁 Script executed:

#!/bin/bash
# Check what get_httpx_client returns
rg "def get_client|async def get_client" src/prefect/client/orchestration.py -A 10 -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 161


🏁 Script executed:

#!/bin/bash
# Find where get_client is defined
rg "def get_client" src/prefect/client/ -A 10 -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2551


🏁 Script executed:

#!/bin/bash
# Check if PrefectClient has __aenter__ and __aexit__ methods
rg "__aenter__|__aexit__" src/prefect/client/orchestration/ -B 2 -A 5 -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2600


🏁 Script executed:

#!/bin/bash
# Check how the second return value from get_or_create_client is used elsewhere
rg "get_or_create_client" -A 2 -B 0 -S | grep -A 2 "get_or_create_client"

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2667


🏁 Script executed:

#!/bin/bash
# Look at the full context of the arun_deployment function to see if there's cleanup
sed -n '80,200p' src/prefect/deployments/flow_runs.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 5783


🏁 Script executed:

#!/bin/bash
# Search for any try/finally or context manager patterns in arun_deployment
sed -n '80,250p' src/prefect/deployments/flow_runs.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 7325


🏁 Script executed:

#!/bin/bash
# Get the complete get_client function to understand resource management
sed -n '80,130p' src/prefect/client/orchestration/__init__.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1669


🏁 Script executed:

#!/bin/bash
# Check if there's any automatic cleanup or context management elsewhere
rg "get_or_create_client" -B 5 -A 10 src/prefect/client/utilities.py | head -80

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1785


🏁 Script executed:

#!/bin/bash
# Look at the inject_client implementation more carefully to understand the pattern
sed -n '85,110p' src/prefect/client/utilities.py | cat -n

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 839


🏁 Script executed:

#!/bin/bash
# Check the get_client function implementation
rg "def get_client" -A 30 src/prefect/client/orchestration/__init__.py | head -50

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1349


Close newly created clients from get_or_create_client using context manager.

When get_or_create_client creates a new client (indicated by the second return value being False), the returned PrefectClient must be context managed. The current code discards the boolean flag and uses the client directly without cleanup, causing connection leaks.

The correct pattern is shown in the inject_client decorator in the same utilities module: check the boolean flag and wrap newly created clients as async with client: before use.

Affected pattern
client, _ = get_or_create_client(client)
# ... use client without context manager

Should follow the pattern from inject_client:

client, inferred = get_or_create_client(given)
if not inferred:
    context = client  # Enter as context manager
else:
    context = asyncnullcontext(client)  # No-op for context clients
async with context as new_client:
    # use new_client
🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 133 - 135, The call to
get_or_create_client in flow_runs.py currently discards the second return value
and may leak connections; update the code to capture the tuple (e.g., client,
inferred = get_or_create_client(client)) and then create a context variable that
is client when not inferred and asyncnullcontext(client) when inferred, then use
"async with context as client:" to ensure newly created PrefectClient instances
are closed (follow the pattern used in the inject_client decorator and use the
asyncnullcontext utility for already-provided clients).

Comment on lines 148 to 150
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the collect_task_run_inputs function definition
rg -n "async def collect_task_run_inputs|def collect_task_run_inputs" -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 225


🏁 Script executed:

# Check the context around lines 148-150 in the target file
cd src/prefect/deployments && cat -n flow_runs.py | head -160 | tail -30

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1596


🏁 Script executed:

# Get the full context of the function containing lines 148-150
rg -n "def.*:" src/prefect/deployments/flow_runs.py -B 2 -A 1 | head -50

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 323


🏁 Script executed:

# Get the enclosing function definition for lines 148-150
rg -n "^async def |^def " src/prefect/deployments/flow_runs.py -B 1 -A 1 | head -40

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 215


🏁 Script executed:

# Look at the actual enclosing function more carefully - find what function contains line 148
cat -n src/prefect/deployments/flow_runs.py | sed -n '100,160p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2605


🏁 Script executed:

# Get the function signature that contains line 148
cat -n src/prefect/deployments/flow_runs.py | sed -n '90,160p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 3332


🏁 Script executed:

# Check the full definition of collect_task_run_inputs to understand if it's truly async
cat -n src/prefect/utilities/engine.py | sed -n '66,90p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1124


🏁 Script executed:

# Check if there's a synchronous version available
cat -n src/prefect/utilities/engine.py | sed -n '117,140p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1130


🏁 Script executed:

# Check how task_inputs is used after being set
cat -n src/prefect/deployments/flow_runs.py | sed -n '148,180p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1744


🏁 Script executed:

# Verify if dict comprehension with await is valid in Python
python3 <<'EOF'
import ast
import sys

# Try to parse a dict comprehension with await
code = """
async def test():
    task_inputs = {
        k: await collect_task_run_inputs(v) for k, v in parameters.items()
    }
"""

try:
    tree = ast.parse(code)
    print("Valid Python syntax")
except SyntaxError as e:
    print(f"SyntaxError: {e}")
EOF

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 105


Add await to collect_task_run_inputs calls to avoid storing coroutines.

collect_task_run_inputs is an async function (as shown in its docstring at src/prefect/utilities/engine.py:66-80); without await, task_inputs will contain coroutine objects instead of actual results, causing serialization failures and breaking dependency tracking. The fix is straightforward since the enclosing arun_deployment function is async.

Suggested fix
         task_inputs = {
-            k: collect_task_run_inputs(v) for k, v in parameters.items()
+            k: await collect_task_run_inputs(v) for k, v in parameters.items()
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
task_inputs = {}
for k, v in parameters.items():
task_inputs[k] = await collect_task_run_inputs(v)
Suggested change
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}
task_inputs = {k: await collect_task_run_inputs(v) async for k, v in parameters.items()}
🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 148 - 150, The dict
comprehension creating task_inputs is calling the async helper
collect_task_run_inputs without awaiting it, which stores coroutine objects
instead of results; update the comprehension in arun_deployment so each value
uses await collect_task_run_inputs(v) (e.g., task_inputs = {k: await
collect_task_run_inputs(v) for k, v in parameters.items()}) so task_inputs
contains resolved values suitable for serialization and dependency tracking.

Comment on lines +240 to +241
# Alias for backwards compatibility
run_deployment = arun_deployment
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the file at the specified lines and surrounding context
cat -n src/prefect/deployments/flow_runs.py | sed -n '230,250p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 514


🏁 Script executed:

# Find async_dispatch definition and usage
rg -n "async_dispatch" -C 3 -S

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 50400


🏁 Script executed:

# Search for arun_deployment definition
rg -n "def arun_deployment" -C 5 -A 10

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 968


🏁 Script executed:

# Search for _sync parameter usage in tests to confirm
rg -n "_sync\s*=" --type py -C 2 | head -100

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 5644


🏁 Script executed:

# Look for .aio attribute or similar pattern
rg -n "\.aio" -C 2

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 33589


Use @async_dispatch decorator with sync wrapper instead of simple alias.

The current alias run_deployment = arun_deployment breaks the sync/async dispatch mechanism. A simple alias removes the wrapper that enables:

  • Sync mode execution with _sync=True (tests rely on this)
  • The .aio attribute that references the async implementation (test at tests/deployment/test_flow_runs.py:927-930 expects it)

Follow the pattern used throughout the codebase (see async_dispatch in flows.py, variables.py, etc.): create a sync wrapper function decorated with @async_dispatch(arun_deployment) that mirrors the async signature and sets .aio = arun_deployment.

🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 240 - 241, Replace the
simple alias run_deployment = arun_deployment with a synchronous wrapper
function decorated using `@async_dispatch`(arun_deployment): define def
run_deployment(...same params as arun_deployment..., _sync: bool = False, *args,
**kwargs) that calls into the async dispatcher (keeping the signature same as
the async version so sync callers work), and then set run_deployment.aio =
arun_deployment; this restores sync execution via _sync=True and exposes the
.aio attribute while using async_dispatch to delegate to arun_deployment.

Comment on lines +758 to +763
async def test_arun_deployment_links_to_parent_flow(
self,
test_deployment: DeploymentResponse,
use_hosted_api_server,
prefect_client: PrefectClient,
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unused use_hosted_api_server fixture argument.

Ruff ARG002 will fire; either remove it or convert to @pytest.mark.usefixtures("use_hosted_api_server") / prefix with underscore.

🧰 Tools
🪛 Ruff (0.14.14)

761-761: Unused method argument: use_hosted_api_server

(ARG002)

🤖 Prompt for AI Agents
In `@tests/deployment/test_flow_runs.py` around lines 758 - 763, The test function
test_arun_deployment_links_to_parent_flow has an unused fixture parameter
use_hosted_api_server which triggers Ruff ARG002; either remove the
use_hosted_api_server parameter from the function signature, change the
decorator to `@pytest.mark.usefixtures`("use_hosted_api_server") to keep the
fixture without a param, or rename the parameter to _use_hosted_api_server to
indicate it is intentionally unused.

Comment on lines +782 to +880
class TestRunDeploymentSyncContext:
"""Tests for run_deployment behavior in sync context."""

@pytest.fixture
def test_deployment_sync(self, sync_prefect_client):
flow_id = sync_prefect_client.create_flow_from_name("foo-sync")

deployment_id = sync_prefect_client.create_deployment(
name="foo-sync-deployment",
flow_id=flow_id,
parameter_openapi_schema={"type": "object", "properties": {}},
)
deployment = sync_prefect_client.read_deployment(deployment_id)

return deployment

def test_run_deployment_sync_basic(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment works in a synchronous context."""
deployment = test_deployment_sync
# Force sync execution using _sync parameter
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state

def test_run_deployment_sync_with_parameters(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context passes parameters correctly."""
deployment = test_deployment_sync

flow_run = run_deployment(
f"foo-sync/{deployment.name}",
parameters={"sync_param": "sync_value"},
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.parameters == {"sync_param": "sync_value"}

def test_run_deployment_sync_with_tags(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context accepts tags."""
deployment = test_deployment_sync

flow_run = run_deployment(
f"foo-sync/{deployment.name}",
tags=["sync", "test"],
timeout=0,
poll_interval=0,
_sync=True,
)
assert sorted(flow_run.tags) == ["sync", "test"]

def test_run_deployment_sync_negative_timeout_raises(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context raises on negative timeout."""
deployment = test_deployment_sync

with pytest.raises(ValueError, match="`timeout` cannot be negative"):
run_deployment(
f"foo-sync/{deployment.name}",
timeout=-1,
_sync=True,
)

def test_run_deployment_sync_with_deployment_id(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context works with deployment UUID."""
deployment = test_deployment_sync

flow_run = run_deployment(
deployment.id,
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unused sync_prefect_client parameters in sync tests.

These args are not referenced and trigger Ruff ARG002. Remove them (or prefix with _) across the class.

🧹 Example fix (apply similarly to all methods in this class)
-    def test_run_deployment_sync_basic(
-        self,
-        sync_prefect_client,
-        test_deployment_sync,
-    ):
+    def test_run_deployment_sync_basic(
+        self,
+        test_deployment_sync,
+    ):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class TestRunDeploymentSyncContext:
"""Tests for run_deployment behavior in sync context."""
@pytest.fixture
def test_deployment_sync(self, sync_prefect_client):
flow_id = sync_prefect_client.create_flow_from_name("foo-sync")
deployment_id = sync_prefect_client.create_deployment(
name="foo-sync-deployment",
flow_id=flow_id,
parameter_openapi_schema={"type": "object", "properties": {}},
)
deployment = sync_prefect_client.read_deployment(deployment_id)
return deployment
def test_run_deployment_sync_basic(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment works in a synchronous context."""
deployment = test_deployment_sync
# Force sync execution using _sync parameter
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state
def test_run_deployment_sync_with_parameters(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context passes parameters correctly."""
deployment = test_deployment_sync
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
parameters={"sync_param": "sync_value"},
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.parameters == {"sync_param": "sync_value"}
def test_run_deployment_sync_with_tags(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context accepts tags."""
deployment = test_deployment_sync
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
tags=["sync", "test"],
timeout=0,
poll_interval=0,
_sync=True,
)
assert sorted(flow_run.tags) == ["sync", "test"]
def test_run_deployment_sync_negative_timeout_raises(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context raises on negative timeout."""
deployment = test_deployment_sync
with pytest.raises(ValueError, match="`timeout` cannot be negative"):
run_deployment(
f"foo-sync/{deployment.name}",
timeout=-1,
_sync=True,
)
def test_run_deployment_sync_with_deployment_id(
self,
sync_prefect_client,
test_deployment_sync,
):
"""Test that run_deployment in sync context works with deployment UUID."""
deployment = test_deployment_sync
flow_run = run_deployment(
deployment.id,
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state
class TestRunDeploymentSyncContext:
"""Tests for run_deployment behavior in sync context."""
`@pytest.fixture`
def test_deployment_sync(self, sync_prefect_client):
flow_id = sync_prefect_client.create_flow_from_name("foo-sync")
deployment_id = sync_prefect_client.create_deployment(
name="foo-sync-deployment",
flow_id=flow_id,
parameter_openapi_schema={"type": "object", "properties": {}},
)
deployment = sync_prefect_client.read_deployment(deployment_id)
return deployment
def test_run_deployment_sync_basic(
self,
test_deployment_sync,
):
"""Test that run_deployment works in a synchronous context."""
deployment = test_deployment_sync
# Force sync execution using _sync parameter
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state
def test_run_deployment_sync_with_parameters(
self,
test_deployment_sync,
):
"""Test that run_deployment in sync context passes parameters correctly."""
deployment = test_deployment_sync
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
parameters={"sync_param": "sync_value"},
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.parameters == {"sync_param": "sync_value"}
def test_run_deployment_sync_with_tags(
self,
test_deployment_sync,
):
"""Test that run_deployment in sync context accepts tags."""
deployment = test_deployment_sync
flow_run = run_deployment(
f"foo-sync/{deployment.name}",
tags=["sync", "test"],
timeout=0,
poll_interval=0,
_sync=True,
)
assert sorted(flow_run.tags) == ["sync", "test"]
def test_run_deployment_sync_negative_timeout_raises(
self,
test_deployment_sync,
):
"""Test that run_deployment in sync context raises on negative timeout."""
deployment = test_deployment_sync
with pytest.raises(ValueError, match="`timeout` cannot be negative"):
run_deployment(
f"foo-sync/{deployment.name}",
timeout=-1,
_sync=True,
)
def test_run_deployment_sync_with_deployment_id(
self,
test_deployment_sync,
):
"""Test that run_deployment in sync context works with deployment UUID."""
deployment = test_deployment_sync
flow_run = run_deployment(
deployment.id,
timeout=0,
poll_interval=0,
_sync=True,
)
assert flow_run.deployment_id == deployment.id
assert flow_run.state
🧰 Tools
🪛 Ruff (0.14.14)

800-800: Unused method argument: sync_prefect_client

(ARG002)


817-817: Unused method argument: sync_prefect_client

(ARG002)


835-835: Unused method argument: sync_prefect_client

(ARG002)


852-852: Unused method argument: sync_prefect_client

(ARG002)


867-867: Unused method argument: sync_prefect_client

(ARG002)

🤖 Prompt for AI Agents
In `@tests/deployment/test_flow_runs.py` around lines 782 - 880, The test methods
in TestRunDeploymentSyncContext declare an unused parameter sync_prefect_client
which triggers Ruff ARG002; remove that parameter from the method signatures or
rename it to _sync_prefect_client for each test method (e.g.,
test_run_deployment_sync_basic, test_run_deployment_sync_with_parameters,
test_run_deployment_sync_with_tags,
test_run_deployment_sync_negative_timeout_raises,
test_run_deployment_sync_with_deployment_id) while leaving the
test_deployment_sync fixture (which legitimately uses sync_prefect_client)
unchanged so test discovery and fixture injection still work.

Comment on lines +922 to +926
async def test_run_deployment_aio_attribute(
self,
prefect_client: PrefectClient,
test_deployment: DeploymentResponse,
):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unused prefect_client / test_deployment in .aio attribute test.

Drop them or prefix with _ to avoid ARG002.

🧹 Suggested fix
-    async def test_run_deployment_aio_attribute(
-        self,
-        prefect_client: PrefectClient,
-        test_deployment: DeploymentResponse,
-    ):
+    async def test_run_deployment_aio_attribute(
+        self,
+    ):
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async def test_run_deployment_aio_attribute(
self,
prefect_client: PrefectClient,
test_deployment: DeploymentResponse,
):
async def test_run_deployment_aio_attribute(
self,
):
🧰 Tools
🪛 Ruff (0.14.14)

924-924: Unused method argument: prefect_client

(ARG002)


925-925: Unused method argument: test_deployment

(ARG002)

🤖 Prompt for AI Agents
In `@tests/deployment/test_flow_runs.py` around lines 922 - 926, The test function
test_run_deployment_aio_attribute currently declares unused parameters
prefect_client and test_deployment which triggers lint ARG002; remove these
parameters or rename them to _prefect_client and _test_deployment (or prefix
with an underscore) so they are recognized as intentionally unused. Update the
function signature in the test_run_deployment_aio_attribute definition
accordingly and run tests/linter to confirm the warning is resolved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants