
Commit 8b9daa8

feat: section on some general multithreading concepts
Signed-off-by: Henry Schreiner <henryschreineriii@gmail.com>
1 parent 9b754a4 commit 8b9daa8

File tree

5 files changed

+217
-0
lines changed


_toc.yml

Lines changed: 1 addition & 0 deletions
@@ -76,6 +76,7 @@ parts:
 - file: content/week11/intro.md
 - file: content/week11/openmp.md
 - file: content/week11/threading.md
+- file: content/week11/concepts.md

 - caption: "Week 12: Distributed Computing"
   chapters:

content/week11/concepts.md

Lines changed: 150 additions & 0 deletions
# Parallel concepts

These are general concepts, with some examples primarily in Python.

Note when playing with these examples: errors can get swallowed in threads we
create, so static checks are really useful. The mechanism used to show an error
varies based on several factors: `concurrent.futures` re-raises the exception
when you ask for the result, while `threading` prints errors as strings
(customizable in 3.8+, though). Etc.

Quite a few of these work across threading, interpreters, and multiprocessing in
Python, and are generally found in other languages too, sometimes with different
names.
## Thread safety

Is it safe to use a variable from multiple threads? Generally, read-only
variables are fine; if you don't modify something, there's no problem. But if
you modify a variable, there's a potential problem. Here's an example:

```{literalinclude} conceptsexample/threadunsafe.py

```

First, I set up a mutable variable, which is simply a list containing a single
item. I did this so I can avoid writing the word `global`, to show that the
problem is about mutating variables shared between threads, not unique to global
variables. I use `+=` to add 1. Remember, `x += 1` really does `x = x + 1`: the
processor reads the value, then adds one, then writes the value. If another
thread reads the value in the middle of that process, then the final result will
be one less than you expected!

If you run this with traditional Python, you'll probably get the trivial result:
80,000. That's due to Python's context switching; it doesn't context switch
within this operation. However, if you run it with free-threaded Python, you'll
get the normal multithreading result: some smaller number (around 17,000 for
me). (Don't rely on this in traditional Python either; this is a highly
simplified example, and we are relying completely on the context-switching
points helping us out here!)

This variable is not thread safe, which means we can't safely mutate it across
threads. We'll look at several ways to make or use thread-safe variables.
## Mutex

One of the most general tools for thread safety is a mutex. In Python, it's
called a `Lock` or `RLock`, depending on whether you can re-enter it from the
same thread. Let's take a look:

```{literalinclude} conceptsexample/threadmutex.py

```

Here, we create an instance of a `threading.Lock`. Now we protect the part of
our code that mutates a variable by taking a lock with our mutex (using the
`with` block, which releases automatically when exiting the block).

Note this works because `Lock` instances are themselves thread safe, meaning
taking a lock is guaranteed to work from any thread.

If you were going for performance, remember: now you have some extra overhead,
_and_ only one thread at a time can make it through this portion of the code.
Free-threaded Python now gives the correct result, but is not really any faster
than traditional Python, because it's basically doing the same thing. If you
were doing a lot of work and only locking around a small portion of the code
doing some sort of synchronized update, that would be much better.

One common issue is called a "deadlock"; this is when you try to acquire a lock
that is already acquired and won't be released. Sometimes it happens from the
same thread, such as with a recursive function call. Since you usually don't
need protection from the same thread anyway, `RLock` only blocks when a
different thread is holding the lock, solving this issue at least in that one
case.
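As a quick sketch of that last point (a hypothetical example, not one of the course files): a recursive function that takes the lock on every call would deadlock with a plain `Lock`, but runs fine with an `RLock`:

```python
import threading

lock = threading.RLock()  # a plain Lock would deadlock on the second acquire
total = 0


def countdown(n: int) -> None:
    global total
    with lock:  # re-entered by the same thread on every recursive call
        total += n
        if n > 0:
            countdown(n - 1)


countdown(5)
print(total)  # 5 + 4 + 3 + 2 + 1 + 0 = 15
```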
## Semaphore

This is like a `Lock`, but instead of having an on/off state, it keeps track of
a number with a pre-set limit. For example, you can use a semaphore with a value
of 4 to create something that never runs more than 4 at a time. This is useful
if you have lots of threads, but a resource that you don't want to hit with all
threads at once.
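A minimal sketch of that idea (hypothetical names, not one of the course files): 16 tasks contend for a resource, but a semaphore of 4 guarantees at most 4 are inside at once. A separate lock protects the counters, just like in the mutex section:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

sem = threading.Semaphore(4)  # at most 4 threads may hold this at once
count_lock = threading.Lock()
active = 0
max_seen = 0


def use_resource() -> None:
    global active, max_seen
    with sem:  # blocks while 4 other threads hold the semaphore
        with count_lock:
            active += 1
            max_seen = max(max_seen, active)
        time.sleep(0.01)  # pretend to use the limited resource
        with count_lock:
            active -= 1


with ThreadPoolExecutor(max_workers=16) as pool:
    for _ in range(16):
        pool.submit(use_resource)

print(max_seen)  # never more than 4
```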
## Atomic (not in Python)

Often, you need to lock because you are updating a single value. Modern
processors have specialized instructions that allow a value (like an integer) to
be updated in a way that is thread safe without all the overhead of a lock (and
without needing a separate lock object).

Python doesn't have this, since modifying a Python object is far more than a
single integer operation, and an atomic is just a performance benefit over a
mutex. You can use `Event`, which is basically an atomic bool that you can wait
on to be set (True).
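A minimal `Event` sketch (hypothetical example): one thread blocks on the flag until another thread sets it:

```python
import threading

ready = threading.Event()  # starts unset (False)
log = []


def waiter() -> None:
    ready.wait()  # blocks until ready.set() is called in another thread
    log.append("woke up")


t = threading.Thread(target=waiter)
t.start()
log.append("setting")
ready.set()  # atomically flips the flag and wakes all waiters
t.join()
print(log)  # ['setting', 'woke up']
```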
## Queue

Another common use case is adding and removing items, usually so one thread can
communicate work to another. A common pattern is creating a thread pool of
workers, then feeding work to them, with each thread picking up and processing
available work. This requires a thread-safe container, and it's usually
optimized for input and output, versus iteration, for example.

Python has a `Queue` class, which is a very powerful first-in, first-out (FIFO)
queue. A trimmed-down version, `SimpleQueue`, is available, which doesn't have
the added task-related additions. There are also last-in, first-out (LIFO) and
priority queues, depending on how you want to process tasks.

Here's an example:

```{literalinclude} conceptsexample/threadqueue.py

```

Here, we set up two queues. The first one holds "jobs", and we tell the queue
when we've completed each task so that it knows when it's done. (For clean
shutdowns of the worker threads, this also uses the `shutdown` mechanism
introduced in Python 3.13; you can set up clean shutdown yourself if you want to
support older versions.)

You might notice, if you look at the API for `Queue`, that there are `block` and
`timeout` arguments on the get and put methods. You can decide if you want to
wait (`block` the current thread) until something is available, and also set a
timeout for how long to wait. Queues can also have a maximum size, which is why
these exist for `put` as well. And, like locks, you can end up with deadlocks if
you are not careful.
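For instance (a standalone sketch, separate from the example above), a non-blocking or timed `get` raises `queue.Empty` rather than waiting forever:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("item")
first = q.get()  # the default: block until an item is available

# block=False returns immediately, raising queue.Empty if nothing is there:
try:
    q.get(block=False)
except queue.Empty:
    print("nothing available right now")

# timeout=... waits up to that long, then raises queue.Empty:
try:
    q.get(timeout=0.1)
except queue.Empty:
    print("gave up after 0.1 seconds")
```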
```{admonition} Error checking
This example will swallow errors if you play with it and make a mistake. To fix
that, you need to save the returned values from the `.submit(...)` calls, and
then call `.result()` on them; that will re-raise the exception in the current
thread.
```
## Barrier

You can set a barrier, which pauses threads until all of them reach that point.
For example, if you have 4 threads all computing something, you could use a
barrier to ensure all threads are done with that computation before you move on
to the next step.
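A sketch of that scenario (hypothetical example): each of the 4 threads records its result, then waits at the barrier; past the barrier, every thread can rely on all 4 results being present:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

barrier = threading.Barrier(4)
results = []  # list.append itself is thread safe in CPython


def compute(i: int) -> None:
    results.append(i * i)  # this thread's "computation"
    barrier.wait()  # pause until all 4 threads have appended
    # From here on, all 4 results are guaranteed to exist.


with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        pool.submit(compute, i)

print(sorted(results))  # [0, 1, 4, 9]
```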
## Local / shared memory

Sometimes you need memory just for a single thread. This differs a bit depending
on what you are using, so feel free to look it up. Threading, for example, has
`threading.local`. Async programming uses `contextvars`; since it actually runs
in one thread, it needs a different mechanism to track "context" instead. For
multiprocessing and interpreters, along with variables defined inside function
bodies and such, this is the default.

Then you may need "shared" memory. This might be the default for
threading/async, but it requires explicit setup for multiprocessing and
subinterpreters. This is not an option if you are running on multiple machines;
then you must transfer serialized objects instead. See the examples in the
distributed section.
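A small `threading.local` sketch (hypothetical example): an attribute set in the main thread is invisible to a worker thread, and vice versa:

```python
import threading

local = threading.local()
local.value = "main"  # visible only in the main thread
observations = []


def worker() -> None:
    # The main thread's attribute does not exist here:
    observations.append(hasattr(local, "value"))
    local.value = "worker"  # private to this thread; main won't see it


t = threading.Thread(target=worker)
t.start()
t.join()

print(observations)  # [False]
print(local.value)  # 'main'
```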
conceptsexample/threadmutex.py

Lines changed: 18 additions & 0 deletions

```python
from concurrent.futures import ThreadPoolExecutor
import threading

x = [0]
lock = threading.Lock()


def add(num: int) -> None:
    for i in range(num):
        with lock:
            x[0] += 1


with ThreadPoolExecutor() as pool:
    for _ in range(8):
        pool.submit(add, 10_000)

print(x[0])
```
conceptsexample/threadqueue.py

Lines changed: 33 additions & 0 deletions

```python
from concurrent.futures import ThreadPoolExecutor
import contextlib
import queue
import time

task_queue = queue.Queue()
result_queue = queue.SimpleQueue()


def worker(q: queue.Queue, r: queue.SimpleQueue) -> None:
    with contextlib.suppress(queue.ShutDown):
        while True:
            task = q.get()
            time.sleep(0.1)
            r.put(task * 10)
            q.task_done()


with ThreadPoolExecutor() as pool:
    # Start up 8 workers
    for _ in range(8):
        pool.submit(worker, task_queue, result_queue)

    # Load work for the workers to do
    for i in range(50):
        task_queue.put(i)

    # Wait until task_done() is called the same number of times as items in the queue
    task_queue.shutdown()
    task_queue.join()


print(sum(result_queue.get() for _ in range(result_queue.qsize())))
```
conceptsexample/threadunsafe.py

Lines changed: 15 additions & 0 deletions

```python
from concurrent.futures import ThreadPoolExecutor

x = [0]


def add(num: int) -> None:
    for i in range(num):
        x[0] += 1


with ThreadPoolExecutor() as pool:
    for _ in range(8):
        pool.submit(add, 10_000)

print(x[0])
```
