Skip to content

Commit 5140187

Browse files
Workflow multiprocessing (#263)
* WIP add workflow_multiprocessing sample * Add README content and other minor improvements * remove html table that rendered poorly. switch to print over logger for simplicity * remove syntax highlighting on blocks * Apply some minor improvements to the readme * fix broken link in top level README * remove inaccurate comment in starter.py * remove unused workflow input * run formatter. Ignore linter error on multiprocessing.get_context b/c both ForkContext and SpawnContext are acceptable for what we need * rename to worker_multiprocessing
1 parent 6de4916 commit 5140187

File tree

9 files changed

+336
-19
lines changed

9 files changed

+336
-19
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ Some examples require extra dependencies. See each sample's directory for specif
8484
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
8585
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
8686
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
87+
* [worker_multiprocessing](worker_multiprocessing) - Leverage Python multiprocessing to parallelize workflow tasks and other CPU-bound operations by running multiple workers.
8788

8889
## Test
8990

pyproject.toml

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,8 @@ dev = [
2727
"poethepoet>=0.36.0",
2828
]
2929
bedrock = ["boto3>=1.34.92,<2"]
30-
dsl = [
31-
"pyyaml>=6.0.1,<7",
32-
"types-pyyaml>=6.0.12,<7",
33-
"dacite>=1.8.1,<2",
34-
]
35-
encryption = [
36-
"cryptography>=38.0.1,<39",
37-
"aiohttp>=3.8.1,<4",
38-
]
30+
dsl = ["pyyaml>=6.0.1,<7", "types-pyyaml>=6.0.12,<7", "dacite>=1.8.1,<2"]
31+
encryption = ["cryptography>=38.0.1,<39", "aiohttp>=3.8.1,<4"]
3932
gevent = ["gevent>=25.4.2 ; python_version >= '3.8'"]
4033
langchain = [
4134
"langchain>=0.1.7,<0.2 ; python_version >= '3.8.1' and python_version < '4.0'",
@@ -46,9 +39,7 @@ langchain = [
4639
"tqdm>=4.62.0,<5",
4740
"uvicorn[standard]>=0.24.0.post1,<0.25",
4841
]
49-
nexus = [
50-
"nexus-rpc>=1.1.0,<2",
51-
]
42+
nexus = ["nexus-rpc>=1.1.0,<2"]
5243
open-telemetry = [
5344
"temporalio[opentelemetry]",
5445
"opentelemetry-exporter-otlp-proto-grpc",
@@ -60,17 +51,16 @@ openai-agents = [
6051
]
6152
pydantic-converter = ["pydantic>=2.10.6,<3"]
6253
sentry = ["sentry-sdk>=2.13.0"]
63-
trio-async = [
64-
"trio>=0.28.0,<0.29",
65-
"trio-asyncio>=0.15.0,<0.16",
66-
]
54+
trio-async = ["trio>=0.28.0,<0.29", "trio-asyncio>=0.15.0,<0.16"]
6755
cloud-export-to-parquet = [
6856
"pandas>=2.2.2,<3 ; python_version >= '3.10' and python_version < '4.0'",
6957
"numpy>=1.26.0,<2 ; python_version >= '3.10' and python_version < '3.13'",
7058
"boto3>=1.34.89,<2",
7159
"pyarrow>=19.0.1",
7260
]
7361

62+
[tool.hatch.metadata]
63+
allow-direct-references = true
7464

7565
[tool.hatch.build.targets.sdist]
7666
include = ["./**/*.py"]
@@ -118,8 +108,15 @@ requires = ["hatchling"]
118108
build-backend = "hatchling.build"
119109

120110
[tool.poe.tasks]
121-
format = [{cmd = "uv run ruff check --select I --fix"}, {cmd = "uv run ruff format"}]
122-
lint = [{cmd = "uv run ruff check --select I"}, {cmd = "uv run ruff format --check"}, {ref = "lint-types"}]
111+
format = [
112+
{ cmd = "uv run ruff check --select I --fix" },
113+
{ cmd = "uv run ruff format" },
114+
]
115+
lint = [
116+
{ cmd = "uv run ruff check --select I" },
117+
{ cmd = "uv run ruff format --check" },
118+
{ ref = "lint-types" },
119+
]
123120
lint-types = "uv run --all-groups mypy --check-untyped-defs --namespace-packages ."
124121
test = "uv run --all-groups pytest"
125122

sleep_for_days/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days.
44

5-
To run, first see the main [README.md](../../README.md) for prerequisites.
5+
To run, first see the main [README.md](../README.md) for prerequisites.
66

77
Then create two terminals.
88

worker_multiprocessing/README.md

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Worker Multiprocessing Sample
2+
3+
4+
## Python Concurrency Limitations
5+
6+
CPU-bound tasks effectively cannot run in parallel in Python due to the [Global Interpreter Lock (GIL)](https://docs.python.org/3/glossary.html#term-global-interpreter-lock). The Python standard library's [`threading` module](https://docs.python.org/3/library/threading.html) provides the following guidance:
7+
8+
> CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
9+
10+
## Temporal Workflow Tasks in Python
11+
12+
[Temporal Workflow Tasks](https://docs.temporal.io/tasks#workflow-task) are CPU-bound operations and therefore cannot be run concurrently using threads or an async runtime. Instead, we can use [`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor) or the [`multiprocessing` module](https://docs.python.org/3/library/multiprocessing.html), as suggested by the `threading` documentation, to more appropriately utilize machine resources.
13+
14+
This sample demonstrates how to use `concurrent.futures.ProcessPoolExecutor` to run multiple workflow worker processes.
15+
16+
## Running the Sample
17+
18+
To run, first see the root [README.md](../README.md) for prerequisites. Then execute the following commands from the root directory:
19+
20+
```
21+
uv run worker_multiprocessing/worker.py
22+
uv run worker_multiprocessing/starter.py
23+
```
24+
25+
Both `worker.py` and `starter.py` accept a few optional arguments that adjust how the sample runs.
26+
27+
```
28+
uv run worker_multiprocessing/worker.py -h
29+
30+
usage: worker.py [-h] [-w NUM_WORKFLOW_WORKERS] [-a NUM_ACTIVITY_WORKERS]
31+
32+
options:
33+
-h, --help show this help message and exit
34+
-w, --num-workflow-workers NUM_WORKFLOW_WORKERS
35+
-a, --num-activity-workers NUM_ACTIVITY_WORKERS
36+
```
37+
38+
```
39+
uv run worker_multiprocessing/starter.py -h
40+
41+
usage: starter.py [-h] [-n NUM_WORKFLOWS]
42+
43+
options:
44+
-h, --help show this help message and exit
45+
-n, --num-workflows NUM_WORKFLOWS
46+
the number of workflows to execute
47+
```
48+
49+
## Example Output
50+
51+
```
52+
uv run worker_multiprocessing/worker.py
53+
54+
starting 2 workflow worker(s) and 1 activity worker(s)
55+
waiting for keyboard interrupt or for all workers to exit
56+
workflow-worker:0 starting
57+
workflow-worker:1 starting
58+
activity-worker:0 starting
59+
workflow-worker:0 shutting down
60+
activity-worker:0 shutting down
61+
workflow-worker:1 shutting down
62+
```
63+
64+
65+
```
66+
uv run worker_multiprocessing/starter.py
67+
68+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
69+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
70+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
71+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
72+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
73+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
74+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
75+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
76+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
77+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
78+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
79+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
80+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
81+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
82+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
83+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
84+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
85+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
86+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
87+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
88+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
89+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
90+
wf-starting-pid:19179 | activity-pid:19180 | wf-ending-pid:19179
91+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
92+
wf-starting-pid:19178 | activity-pid:19180 | wf-ending-pid:19178
93+
```

worker_multiprocessing/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Task queue names shared by the sample's scripts: starter.py submits
# workflows to the workflow queue, and worker.py runs workers polling both.
WORKFLOW_TASK_QUEUE = "workflow-task-queue"
ACTIVITY_TASK_QUEUE = "activity-task-queue"
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import os
2+
3+
from temporalio import activity
4+
5+
6+
@activity.defn
async def echo_pid_activity(input: str) -> str:
    """Echo the input string with this activity process's PID appended.

    The PID lets the sample show which child process executed the activity.
    """
    return f"{input} | activity-pid:{os.getpid()}"

worker_multiprocessing/starter.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import argparse
2+
import asyncio
3+
import uuid
4+
5+
from temporalio.client import Client
6+
from temporalio.envconfig import ClientConfig
7+
8+
from worker_multiprocessing import WORKFLOW_TASK_QUEUE
9+
from worker_multiprocessing.workflows import ParallelizedWorkflow
10+
11+
12+
class Args(argparse.Namespace):
    """Typed argparse namespace for the starter's command-line arguments."""

    # Number of workflow executions to start (set via -n/--num-workflows).
    num_workflows: int
14+
15+
16+
async def main():
    """Start a batch of workflows and print each result as it completes."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-n",
        "--num-workflows",
        help="the number of workflows to execute",
        type=int,
        default=25,
    )
    args = parser.parse_args(namespace=Args())

    # Connect using environment-based configuration, defaulting to a local
    # dev server when no target host is configured.
    config = ClientConfig.load_client_connect_config()
    config.setdefault("target_host", "localhost:7233")
    client = await Client.connect(**config)

    # Kick off the requested number of workflow executions. Each call yields
    # a coroutine that completes with the workflow's result.
    pending = [
        client.execute_workflow(
            ParallelizedWorkflow.run,
            id=f"greeting-workflow-id-{uuid.uuid4()}",
            task_queue=WORKFLOW_TASK_QUEUE,
        )
        for _ in range(args.num_workflows)
    ]

    # Print results in completion order rather than submission order.
    for completed in asyncio.as_completed(pending):
        print(await completed)
45+
46+
47+
# Script entry point: run the async starter under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())

worker_multiprocessing/worker.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import argparse
2+
import asyncio
3+
import concurrent.futures
4+
import dataclasses
5+
import multiprocessing
6+
import traceback
7+
from typing import Literal
8+
9+
from temporalio.client import Client
10+
from temporalio.envconfig import ClientConfig
11+
from temporalio.runtime import Runtime, TelemetryConfig
12+
from temporalio.worker import PollerBehaviorSimpleMaximum, Worker
13+
from temporalio.worker.workflow_sandbox import (
14+
SandboxedWorkflowRunner,
15+
SandboxRestrictions,
16+
)
17+
18+
from worker_multiprocessing import ACTIVITY_TASK_QUEUE, WORKFLOW_TASK_QUEUE
19+
from worker_multiprocessing.activities import echo_pid_activity
20+
from worker_multiprocessing.workflows import ParallelizedWorkflow
21+
22+
# Immediately prevent the default Runtime from being created to ensure
# each process creates its own (worker_entry calls Runtime.set_default
# in every child process).
Runtime.prevent_default()
25+
26+
27+
class Args(argparse.Namespace):
    """Typed argparse namespace for the worker's command-line arguments."""

    # Counts set via -w/--num-workflow-workers and -a/--num-activity-workers.
    num_workflow_workers: int
    num_activity_workers: int

    @property
    def total_workers(self) -> int:
        """Total number of worker child processes to launch."""
        return self.num_workflow_workers + self.num_activity_workers
34+
35+
36+
def main():
    """Parse CLI arguments and launch worker child processes via a pool."""
    parser = argparse.ArgumentParser()
    parser.add_argument("-w", "--num-workflow-workers", type=int, default=2)
    parser.add_argument("-a", "--num-activity-workers", type=int, default=1)
    args = parser.parse_args(namespace=Args())
    print(
        f"starting {args.num_workflow_workers} workflow worker(s) and {args.num_activity_workers} activity worker(s)"
    )

    # This sample prefers fork to avoid re-importing modules and decrease
    # startup time. Fork is not available on all operating systems, so we
    # fall back to 'spawn' when it is unavailable.
    try:
        ctx = multiprocessing.get_context("fork")
    except ValueError:
        ctx = multiprocessing.get_context("spawn")  # type: ignore

    with concurrent.futures.ProcessPoolExecutor(
        args.total_workers, mp_context=ctx
    ) as pool:
        # Launch each workflow worker in its own child process.
        futures = [
            pool.submit(worker_entry, "workflow", idx)
            for idx in range(args.num_workflow_workers)
        ]

        # In this sample, activity workers are started as child processes in
        # the same way as workflow workers. In production, activity workers
        # are often deployed separately from workflow workers to account for
        # differing scaling characteristics.
        futures += [
            pool.submit(worker_entry, "activity", idx)
            for idx in range(args.num_activity_workers)
        ]

        try:
            print("waiting for keyboard interrupt or for all workers to exit")
            # Workers are expected to run until interrupted, so any
            # completed future indicates an unexpected exit.
            for done in concurrent.futures.as_completed(futures):
                print("ERROR: worker exited unexpectedly")
                exc = done.exception()
                if exc:
                    traceback.print_exception(exc)
        except KeyboardInterrupt:
            pass
82+
83+
84+
def worker_entry(worker_type: Literal["workflow", "activity"], id: int) -> None:
    """Child-process entry point: create and run a single worker.

    Runs inside a process started by main()'s ProcessPoolExecutor and blocks
    until the worker exits or the event loop is torn down (e.g. on keyboard
    interrupt in the parent).
    """
    # Each child process must create its own Runtime; the module calls
    # Runtime.prevent_default() at import time to enforce this.
    Runtime.set_default(Runtime(telemetry=TelemetryConfig()))

    async def run_worker():
        # Connect using environment-based configuration, defaulting to a
        # local dev server when no target host is configured.
        config = ClientConfig.load_client_connect_config()
        config.setdefault("target_host", "localhost:7233")
        client = await Client.connect(**config)

        if worker_type == "workflow":
            worker = workflow_worker(client)
        else:
            worker = activity_worker(client)

        try:
            print(f"{worker_type}-worker:{id} starting")
            # Shield worker.run() so cancellation of this task surfaces here
            # as CancelledError, allowing an explicit worker.shutdown()
            # instead of abruptly cancelling the run coroutine.
            await asyncio.shield(worker.run())
        except asyncio.CancelledError:
            print(f"{worker_type}-worker:{id} shutting down")
            await worker.shutdown()

    asyncio.run(run_worker())
105+
106+
107+
def workflow_worker(client: Client) -> Worker:
    """
    Create a workflow worker configured to run as one of many child
    processes.
    """
    return Worker(
        client,
        task_queue=WORKFLOW_TASK_QUEUE,
        workflows=[ParallelizedWorkflow],
        # Workflow tasks are CPU bound, but generally execute quickly.
        # Because we're leveraging multiprocessing to achieve parallelism,
        # we want each workflow worker to be configured for small workflow
        # task processing.
        max_concurrent_workflow_tasks=2,
        workflow_task_poller_behavior=PollerBehaviorSimpleMaximum(2),
        # Allow workflows to access the os module to access the pid
        workflow_runner=SandboxedWorkflowRunner(
            restrictions=dataclasses.replace(
                SandboxRestrictions.default,
                invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
                    "os"
                ),
            )
        ),
    )
132+
133+
134+
def activity_worker(client: Client) -> Worker:
    """
    Build a plain activity worker serving the sample's echo activity.
    """
    # Only one activity is registered; it reports the worker process's PID.
    registered_activities = [echo_pid_activity]
    return Worker(
        client,
        task_queue=ACTIVITY_TASK_QUEUE,
        activities=registered_activities,
    )
143+
144+
145+
# Script entry point: launch the worker pool in the parent process.
if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)