|
| 1 | +asyncio |
| 2 | +======= |
| 3 | + |
| 4 | + |
| 5 | +This document describes the working and implementation details of C |
| 6 | +implementation of the |
| 7 | +[`asyncio`](https://docs.python.org/3/library/asyncio.html) module. |
| 8 | + |
| 9 | + |
| 10 | +## Pre-Python 3.14 implementation |
| 11 | + |
| 12 | +Before Python 3.14, the C implementation of `asyncio` used a |
| 13 | +[`WeakSet`](https://docs.python.org/3/library/weakref.html#weakref.WeakSet) |
| 14 | +to store all the tasks created by the event loop. `WeakSet` was used |
| 15 | +so that the event loop doesn't hold strong references to the tasks, |
| 16 | +allowing them to be garbage collected when they are no longer needed. |
| 17 | +The current task of the event loop was stored in a dict mapping the |
| 18 | +event loop to the current task. |
| 19 | + |
| 20 | +```c |
| 21 | + /* Dictionary containing tasks that are currently active in |
| 22 | + all running event loops. {EventLoop: Task} */ |
| 23 | + PyObject *current_tasks; |
| 24 | + |
| 25 | + /* WeakSet containing all tasks scheduled to run on event loops. */ |
| 26 | + PyObject *scheduled_tasks; |
| 27 | +``` |
| 28 | + |
| 29 | +This implementation had a few drawbacks: |
| 30 | + |
| 31 | +1. **Performance**: Using a `WeakSet` for storing tasks is |
| 32 | +inefficient, as it requires maintaining a full set of weak references |
| 33 | +to tasks along with corresponding weakref callback to cleanup the |
| 34 | +tasks when they are garbage collected. This increases the work done |
| 35 | +by the garbage collector, and in applications with a large number of |
| 36 | +tasks, this becomes a bottleneck, with increased memory usage and |
| 37 | +lower performance. Looking up the current task was slow as it required |
| 38 | +a dictionary lookup on the `current_tasks` dict. |
| 39 | + |
| 40 | +2. **Thread safety**: Before Python 3.14, concurrent iterations over |
| 41 | +`WeakSet` was not thread safe[^1]. This meant calling APIs like |
| 42 | +`asyncio.all_tasks()` could lead to inconsistent results or even |
| 43 | +`RuntimeError` if used in multiple threads[^2]. |
| 44 | + |
| 45 | +3. **Poor scaling in free-threading**: Using global `WeakSet` for |
| 46 | +storing all tasks across all threads lead to contention when adding |
| 47 | +and removing tasks from the set which is a frequent operation. As such |
| 48 | +it performed poorly in free-threading and did not scale well with the |
| 49 | +number of threads. Similarly, accessing the current task in multiple |
| 50 | +threads did not scale due to contention on the global `current_tasks` |
| 51 | +dictionary. |
| 52 | + |
| 53 | +## Python 3.14 implementation |
| 54 | + |
| 55 | +To address these issues, Python 3.14 implements several changes to |
| 56 | +improve the performance and thread safety of tasks management. |
| 57 | + |
| 58 | +- **Per-thread double linked list for tasks**: Python 3.14 introduces |
| 59 | + a per-thread circular double linked list implementation for |
| 60 | + storing tasks. This allows each thread to maintain its own list of |
| 61 | + tasks and allows for lock free addition and removal of tasks. This |
| 62 | + is designed to be efficient, and thread-safe and scales well with |
| 63 | + the number of threads in free-threading. This also allows external |
| 64 | + introspection tools such as `python -m asyncio pstree` to inspect |
| 65 | + tasks running in all threads and was implemented as part of [Audit |
| 66 | + asyncio thread |
| 67 | + safety](https://github.com/python/cpython/issues/128002). |
| 68 | + |
| 69 | +- **Per-thread current task**: Python 3.14 stores the current task on |
| 70 | + the current thread state instead of a global dictionary. This |
| 71 | + allows for faster access to the current task without the need for |
| 72 | + a dictionary lookup. Each thread maintains its own current task, |
| 73 | + which is stored in the `PyThreadState` structure. This was |
| 74 | + implemented in https://github.com/python/cpython/issues/129898. |
| 75 | + |
| 76 | +Storing the current task and list of all tasks per-thread instead of |
| 77 | +storing it per-loop was chosen primarily to support external |
| 78 | +introspection tools such as `python -m asyncio pstree` as looking up |
| 79 | +arbitrary attributes on the loop object is not possible |
| 80 | +externally. Storing data per-thread also makes it easy to support |
| 81 | +third party event loop implementations such as `uvloop`, and is more |
| 82 | +efficient for the single threaded asyncio use-case as it avoids the |
| 83 | +overhead of attribute lookups on the loop object and several other |
| 84 | +calls on the performance critical path of adding and removing tasks |
| 85 | +from the per-loop task list. |
| 86 | + |
| 87 | +## Per-thread double linked list for tasks |
| 88 | + |
| 89 | +This implementation uses a circular doubly linked list to store tasks |
| 90 | +on the thread states. This is used for all tasks which are instances |
| 91 | +of `asyncio.Task` or subclasses of it, for third-party tasks a |
| 92 | +fallback `WeakSet` implementation is used. The linked list is |
| 93 | +implemented using an embedded `llist_node` structure within each |
| 94 | +`TaskObj`. By embedding the list node directly into the task object, |
| 95 | +the implementation avoids additional memory allocations for linked |
| 96 | +list nodes. |
| 97 | + |
| 98 | +The `PyThreadState` structure gained a new field `asyncio_tasks_head`, |
| 99 | +which serves as the head of the circular linked list of tasks. This |
| 100 | +allows for lock free addition and removal of tasks from the list. |
| 101 | + |
| 102 | +It is possible that when a thread state is deallocated, there are |
| 103 | +lingering tasks in its list; this can happen if another thread has |
| 104 | +references to the tasks of this thread. Therefore, the |
| 105 | +`PyInterpreterState` structure also gains a new `asyncio_tasks_head` |
| 106 | +field to store any lingering tasks. When a thread state is |
| 107 | +deallocated, any remaining lingering tasks are moved to the |
| 108 | +interpreter state tasks list, and the thread state tasks list is |
| 109 | +cleared. The `asyncio_tasks_lock` is used protect the interpreter's |
| 110 | +tasks list from concurrent modifications. |
| 111 | + |
| 112 | +```c |
| 113 | +typedef struct TaskObj { |
| 114 | + ... |
| 115 | + struct llist_node asyncio_node; |
| 116 | +} TaskObj; |
| 117 | + |
| 118 | +typedef struct PyThreadState { |
| 119 | + ... |
| 120 | + struct llist_node asyncio_tasks_head; |
| 121 | +} PyThreadState; |
| 122 | + |
| 123 | +typedef struct PyInterpreterState { |
| 124 | + ... |
| 125 | + struct llist_node asyncio_tasks_head; |
| 126 | + PyMutex asyncio_tasks_lock; |
| 127 | +} PyInterpreterState; |
| 128 | +``` |
| 129 | + |
| 130 | +When a task is created, it is added to the current thread's list of |
| 131 | +tasks by the `register_task` function. When the task is done, it is |
| 132 | +removed from the list by the `unregister_task` function. In |
| 133 | +free-threading, the thread id of the thread which created the task is |
| 134 | +stored in `task_tid` field of the `TaskObj`. This is used to check if |
| 135 | +the task is being removed from the correct thread's task list. If the |
| 136 | +current thread is same as the thread which created it then no locking |
| 137 | +is required, otherwise in free-threading, the `stop-the-world` pause |
| 138 | +is used to pause all other threads and then safely remove the task |
| 139 | +from the tasks list. |
| 140 | + |
| 141 | +```mermaid |
| 142 | +
|
| 143 | +flowchart TD |
| 144 | + subgraph one["Executing Thread"] |
| 145 | + A["task = asyncio.create_task(coro())"] -->B("register_task(task)") |
| 146 | + B --> C{"task->task_state?"} |
| 147 | + C -->|pending| D["task_step(task)"] |
| 148 | + C -->|done| F["unregister_task(task)"] |
| 149 | + C -->|cancelled| F["unregister_task(task)"] |
| 150 | + D --> C |
| 151 | + F --> G{"free-threading?"} |
| 152 | + G --> |false| H["unregister_task_safe(task)"] |
| 153 | + G --> |true| J{"correct thread? <br>task->task_tid == _Py_ThreadId()"} |
| 154 | + J --> |true| H |
| 155 | + J --> |false| I["stop the world <br> pause all threads"] |
| 156 | + I --> H["unregister_task_safe(task)"] |
| 157 | + end |
| 158 | + subgraph two["Thread deallocating"] |
| 159 | + A1{"thread's task list empty? <br> llist_empty(tstate->asyncio_tasks_head)"} |
| 160 | + A1 --> |true| B1["deallocate thread<br>free_threadstate(tstate)"] |
| 161 | + A1 --> |false| C1["add tasks to interpreter's task list<br> llist_concat(&tstate->interp->asyncio_tasks_head,tstate->asyncio_tasks_head)"] |
| 162 | + C1 --> B1 |
| 163 | + end |
| 164 | +
|
| 165 | + one --> two |
| 166 | +``` |
| 167 | + |
| 168 | +`asyncio.all_tasks` now iterates over the per-thread task lists of all |
| 169 | +threads and the interpreter's task list to get all the tasks. In |
| 170 | +free-threading, this is done by pausing all the threads using the |
| 171 | +`stop-the-world` pause to ensure that no tasks are being added or |
| 172 | +removed while iterating over the lists. This allows for a consistent |
| 173 | +view of all task lists across all threads and is thread safe. |
| 174 | + |
| 175 | +This design allows for lock free execution and scales well in |
| 176 | +free-threading with multiple event loops running in different threads. |
| 177 | + |
| 178 | +## Per-thread current task |
| 179 | + |
| 180 | +This implementation stores the current task in the `PyThreadState` |
| 181 | +structure, which allows for faster access to the current task without |
| 182 | +the need for a dictionary lookup. |
| 183 | + |
| 184 | +```c |
| 185 | +typedef struct PyThreadState { |
| 186 | + ... |
| 187 | + PyObject *asyncio_current_loop; |
| 188 | + PyObject *asyncio_current_task; |
| 189 | +} PyThreadState; |
| 190 | +``` |
| 191 | + |
| 192 | +When a task is entered or left, the current task is updated in the |
| 193 | +thread state using `enter_task` and `leave_task` functions. When |
| 194 | +`current_task(loop)` is called where `loop` is the current running |
| 195 | +event loop of the current thread, no locking is required as the |
| 196 | +current task is stored in the thread state and is returned directly |
| 197 | +(general case). Otherwise, if the `loop` is not current running event |
| 198 | +loop, the `stop-the-world` pause is used to pause all threads in |
| 199 | +free-threading and then by iterating over all the thread states and |
| 200 | +checking if the `loop` matches with `tstate->asyncio_current_loop`, |
| 201 | +the current task is found and returned. If no matching thread state is |
| 202 | +found, `None` is returned. |
| 203 | + |
| 204 | +In free-threading, it avoids contention on a global dictionary as |
| 205 | +threads can access the current task of thier running loop without any |
| 206 | +locking. |
| 207 | + |
| 208 | + |
| 209 | +[^1]: https://github.com/python/cpython/issues/123089 |
| 210 | +[^2]: https://github.com/python/cpython/issues/80788 |
0 commit comments