Add arun_deployment and replace @sync_compatible with @async_dispatch#32
Conversation
This change follows the intent of issue PrefectHQ#15008 to replace implicit sync/async conversion with explicit, type-safe alternatives.

Changes:
- Add `arun_deployment` as an explicit async function for running deployments
- Replace `@sync_compatible` with `@async_dispatch` on `run_deployment`
- `run_deployment` now dispatches to `arun_deployment` in async context
- Sync context uses `SyncPrefectClient` directly (no event loop magic)
- Export `arun_deployment` from `prefect.deployments`
- Add comprehensive tests for both sync and async behavior

The `run_deployment.aio` attribute is preserved for backward compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/prefect/deployments/flow_runs.py (1)
123-139: Ensure `deployment_id` is set after lookup by name.

When `name` is not a UUID, `deployment_id` stays `None`, but you pass it into `create_flow_run_from_deployment`, which likely expects a UUID. Set it from the fetched deployment to avoid passing `None`.

✅ Suggested fix

```diff
-    if deployment_id:
-        deployment = await client.read_deployment(deployment_id=deployment_id)
-    else:
-        deployment = await client.read_deployment_by_name(name)
+    if deployment_id:
+        deployment = await client.read_deployment(deployment_id=deployment_id)
+    else:
+        deployment = await client.read_deployment_by_name(name)
+        deployment_id = deployment.id
```

Also applies to: 211-212
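The pitfall generalizes beyond Prefect: a lookup that accepts either a UUID string or a name must backfill the parsed ID after the name lookup. A minimal sketch with a hypothetical in-memory store standing in for the Prefect client (nothing below is Prefect API):

```python
import uuid

# Hypothetical in-memory deployment store, for illustration only.
DEPLOYMENT_ID = uuid.UUID("11111111-1111-1111-1111-111111111111")
DEPLOYMENTS = {DEPLOYMENT_ID: {"name": "foo/bar"}}

def resolve_deployment(name_or_id: str):
    """Return (deployment_id, deployment) for either a UUID string or a name."""
    deployment_id = None
    try:
        deployment_id = uuid.UUID(name_or_id)
    except ValueError:
        pass  # not a UUID: treat it as a name

    if deployment_id:
        deployment = DEPLOYMENTS[deployment_id]
    else:
        deployment_id, deployment = next(
            (k, v) for k, v in DEPLOYMENTS.items() if v["name"] == name_or_id
        )
        # Backfilling deployment_id here is the point of the review comment:
        # without it, downstream calls would receive deployment_id=None.
    return deployment_id, deployment
```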
🤖 Fix all issues with AI agents
In `@src/prefect/deployments/__init__.py`:
- Line 18: The __all__ export list is not alphabetically sorted which triggers
Ruff RUF022; update the __all__ variable in this module so the names are sorted
lexicographically (e.g., arrange "arun_deployment", "deploy",
"initialize_project", "run_deployment") to satisfy the linter for the __all__
declaration.
In `@src/prefect/deployments/flow_runs.py`:
- Around line 240-241: Replace the simple alias run_deployment = arun_deployment
with a synchronous wrapper function decorated using
`@async_dispatch(arun_deployment)`: define def run_deployment(...same params as
arun_deployment..., _sync: bool = False, *args, **kwargs) that calls into the
async dispatcher (keeping the signature same as the async version so sync
callers work), and then set run_deployment.aio = arun_deployment; this restores
sync execution via _sync=True and exposes the .aio attribute while using
async_dispatch to delegate to arun_deployment.
- Around line 148-150: The dict comprehension creating task_inputs is calling
the async helper collect_task_run_inputs without awaiting it, which stores
coroutine objects instead of results; update the comprehension in
arun_deployment so each value uses await collect_task_run_inputs(v) (e.g.,
task_inputs = {k: await collect_task_run_inputs(v) for k, v in
parameters.items()}) so task_inputs contains resolved values suitable for
serialization and dependency tracking.
- Around line 133-135: The call to get_or_create_client in flow_runs.py
currently discards the second return value and may leak connections; update the
code to capture the tuple (e.g., client, inferred =
get_or_create_client(client)) and then create a context variable that is client
when not inferred and asyncnullcontext(client) when inferred, then use "async
with context as client:" to ensure newly created PrefectClient instances are
closed (follow the pattern used in the inject_client decorator and use the
asyncnullcontext utility for already-provided clients).
In `@tests/deployment/test_flow_runs.py`:
- Around line 758-763: The test function
test_arun_deployment_links_to_parent_flow has an unused fixture parameter
use_hosted_api_server which triggers Ruff ARG002; either remove the
use_hosted_api_server parameter from the function signature, change the
decorator to `@pytest.mark.usefixtures("use_hosted_api_server")` to keep the
fixture without a param, or rename the parameter to _use_hosted_api_server to
indicate it is intentionally unused.
- Around line 922-926: The test function test_run_deployment_aio_attribute
currently declares unused parameters prefect_client and test_deployment which
triggers lint ARG002; remove these parameters or rename them to _prefect_client
and _test_deployment (or prefix with an underscore) so they are recognized as
intentionally unused. Update the function signature in the
test_run_deployment_aio_attribute definition accordingly and run tests/linter to
confirm the warning is resolved.
- Around line 782-880: The test methods in TestRunDeploymentSyncContext declare
an unused parameter sync_prefect_client which triggers Ruff ARG002; remove that
parameter from the method signatures or rename it to _sync_prefect_client for
each test method (e.g., test_run_deployment_sync_basic,
test_run_deployment_sync_with_parameters, test_run_deployment_sync_with_tags,
test_run_deployment_sync_negative_timeout_raises,
test_run_deployment_sync_with_deployment_id) while leaving the
test_deployment_sync fixture (which legitimately uses sync_prefect_client)
unchanged so test discovery and fixture injection still work.
🧹 Nitpick comments (1)
src/prefect/deployments/flow_runs.py (1)
144-145: Remove redundant import inside the function.

`dynamic_key_for_task_run` is already imported at module scope, so the inner import is unnecessary.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/prefect/deployments/__init__.py
- src/prefect/deployments/flow_runs.py
- tests/deployment/test_flow_runs.py
🧰 Additional context used
🧬 Code graph analysis (3)
tests/deployment/test_flow_runs.py (1)
src/prefect/deployments/flow_runs.py (1)
arun_deployment(48-237)
src/prefect/deployments/__init__.py (1)
src/prefect/deployments/flow_runs.py (1)
arun_deployment(48-237)
src/prefect/deployments/flow_runs.py (3)
src/prefect/tasks.py (1)
Task(301-1822)
src/prefect/types/_datetime.py (1)
now(113-124)
src/prefect/utilities/engine.py (1)
collect_task_run_inputs(66-114)
🪛 GitHub Actions: Static analysis
src/prefect/deployments/flow_runs.py
[error] 1-1: ruff-check: F401 (unused-import) and other lint issues detected. 1 error fixed by ruff.
[error] 1-1: ruff-format: 1 file reformatted by ruff. Re-run pre-commit to apply formatting changes.
🪛 Ruff (0.14.14)
tests/deployment/test_flow_runs.py
761-761: Unused method argument: use_hosted_api_server
(ARG002)
800-800: Unused method argument: sync_prefect_client
(ARG002)
817-817: Unused method argument: sync_prefect_client
(ARG002)
835-835: Unused method argument: sync_prefect_client
(ARG002)
852-852: Unused method argument: sync_prefect_client
(ARG002)
867-867: Unused method argument: sync_prefect_client
(ARG002)
924-924: Unused method argument: prefect_client
(ARG002)
925-925: Unused method argument: test_deployment
(ARG002)
src/prefect/deployments/__init__.py
18-18: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (37)
- GitHub Check: Client Tests - python:3.14, postgres:14
- GitHub Check: Client Tests - python:3.11, postgres:14
- GitHub Check: Server Tests - python:3.13, sqlite
- GitHub Check: Server Tests - python:3.11, sqlite
- GitHub Check: Database and Orchestration Tests - python:3.14, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.13, sqlite
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.11, postgres:14
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.12, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.11, postgres:14
- GitHub Check: Client Tests - python:3.12, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.11, sqlite
- GitHub Check: Database and Orchestration Tests - python:3.13, postgres:14
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.10, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.12, sqlite
- GitHub Check: Server Tests - python:3.14, sqlite
- GitHub Check: Database and Orchestration Tests - python:3.10, sqlite
- GitHub Check: Client Tests - python:3.13, postgres:14
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.13, postgres:14
- GitHub Check: Client Tests - python:3.10, postgres:14
- GitHub Check: Runner, Worker, Settings, Input, and CLI Tests - python:3.14, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.14, sqlite
- GitHub Check: Database and Orchestration Tests - python:3.12, postgres:14
- GitHub Check: Database and Orchestration Tests - python:3.10, postgres:14
- GitHub Check: Server Tests - python:3.12, postgres:14
- GitHub Check: Server Tests - python:3.13, postgres:14
- GitHub Check: Server Tests - python:3.12, sqlite
- GitHub Check: Server Tests - python:3.10, postgres:14
- GitHub Check: Server Tests - python:3.14, postgres:14
- GitHub Check: Server Tests - python:3.10, sqlite
- GitHub Check: Server Tests - python:3.11, postgres:14
- GitHub Check: typesafety
- GitHub Check: Benchmark
- GitHub Check: Benchmark
- GitHub Check: Prefect Client Compatibility Tests
- GitHub Check: Test SQLite 3.24.0 Compatibility
- GitHub Check: Integration tests
- GitHub Check: Test SQLite 3.31.1 Compatibility
🔇 Additional comments (3)
src/prefect/deployments/__init__.py (1)
5-13: Public API export update looks good. The TYPE_CHECKING import and _public_api entry cleanly expose `arun_deployment`.

tests/deployment/test_flow_runs.py (2)

1-18: Imports for async-dispatch testing look good. `inspect` and `arun_deployment` are used by the new async-dispatch tests.

601-757: Nice coverage for the explicit `arun_deployment` entry point. The new suite exercises parameters, tags, idempotency, job variables, and parent linkage.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
```diff
 # Declare API for type-checkers
-__all__ = ["initialize_project", "deploy", "run_deployment"]
+__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
```
Sort __all__ to satisfy Ruff RUF022.
This will avoid the lint failure flagged by static analysis.
🧹 Suggested fix

```diff
-__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]
+__all__ = ["arun_deployment", "deploy", "initialize_project", "run_deployment"]
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
__all__ = ["arun_deployment", "deploy", "initialize_project", "run_deployment"]
```
🧰 Tools
🪛 Ruff (0.14.14)
18-18: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
🤖 Prompt for AI Agents
In `@src/prefect/deployments/__init__.py` at line 18, The __all__ export list is
not alphabetically sorted which triggers Ruff RUF022; update the __all__
variable in this module so the names are sorted lexicographically (e.g., arrange
"arun_deployment", "deploy", "initialize_project", "run_deployment") to satisfy
the linter for the __all__ declaration.
```python
client, _ = get_or_create_client(client)

if deployment_id:
```
Close newly created clients from get_or_create_client using context manager.
When get_or_create_client creates a new client (indicated by the second return value being False), the returned PrefectClient must be context managed. The current code discards the boolean flag and uses the client directly without cleanup, causing connection leaks.
The correct pattern is shown in the inject_client decorator in the same utilities module: check the boolean flag and wrap newly created clients as async with client: before use.
Affected pattern

```python
client, _ = get_or_create_client(client)
# ... use client without context manager
```

Should follow the pattern from `inject_client`:

```python
client, inferred = get_or_create_client(given)
if not inferred:
    context = client  # enter the newly created client as a context manager
else:
    context = asyncnullcontext(client)  # no-op for caller-provided clients
async with context as new_client:
    ...  # use new_client
```

🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 133 - 135, The call to
get_or_create_client in flow_runs.py currently discards the second return value
and may leak connections; update the code to capture the tuple (e.g., client,
inferred = get_or_create_client(client)) and then create a context variable that
is client when not inferred and asyncnullcontext(client) when inferred, then use
"async with context as client:" to ensure newly created PrefectClient instances
are closed (follow the pattern used in the inject_client decorator and use the
asyncnullcontext utility for already-provided clients).
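This conditional-context pattern can be checked in isolation. Below is a minimal sketch with a stand-in client class; `FakeClient`, `use_client`, and this `asyncnullcontext` are illustrative stand-ins, not the Prefect implementations:

```python
import asyncio
from contextlib import asynccontextmanager

class FakeClient:
    """Stand-in for PrefectClient: an async context manager that records closure."""
    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc):
        self.closed = True

@asynccontextmanager
async def asyncnullcontext(value):
    # No-op async context manager: yields the value without closing it.
    yield value

def get_or_create_client(client=None):
    """Return (client, inferred); inferred=True when the caller provided one."""
    if client is not None:
        return client, True
    return FakeClient(), False

async def use_client(given=None):
    client, inferred = get_or_create_client(given)
    # Close only clients created here; leave caller-provided ones open.
    context = asyncnullcontext(client) if inferred else client
    async with context as c:
        return c

created = asyncio.run(use_client())   # newly created client -> closed on exit
provided = FakeClient()
asyncio.run(use_client(provided))     # caller-provided client -> left open
```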
```diff
 task_inputs = {
-    k: await collect_task_run_inputs(v) for k, v in parameters.items()
+    k: collect_task_run_inputs(v) for k, v in parameters.items()
 }
```
Add await to collect_task_run_inputs calls to avoid storing coroutines.
collect_task_run_inputs is an async function (as shown in its docstring at src/prefect/utilities/engine.py:66-80); without await, task_inputs will contain coroutine objects instead of actual results, causing serialization failures and breaking dependency tracking. The fix is straightforward since the enclosing arun_deployment function is async.
Suggested fix

```diff
 task_inputs = {
-    k: collect_task_run_inputs(v) for k, v in parameters.items()
+    k: await collect_task_run_inputs(v) for k, v in parameters.items()
 }
```

📝 Committable suggestion

```python
task_inputs = {
    k: await collect_task_run_inputs(v) for k, v in parameters.items()
}
```
🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 148 - 150, The dict
comprehension creating task_inputs is calling the async helper
collect_task_run_inputs without awaiting it, which stores coroutine objects
instead of results; update the comprehension in arun_deployment so each value
uses await collect_task_run_inputs(v) (e.g., task_inputs = {k: await
collect_task_run_inputs(v) for k, v in parameters.items()}) so task_inputs
contains resolved values suitable for serialization and dependency tracking.
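The failure mode is easy to demonstrate with any async helper: without `await`, the dict values are coroutine objects, not results. A small self-contained sketch (`collect` below is a hypothetical stand-in for `collect_task_run_inputs`):

```python
import asyncio
import inspect

async def collect(value):
    # Stand-in for an async input-collection helper.
    return {"input": value}

async def build_inputs(parameters):
    # Buggy form: stores coroutine objects, which are not serializable.
    broken = {k: collect(v) for k, v in parameters.items()}
    assert all(inspect.iscoroutine(v) for v in broken.values())
    for c in broken.values():
        c.close()  # avoid "coroutine was never awaited" warnings

    # Fixed form: `await` inside the dict comprehension resolves each value.
    fixed = {k: await collect(v) for k, v in parameters.items()}
    return fixed

result = asyncio.run(build_inputs({"x": 1, "y": 2}))
```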
```python
# Alias for backwards compatibility
run_deployment = arun_deployment
```
Use @async_dispatch decorator with sync wrapper instead of simple alias.
The current alias run_deployment = arun_deployment breaks the sync/async dispatch mechanism. A simple alias removes the wrapper that enables:
- Sync mode execution with `_sync=True` (tests rely on this)
- The `.aio` attribute that references the async implementation (the test at `tests/deployment/test_flow_runs.py:927-930` expects it)
Follow the pattern used throughout the codebase (see async_dispatch in flows.py, variables.py, etc.): create a sync wrapper function decorated with @async_dispatch(arun_deployment) that mirrors the async signature and sets .aio = arun_deployment.
🤖 Prompt for AI Agents
In `@src/prefect/deployments/flow_runs.py` around lines 240 - 241, Replace the
simple alias run_deployment = arun_deployment with a synchronous wrapper
function decorated with `@async_dispatch(arun_deployment)`: define def
run_deployment(...same params as arun_deployment..., _sync: bool = False, *args,
**kwargs) that calls into the async dispatcher (keeping the signature same as
the async version so sync callers work), and then set run_deployment.aio =
arun_deployment; this restores sync execution via _sync=True and exposes the
.aio attribute while using async_dispatch to delegate to arun_deployment.
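For orientation, the dispatch mechanism the review asks for can be sketched generically. This toy `async_dispatch` illustrates the idea (delegate to the async implementation, run it to completion when called from sync code, and expose it as `.aio`); it is not Prefect's actual decorator, and its `_sync` semantics are an assumption:

```python
import asyncio
from functools import wraps

def async_dispatch(async_impl):
    """Toy decorator: route calls to async_impl based on context and _sync flag."""
    def decorator(sync_fn):
        @wraps(sync_fn)
        def wrapper(*args, _sync=None, **kwargs):
            try:
                asyncio.get_running_loop()
                in_async = True
            except RuntimeError:
                in_async = False
            if _sync is None:
                _sync = not in_async  # default: sync execution outside a loop
            if _sync:
                return asyncio.run(async_impl(*args, **kwargs))
            return async_impl(*args, **kwargs)  # caller awaits the coroutine
        wrapper.aio = async_impl  # expose the async version explicitly
        return wrapper
    return decorator

async def arun_deployment_demo(name: str):
    return f"ran {name}"

@async_dispatch(arun_deployment_demo)
def run_deployment_demo(name: str):
    ...  # body unused; dispatch delegates to the async implementation

result = run_deployment_demo("foo")  # sync context: returns the result directly
```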
```python
async def test_arun_deployment_links_to_parent_flow(
    self,
    test_deployment: DeploymentResponse,
    use_hosted_api_server,
    prefect_client: PrefectClient,
):
```
Unused use_hosted_api_server fixture argument.
Ruff ARG002 will fire; either remove it or convert to @pytest.mark.usefixtures("use_hosted_api_server") / prefix with underscore.
🧰 Tools
🪛 Ruff (0.14.14)
761-761: Unused method argument: use_hosted_api_server
(ARG002)
🤖 Prompt for AI Agents
In `@tests/deployment/test_flow_runs.py` around lines 758 - 763, The test function
test_arun_deployment_links_to_parent_flow has an unused fixture parameter
use_hosted_api_server which triggers Ruff ARG002; either remove the
use_hosted_api_server parameter from the function signature, change the
decorator to `@pytest.mark.usefixtures("use_hosted_api_server")` to keep the
fixture without a param, or rename the parameter to _use_hosted_api_server to
indicate it is intentionally unused.
```python
class TestRunDeploymentSyncContext:
    """Tests for run_deployment behavior in sync context."""

    @pytest.fixture
    def test_deployment_sync(self, sync_prefect_client):
        flow_id = sync_prefect_client.create_flow_from_name("foo-sync")

        deployment_id = sync_prefect_client.create_deployment(
            name="foo-sync-deployment",
            flow_id=flow_id,
            parameter_openapi_schema={"type": "object", "properties": {}},
        )
        deployment = sync_prefect_client.read_deployment(deployment_id)

        return deployment

    def test_run_deployment_sync_basic(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment works in a synchronous context."""
        deployment = test_deployment_sync
        # Force sync execution using _sync parameter
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state

    def test_run_deployment_sync_with_parameters(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context passes parameters correctly."""
        deployment = test_deployment_sync

        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            parameters={"sync_param": "sync_value"},
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.parameters == {"sync_param": "sync_value"}

    def test_run_deployment_sync_with_tags(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context accepts tags."""
        deployment = test_deployment_sync

        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            tags=["sync", "test"],
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert sorted(flow_run.tags) == ["sync", "test"]

    def test_run_deployment_sync_negative_timeout_raises(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context raises on negative timeout."""
        deployment = test_deployment_sync

        with pytest.raises(ValueError, match="`timeout` cannot be negative"):
            run_deployment(
                f"foo-sync/{deployment.name}",
                timeout=-1,
                _sync=True,
            )

    def test_run_deployment_sync_with_deployment_id(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context works with deployment UUID."""
        deployment = test_deployment_sync

        flow_run = run_deployment(
            deployment.id,
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state
```
Unused sync_prefect_client parameters in sync tests.
These args are not referenced and trigger Ruff ARG002. Remove them (or prefix with _) across the class.
🧹 Example fix (apply similarly to all methods in this class)

```diff
-    def test_run_deployment_sync_basic(
-        self,
-        sync_prefect_client,
-        test_deployment_sync,
-    ):
+    def test_run_deployment_sync_basic(
+        self,
+        test_deployment_sync,
+    ):
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| class TestRunDeploymentSyncContext: | |
| """Tests for run_deployment behavior in sync context.""" | |
| @pytest.fixture | |
| def test_deployment_sync(self, sync_prefect_client): | |
| flow_id = sync_prefect_client.create_flow_from_name("foo-sync") | |
| deployment_id = sync_prefect_client.create_deployment( | |
| name="foo-sync-deployment", | |
```python
            flow_id=flow_id,
            parameter_openapi_schema={"type": "object", "properties": {}},
        )
        deployment = sync_prefect_client.read_deployment(deployment_id)
        return deployment

    def test_run_deployment_sync_basic(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment works in a synchronous context."""
        deployment = test_deployment_sync
        # Force sync execution using _sync parameter
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state

    def test_run_deployment_sync_with_parameters(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context passes parameters correctly."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            parameters={"sync_param": "sync_value"},
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.parameters == {"sync_param": "sync_value"}

    def test_run_deployment_sync_with_tags(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context accepts tags."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            tags=["sync", "test"],
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert sorted(flow_run.tags) == ["sync", "test"]

    def test_run_deployment_sync_negative_timeout_raises(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context raises on negative timeout."""
        deployment = test_deployment_sync
        with pytest.raises(ValueError, match="`timeout` cannot be negative"):
            run_deployment(
                f"foo-sync/{deployment.name}",
                timeout=-1,
                _sync=True,
            )

    def test_run_deployment_sync_with_deployment_id(
        self,
        sync_prefect_client,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context works with deployment UUID."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            deployment.id,
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state
```
```python
class TestRunDeploymentSyncContext:
    """Tests for run_deployment behavior in sync context."""

    @pytest.fixture
    def test_deployment_sync(self, sync_prefect_client):
        flow_id = sync_prefect_client.create_flow_from_name("foo-sync")
        deployment_id = sync_prefect_client.create_deployment(
            name="foo-sync-deployment",
            flow_id=flow_id,
            parameter_openapi_schema={"type": "object", "properties": {}},
        )
        deployment = sync_prefect_client.read_deployment(deployment_id)
        return deployment

    def test_run_deployment_sync_basic(
        self,
        test_deployment_sync,
    ):
        """Test that run_deployment works in a synchronous context."""
        deployment = test_deployment_sync
        # Force sync execution using _sync parameter
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state

    def test_run_deployment_sync_with_parameters(
        self,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context passes parameters correctly."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            parameters={"sync_param": "sync_value"},
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.parameters == {"sync_param": "sync_value"}

    def test_run_deployment_sync_with_tags(
        self,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context accepts tags."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            f"foo-sync/{deployment.name}",
            tags=["sync", "test"],
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert sorted(flow_run.tags) == ["sync", "test"]

    def test_run_deployment_sync_negative_timeout_raises(
        self,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context raises on negative timeout."""
        deployment = test_deployment_sync
        with pytest.raises(ValueError, match="`timeout` cannot be negative"):
            run_deployment(
                f"foo-sync/{deployment.name}",
                timeout=-1,
                _sync=True,
            )

    def test_run_deployment_sync_with_deployment_id(
        self,
        test_deployment_sync,
    ):
        """Test that run_deployment in sync context works with deployment UUID."""
        deployment = test_deployment_sync
        flow_run = run_deployment(
            deployment.id,
            timeout=0,
            poll_interval=0,
            _sync=True,
        )
        assert flow_run.deployment_id == deployment.id
        assert flow_run.state
```
🧰 Tools

🪛 Ruff (0.14.14)

800-800: Unused method argument: `sync_prefect_client` (ARG002)

817-817: Unused method argument: `sync_prefect_client` (ARG002)

835-835: Unused method argument: `sync_prefect_client` (ARG002)

852-852: Unused method argument: `sync_prefect_client` (ARG002)

867-867: Unused method argument: `sync_prefect_client` (ARG002)
🤖 Prompt for AI Agents

In `@tests/deployment/test_flow_runs.py` around lines 782-880: the test methods in TestRunDeploymentSyncContext declare an unused parameter `sync_prefect_client`, which triggers Ruff ARG002. Remove that parameter from the signatures of test_run_deployment_sync_basic, test_run_deployment_sync_with_parameters, test_run_deployment_sync_with_tags, test_run_deployment_sync_negative_timeout_raises, and test_run_deployment_sync_with_deployment_id. Do not rename it to `_sync_prefect_client`: pytest resolves fixtures by parameter name, so the renamed parameter would fail fixture lookup. Leave the test_deployment_sync fixture (which legitimately uses sync_prefect_client) unchanged so fixture injection still works.
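The ARG002 rule flagged above, and its two standard remedies, can be sketched in isolation. The `Greeter` class and its methods below are hypothetical, written purely for illustration:

```python
# Hypothetical example of Ruff ARG002 ("unused method argument") and its fixes.


class Greeter:
    def greet_flagged(self, name, audience):  # `audience` is never read: ARG002
        return f"hello, {name}"

    def greet_removed(self, name):  # fix 1: drop the unused parameter
        return f"hello, {name}"

    def greet_marked(self, name, _audience=None):  # fix 2: underscore marks it intentional
        return f"hello, {name}"


g = Greeter()
assert g.greet_flagged("world", "everyone") == "hello, world"
assert g.greet_removed("world") == "hello, world"
assert g.greet_marked("world") == "hello, world"
```

For the pytest tests in this PR only fix 1 applies: `sync_prefect_client` is a fixture resolved by parameter name, so an underscore rename would make pytest look up a fixture that does not exist.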
```python
    async def test_run_deployment_aio_attribute(
        self,
        prefect_client: PrefectClient,
        test_deployment: DeploymentResponse,
    ):
```
Unused prefect_client / test_deployment in .aio attribute test.
Drop them to satisfy ARG002; since both are pytest fixtures resolved by parameter name, an underscore prefix would break fixture lookup instead of marking them as unused.
🧹 Suggested fix

```diff
-    async def test_run_deployment_aio_attribute(
-        self,
-        prefect_client: PrefectClient,
-        test_deployment: DeploymentResponse,
-    ):
+    async def test_run_deployment_aio_attribute(
+        self,
+    ):
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    async def test_run_deployment_aio_attribute(
        self,
        prefect_client: PrefectClient,
        test_deployment: DeploymentResponse,
    ):
```

```python
    async def test_run_deployment_aio_attribute(
        self,
    ):
```
🧰 Tools

🪛 Ruff (0.14.14)

924-924: Unused method argument: `prefect_client` (ARG002)

925-925: Unused method argument: `test_deployment` (ARG002)
🤖 Prompt for AI Agents

In `@tests/deployment/test_flow_runs.py` around lines 922-926: the test function test_run_deployment_aio_attribute declares unused parameters `prefect_client` and `test_deployment`, which triggers Ruff ARG002. Remove both parameters from the signature; because each is a pytest fixture resolved by parameter name, renaming them with a leading underscore would break fixture lookup rather than silence the warning. Re-run the tests and linter to confirm the warning is resolved.
Benchmark PR from agentic-review-benchmarks#6
Summary by CodeRabbit
- `arun_deployment`: a new public async function for executing deployments with improved timeout semantics and job variable handling.
- `run_deployment` remains fully functional and supported for backward compatibility.

✏️ Tip: You can customize this high-level summary in your review settings.
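The sync/async routing this PR adopts can be illustrated with a small standalone sketch. Everything below is a simplification written for this page: the decorator body and the function implementations are assumptions, not Prefect's actual `async_dispatch`; only the idea matches the PR (dispatch to `arun_deployment` when an event loop is running, call the sync body otherwise, and keep the async version reachable via `.aio`):

```python
import asyncio
from functools import wraps


def async_dispatch(async_impl):
    """Illustrative only: route calls to `async_impl` inside a running event
    loop, and to the wrapped sync function otherwise."""
    def decorator(sync_fn):
        @wraps(sync_fn)
        def wrapper(*args, **kwargs):
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                return sync_fn(*args, **kwargs)  # no loop: plain sync call
            return async_impl(*args, **kwargs)   # inside a loop: hand back a coroutine
        wrapper.aio = async_impl  # explicit async escape hatch, kept for compatibility
        return wrapper
    return decorator


async def arun_deployment(name):
    return f"async:{name}"


@async_dispatch(arun_deployment)
def run_deployment(name):
    return f"sync:{name}"


# Sync context: no running loop, so the sync body executes directly.
assert run_deployment("demo") == "sync:demo"


# Async context: the same call site yields the coroutine from `arun_deployment`.
async def main():
    return await run_deployment("demo")


assert asyncio.run(main()) == "async:demo"
assert asyncio.run(run_deployment.aio("demo")) == "async:demo"
```

A real dispatcher must also handle edge cases (for example, calls from a loop's worker thread); this sketch only shows the routing idea and the preserved `run_deployment.aio` attribute mentioned in the PR description.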