Improve workflow task deadlock and eviction #806

Merged 4 commits on Apr 4, 2025
14 changes: 11 additions & 3 deletions temporalio/worker/_replayer.py
@@ -51,15 +51,22 @@ def __init__(
"""Create a replayer to replay workflows from history.

See :py:meth:`temporalio.worker.Worker.__init__` for a description of
- most of the arguments. The same arguments need to be passed to the
- replayer that were passed to the worker when the workflow originally
+ most of the arguments. Most of the same arguments need to be passed to
+ the replayer that were passed to the worker when the workflow originally
ran.
+
+ Note, unlike the worker, for the replayer the workflow_task_executor
+ will default to a new thread pool executor with no max_workers set that
+ will be shared across all replay calls and never explicitly shut down.
+ Users are encouraged to provide their own if needing more control.
"""
if not workflows:
raise ValueError("At least one workflow must be specified")
self._config = ReplayerConfig(
workflows=list(workflows),
- workflow_task_executor=workflow_task_executor,
+ workflow_task_executor=(
+ workflow_task_executor or concurrent.futures.ThreadPoolExecutor()
+ ),
workflow_runner=workflow_runner,
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
namespace=namespace,
@@ -195,6 +202,7 @@ def on_eviction_hook(
task_queue=task_queue,
workflows=self._config["workflows"],
workflow_task_executor=self._config["workflow_task_executor"],
+ max_concurrent_workflow_tasks=5,
workflow_runner=self._config["workflow_runner"],
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
data_converter=self._config["data_converter"],
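
The replayer change above falls back to a fresh `ThreadPoolExecutor()` when the caller passes none, and the docstring encourages callers who need more control to supply their own. A minimal stdlib-only sketch of that defaulting pattern follows. `resolve_replay_executor` is a hypothetical helper written for illustration, not part of temporalio; it only mirrors the `workflow_task_executor or concurrent.futures.ThreadPoolExecutor()` expression in this diff and additionally reports who owns the executor.

```python
import concurrent.futures
from typing import Optional, Tuple


def resolve_replay_executor(
    user_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
) -> Tuple[concurrent.futures.ThreadPoolExecutor, bool]:
    """Hypothetical helper: return (executor, created_here).

    Mirrors the fallback in the diff: use the caller's executor if given,
    otherwise create one with no explicit max_workers.
    """
    if user_executor is not None:
        return user_executor, False
    return concurrent.futures.ThreadPoolExecutor(), True


# Caller-owned executor: the `with` block shuts it down when replay is done.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as my_executor:
    executor, created_here = resolve_replay_executor(my_executor)
    result = executor.submit(sum, [1, 2, 3]).result()

# Default path: nothing else shuts this executor down, so do it explicitly.
default_executor, default_created = resolve_replay_executor()
default_executor.shutdown()
```

Passing your own executor keeps its size and shutdown under the caller's control, which is the trade-off the docstring points at.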
8 changes: 5 additions & 3 deletions temporalio/worker/_worker.py
@@ -109,9 +109,10 @@ def __init__(
workflow_task_executor: Thread pool executor for workflow tasks. If
this is not present, a new
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
- created with ``max_workers`` set to ``max(os.cpu_count(), 4)``.
- The default one will be properly shutdown, but if one is
- provided, the caller is responsible for shutting it down after
+ created with ``max_workers`` set to
+ ``max_concurrent_workflow_tasks`` if it is present, or 500
+ otherwise. The default one will be properly shutdown, but if one
+ is provided, the caller is responsible for shutting it down after
the worker is shut down.
workflow_runner: Runner for workflows.
unsandboxed_workflow_runner: Runner for workflows that opt-out of
@@ -312,6 +313,7 @@ def __init__(
task_queue=task_queue,
workflows=workflows,
workflow_task_executor=workflow_task_executor,
+ max_concurrent_workflow_tasks=max_concurrent_workflow_tasks,
workflow_runner=workflow_runner,
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
data_converter=client_config["data_converter"],
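
The updated worker docstring describes the new default sizing rule: `max_workers` equals `max_concurrent_workflow_tasks` when that is set, and 500 otherwise. The sketch below illustrates that rule using only the standard library. The names `resolve_max_workers`, `default_workflow_executor`, and `FALLBACK_MAX_WORKERS` are assumptions made for this example; the PR's actual implementation is not shown in this diff.

```python
import concurrent.futures
from typing import Optional

# Fallback value taken from the updated docstring; the constant name is hypothetical.
FALLBACK_MAX_WORKERS = 500


def resolve_max_workers(max_concurrent_workflow_tasks: Optional[int] = None) -> int:
    """Hypothetical helper: pick the thread pool size described in the docstring."""
    if max_concurrent_workflow_tasks is not None:
        return max_concurrent_workflow_tasks
    return FALLBACK_MAX_WORKERS


def default_workflow_executor(
    max_concurrent_workflow_tasks: Optional[int] = None,
) -> concurrent.futures.ThreadPoolExecutor:
    """Hypothetical helper: build the default executor with that size."""
    return concurrent.futures.ThreadPoolExecutor(
        max_workers=resolve_max_workers(max_concurrent_workflow_tasks)
    )


sized_for_five = resolve_max_workers(5)
sized_by_default = resolve_max_workers()

# Threads are only started as work is submitted, so a large cap is cheap to create.
pool = default_workflow_executor(5)
pool.shutdown()
```

Tying the pool size to the task concurrency limit means every concurrently running workflow task can get its own thread, unlike the previous `max(os.cpu_count(), 4)` default.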