diff --git a/docs/concepts/tasks.md b/docs/concepts/tasks.md index 9cb580cac28a..486212a1951d 100644 --- a/docs/concepts/tasks.md +++ b/docs/concepts/tasks.md @@ -593,7 +593,7 @@ Tags without explicit limits are considered to have unlimited concurrency. Task tag limits are checked whenever a task run attempts to enter a [`Running` state](/concepts/states/). -If there are no concurrency slots available for any one of your task's tags, the transition to a `Running` state will be delayed and the client is instructed to try entering a `Running` state again in 30 seconds. +If there are no concurrency slots available for any one of your task's tags, the transition to a `Running` state will be delayed and the client is instructed to try entering a `Running` state again in 30 seconds (or the value specified by the `PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS` setting). !!! warning "Concurrency limits in subflows" Using concurrency limits on task runs in subflows can cause deadlocks. As a best practice, configure your tags and concurrency limits to avoid setting limits on task runs in subflows. diff --git a/src/prefect/server/orchestration/core_policy.py b/src/prefect/server/orchestration/core_policy.py index 299c615f519f..7468213d9120 100644 --- a/src/prefect/server/orchestration/core_policy.py +++ b/src/prefect/server/orchestration/core_policy.py @@ -30,6 +30,7 @@ ) from prefect.server.schemas import core, filters, states from prefect.server.schemas.states import StateType +from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS from prefect.utilities.math import clamped_poisson_interval @@ -92,7 +93,8 @@ class SecureTaskConcurrencySlots(BaseOrchestrationRule): This rule checks if concurrency limits have been set on the tags associated with a TaskRun. If so, a concurrency slot will be secured against each concurrency limit before being allowed to transition into a running state. If a concurrency limit has - been reached, the client will be instructed to delay the transition for 30 seconds + been reached, the client will be instructed to delay the transition for the duration + specified by the "PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS" setting before trying again. If the concurrency limit set on a tag is 0, the transition will be aborted to prevent deadlocks. """ @@ -138,7 +140,7 @@ async def before_transition( stale_limit.active_slots = list(active_slots) await self.delay_transition( - 30, + PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS.value(), f"Concurrency limit for the {tag} tag has been reached", ) else: diff --git a/src/prefect/settings.py b/src/prefect/settings.py index 7f9dd3e88585..412fb97e12b7 100644 --- a/src/prefect/settings.py +++ b/src/prefect/settings.py @@ -760,6 +760,12 @@ def default_cloud_ui_url(settings, value): This value does not overwrite individually set retry delay seconds """ +PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS = Setting(int, default=30) +""" +The number of seconds to wait before retrying when a task run +cannot secure a concurrency slot from the server. +""" + PREFECT_LOCAL_STORAGE_PATH = Setting( Path, default=Path("${PREFECT_HOME}") / "storage",