From b7b79a41b458368403f20d402004cac0c5930c4b Mon Sep 17 00:00:00 2001 From: Soami Charan Date: Fri, 18 Oct 2024 02:51:37 +0530 Subject: [PATCH] Thread pool task runner max worker configurable via environment variable (#15719) Co-authored-by: nate nowack --- src/prefect/settings.py | 6 ++++++ src/prefect/task_runners.py | 7 ++++++- tests/test_settings.py | 1 + tests/test_task_runners.py | 6 ++++++ 4 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/prefect/settings.py b/src/prefect/settings.py index 185217bee881..d52c6d7986af 100644 --- a/src/prefect/settings.py +++ b/src/prefect/settings.py @@ -1644,6 +1644,12 @@ class Settings(PrefectBaseSettings): description="Which cache implementation to use for the events system. Should point to a module that exports a Cache class.", ) + task_runner_thread_pool_max_workers: Optional[int] = Field( + default=None, + gt=0, + description="The maximum number of workers for ThreadPoolTaskRunner.", + ) + ########################################################################### # allow deprecated access to PREFECT_SOME_SETTING_NAME diff --git a/src/prefect/task_runners.py b/src/prefect/task_runners.py index 3e3e7f46aeb5..734bcf73b560 100644 --- a/src/prefect/task_runners.py +++ b/src/prefect/task_runners.py @@ -29,6 +29,7 @@ PrefectFutureList, ) from prefect.logging.loggers import get_logger, get_run_logger +from prefect.settings import PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS from prefect.utilities.annotations import allow_failure, quote, unmapped from prefect.utilities.callables import ( collapse_variadic_parameters, @@ -220,7 +221,11 @@ class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture]): def __init__(self, max_workers: Optional[int] = None): super().__init__() self._executor: Optional[ThreadPoolExecutor] = None - self._max_workers = sys.maxsize if max_workers is None else max_workers + self._max_workers = ( + (PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS.value() or sys.maxsize) + if max_workers is None + else max_workers + ) self._cancel_events: Dict[uuid.UUID, threading.Event] = {} def duplicate(self) -> "ThreadPoolTaskRunner": diff --git a/tests/test_settings.py b/tests/test_settings.py index d93f59c09366..cf146427cdab 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -226,6 +226,7 @@ "PREFECT_WORKER_QUERY_SECONDS": {"test_value": 10.0}, "PREFECT_WORKER_WEBSERVER_HOST": {"test_value": "host"}, "PREFECT_WORKER_WEBSERVER_PORT": {"test_value": 8080}, + "PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS": {"test_value": 5}, } diff --git a/tests/test_task_runners.py b/tests/test_task_runners.py index cd0c2718a5dd..c2e55221873e 100644 --- a/tests/test_task_runners.py +++ b/tests/test_task_runners.py @@ -14,6 +14,7 @@ from prefect.results import _default_storages from prefect.settings import ( PREFECT_DEFAULT_RESULT_STORAGE_BLOCK, + PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS, PREFECT_TASK_SCHEDULING_DEFAULT_STORAGE_BLOCK, temporary_settings, ) @@ -94,6 +95,11 @@ def test_set_max_workers(self): with ThreadPoolTaskRunner(max_workers=2) as runner: assert runner._executor._max_workers == 2 + def test_set_max_workers_through_settings(self): + with temporary_settings({PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS: 5}): + with ThreadPoolTaskRunner() as runner: + assert runner._executor._max_workers == 5 + def test_submit_sync_task(self): with ThreadPoolTaskRunner() as runner: parameters = {"param1": 1, "param2": 2}