Skip to content

Commit

Permalink
Add task run tag concurrency slot wait setting (#11020)
Browse files Browse the repository at this point in the history
Co-authored-by: Zach Angell <42625717+zangell44@users.noreply.github.com>
  • Loading branch information
taljaards and zangell44 authored Nov 13, 2023
1 parent bd2bc16 commit e782a7b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/concepts/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ Tags without explicit limits are considered to have unlimited concurrency.

Task tag limits are checked whenever a task run attempts to enter a [`Running` state](/concepts/states/).

If there are no concurrency slots available for any one of your task's tags, the transition to a `Running` state will be delayed and the client is instructed to try entering a `Running` state again in 30 seconds.
If there are no concurrency slots available for any one of your task's tags, the transition to a `Running` state will be delayed and the client is instructed to try entering a `Running` state again in 30 seconds (or the value specified by the `PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS` setting).

!!! warning "Concurrency limits in subflows"
Using concurrency limits on task runs in subflows can cause deadlocks. As a best practice, configure your tags and concurrency limits to avoid setting limits on task runs in subflows.
Expand Down
6 changes: 4 additions & 2 deletions src/prefect/server/orchestration/core_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from prefect.server.schemas import core, filters, states
from prefect.server.schemas.states import StateType
from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
from prefect.utilities.math import clamped_poisson_interval


Expand Down Expand Up @@ -92,7 +93,8 @@ class SecureTaskConcurrencySlots(BaseOrchestrationRule):
This rule checks if concurrency limits have been set on the tags associated with a
TaskRun. If so, a concurrency slot will be secured against each concurrency limit
before being allowed to transition into a running state. If a concurrency limit has
been reached, the client will be instructed to delay the transition for 30 seconds
been reached, the client will be instructed to delay the transition for the duration
specified by the "PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS" setting
before trying again. If the concurrency limit set on a tag is 0, the transition will
be aborted to prevent deadlocks.
"""
Expand Down Expand Up @@ -138,7 +140,7 @@ async def before_transition(
stale_limit.active_slots = list(active_slots)

await self.delay_transition(
30,
PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS.value(),
f"Concurrency limit for the {tag} tag has been reached",
)
else:
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,12 @@ def default_cloud_ui_url(settings, value):
This value does not overwrite individually set retry delay seconds
"""

PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS = Setting(int, default=30)
"""
The number of seconds to wait before retrying when a task run
cannot secure a concurrency slot from the server.
"""

PREFECT_LOCAL_STORAGE_PATH = Setting(
Path,
default=Path("${PREFECT_HOME}") / "storage",
Expand Down

0 comments on commit e782a7b

Please sign in to comment.