Skip to content

Commit 77555cb

Browse files
alperyoney and miss-islington
authored and committed
gh-144981: Make PyUnstable_Code_SetExtra/GetExtra thread-safe (GH-144980)
* Make PyUnstable_Code_SetExtra/GetExtra thread-safe (cherry picked from commit a565327) Co-authored-by: Alper <alperyoney@fb.com>
1 parent e695019 commit 77555cb

File tree

4 files changed

+243
-31
lines changed

4 files changed

+243
-31
lines changed

Lib/test/test_free_threading/test_code.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,41 @@
11
import unittest
22

3+
try:
4+
import ctypes
5+
except ImportError:
6+
ctypes = None
7+
38
from threading import Thread
49
from unittest import TestCase
510

611
from test.support import threading_helper
12+
from test.support.threading_helper import run_concurrently
13+
14+
if ctypes is not None:
15+
capi = ctypes.pythonapi
16+
17+
freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp)
18+
19+
RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
20+
RequestCodeExtraIndex.argtypes = (freefunc,)
21+
RequestCodeExtraIndex.restype = ctypes.c_ssize_t
22+
23+
SetExtra = capi.PyUnstable_Code_SetExtra
24+
SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
25+
SetExtra.restype = ctypes.c_int
26+
27+
GetExtra = capi.PyUnstable_Code_GetExtra
28+
GetExtra.argtypes = (
29+
ctypes.py_object,
30+
ctypes.c_ssize_t,
31+
ctypes.POINTER(ctypes.c_voidp),
32+
)
33+
GetExtra.restype = ctypes.c_int
34+
35+
# Note: each call to RequestCodeExtraIndex permanently allocates a slot
36+
# (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255).
37+
NTHREADS = 20
38+
739

840
@threading_helper.requires_working_threading()
941
class TestCode(TestCase):
@@ -25,6 +57,83 @@ def run_in_thread():
2557
for thread in threads:
2658
thread.join()
2759

60+
@unittest.skipUnless(ctypes, "ctypes is required")
61+
def test_request_code_extra_index_concurrent(self):
62+
"""Test concurrent calls to RequestCodeExtraIndex"""
63+
results = []
64+
65+
def worker():
66+
idx = RequestCodeExtraIndex(freefunc(0))
67+
self.assertGreaterEqual(idx, 0)
68+
results.append(idx)
69+
70+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
71+
72+
# Every thread must get a unique index.
73+
self.assertEqual(len(results), NTHREADS)
74+
self.assertEqual(len(set(results)), NTHREADS)
75+
76+
@unittest.skipUnless(ctypes, "ctypes is required")
77+
def test_code_extra_all_ops_concurrent(self):
78+
"""Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra"""
79+
LOOP = 100
80+
81+
def f():
82+
pass
83+
84+
code = f.__code__
85+
86+
def worker():
87+
idx = RequestCodeExtraIndex(freefunc(0))
88+
self.assertGreaterEqual(idx, 0)
89+
90+
for i in range(LOOP):
91+
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
92+
self.assertEqual(ret, 0)
93+
94+
for _ in range(LOOP):
95+
extra = ctypes.c_voidp()
96+
ret = GetExtra(code, idx, extra)
97+
self.assertEqual(ret, 0)
98+
# The slot was set by this thread, so the value must
99+
# be the last one written.
100+
self.assertEqual(extra.value, LOOP)
101+
102+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
103+
104+
@unittest.skipUnless(ctypes, "ctypes is required")
105+
def test_code_extra_set_get_concurrent(self):
106+
"""Test concurrent SetExtra + GetExtra on a shared index"""
107+
LOOP = 100
108+
109+
def f():
110+
pass
111+
112+
code = f.__code__
113+
114+
idx = RequestCodeExtraIndex(freefunc(0))
115+
self.assertGreaterEqual(idx, 0)
116+
117+
def worker():
118+
for i in range(LOOP):
119+
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
120+
self.assertEqual(ret, 0)
121+
122+
for _ in range(LOOP):
123+
extra = ctypes.c_voidp()
124+
ret = GetExtra(code, idx, extra)
125+
self.assertEqual(ret, 0)
126+
# Value is set by any writer thread.
127+
self.assertTrue(1 <= extra.value <= LOOP)
128+
129+
run_concurrently(worker_func=worker, nthreads=NTHREADS)
130+
131+
# Every thread's last write is LOOP, so the final value must be LOOP.
132+
extra = ctypes.c_voidp()
133+
ret = GetExtra(code, idx, extra)
134+
self.assertEqual(ret, 0)
135+
self.assertEqual(extra.value, LOOP)
136+
28137

29138
if __name__ == "__main__":
30139
unittest.main()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Made :c:func:`PyUnstable_Code_SetExtra`, :c:func:`PyUnstable_Code_GetExtra`,
2+
and :c:func:`PyUnstable_Eval_RequestCodeExtraIndex` thread-safe on the
3+
:term:`free threaded <free threading>` build.

Objects/codeobject.c

Lines changed: 113 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1582,6 +1582,67 @@ typedef struct {
15821582
} _PyCodeObjectExtra;
15831583

15841584

1585+
static inline size_t
1586+
code_extra_size(Py_ssize_t n)
1587+
{
1588+
return sizeof(_PyCodeObjectExtra) + (n - 1) * sizeof(void *);
1589+
}
1590+
1591+
#ifdef Py_GIL_DISABLED
1592+
static int
1593+
code_extra_grow_ft(PyCodeObject *co, _PyCodeObjectExtra *old_co_extra,
1594+
Py_ssize_t old_ce_size, Py_ssize_t new_ce_size,
1595+
Py_ssize_t index, void *extra)
1596+
{
1597+
_Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(co);
1598+
_PyCodeObjectExtra *new_co_extra = PyMem_Malloc(
1599+
code_extra_size(new_ce_size));
1600+
if (new_co_extra == NULL) {
1601+
PyErr_NoMemory();
1602+
return -1;
1603+
}
1604+
1605+
if (old_ce_size > 0) {
1606+
memcpy(new_co_extra->ce_extras, old_co_extra->ce_extras,
1607+
old_ce_size * sizeof(void *));
1608+
}
1609+
for (Py_ssize_t i = old_ce_size; i < new_ce_size; i++) {
1610+
new_co_extra->ce_extras[i] = NULL;
1611+
}
1612+
new_co_extra->ce_size = new_ce_size;
1613+
new_co_extra->ce_extras[index] = extra;
1614+
1615+
// Publish new buffer and its contents to lock-free readers.
1616+
FT_ATOMIC_STORE_PTR_RELEASE(co->co_extra, new_co_extra);
1617+
if (old_co_extra != NULL) {
1618+
// QSBR: defer old-buffer free until lock-free readers quiesce.
1619+
_PyMem_FreeDelayed(old_co_extra, code_extra_size(old_ce_size));
1620+
}
1621+
return 0;
1622+
}
1623+
#else
1624+
static int
1625+
code_extra_grow_gil(PyCodeObject *co, _PyCodeObjectExtra *old_co_extra,
1626+
Py_ssize_t old_ce_size, Py_ssize_t new_ce_size,
1627+
Py_ssize_t index, void *extra)
1628+
{
1629+
_PyCodeObjectExtra *new_co_extra = PyMem_Realloc(
1630+
old_co_extra, code_extra_size(new_ce_size));
1631+
if (new_co_extra == NULL) {
1632+
PyErr_NoMemory();
1633+
return -1;
1634+
}
1635+
1636+
for (Py_ssize_t i = old_ce_size; i < new_ce_size; i++) {
1637+
new_co_extra->ce_extras[i] = NULL;
1638+
}
1639+
new_co_extra->ce_size = new_ce_size;
1640+
new_co_extra->ce_extras[index] = extra;
1641+
co->co_extra = new_co_extra;
1642+
return 0;
1643+
}
1644+
#endif
1645+
15851646
int
15861647
PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
15871648
{
@@ -1590,15 +1651,19 @@ PyUnstable_Code_GetExtra(PyObject *code, Py_ssize_t index, void **extra)
15901651
return -1;
15911652
}
15921653

1593-
PyCodeObject *o = (PyCodeObject*) code;
1594-
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra*) o->co_extra;
1654+
PyCodeObject *co = (PyCodeObject *)code;
1655+
*extra = NULL;
15951656

1596-
if (co_extra == NULL || index < 0 || co_extra->ce_size <= index) {
1597-
*extra = NULL;
1657+
if (index < 0) {
15981658
return 0;
15991659
}
16001660

1601-
*extra = co_extra->ce_extras[index];
1661+
// Lock-free read; pairs with release stores in SetExtra.
1662+
_PyCodeObjectExtra *co_extra = FT_ATOMIC_LOAD_PTR_ACQUIRE(co->co_extra);
1663+
if (co_extra != NULL && index < co_extra->ce_size) {
1664+
*extra = FT_ATOMIC_LOAD_PTR_ACQUIRE(co_extra->ce_extras[index]);
1665+
}
1666+
16021667
return 0;
16031668
}
16041669

@@ -1608,40 +1673,59 @@ PyUnstable_Code_SetExtra(PyObject *code, Py_ssize_t index, void *extra)
16081673
{
16091674
PyInterpreterState *interp = _PyInterpreterState_GET();
16101675

1611-
if (!PyCode_Check(code) || index < 0 ||
1612-
index >= interp->co_extra_user_count) {
1676+
// co_extra_user_count is monotonically increasing and published with
1677+
// release store in RequestCodeExtraIndex, so once an index is valid
1678+
// it stays valid.
1679+
Py_ssize_t user_count = FT_ATOMIC_LOAD_SSIZE_ACQUIRE(
1680+
interp->co_extra_user_count);
1681+
1682+
if (!PyCode_Check(code) || index < 0 || index >= user_count) {
16131683
PyErr_BadInternalCall();
16141684
return -1;
16151685
}
16161686

1617-
PyCodeObject *o = (PyCodeObject*) code;
1618-
_PyCodeObjectExtra *co_extra = (_PyCodeObjectExtra *) o->co_extra;
1687+
PyCodeObject *co = (PyCodeObject *)code;
1688+
int result = 0;
1689+
void *old_slot_value = NULL;
16191690

1620-
if (co_extra == NULL || co_extra->ce_size <= index) {
1621-
Py_ssize_t i = (co_extra == NULL ? 0 : co_extra->ce_size);
1622-
co_extra = PyMem_Realloc(
1623-
co_extra,
1624-
sizeof(_PyCodeObjectExtra) +
1625-
(interp->co_extra_user_count-1) * sizeof(void*));
1626-
if (co_extra == NULL) {
1627-
return -1;
1628-
}
1629-
for (; i < interp->co_extra_user_count; i++) {
1630-
co_extra->ce_extras[i] = NULL;
1631-
}
1632-
co_extra->ce_size = interp->co_extra_user_count;
1633-
o->co_extra = co_extra;
1691+
Py_BEGIN_CRITICAL_SECTION(co);
1692+
1693+
_PyCodeObjectExtra *old_co_extra = (_PyCodeObjectExtra *)co->co_extra;
1694+
Py_ssize_t old_ce_size = (old_co_extra == NULL)
1695+
? 0 : old_co_extra->ce_size;
1696+
1697+
// Fast path: slot already exists, update in place.
1698+
if (index < old_ce_size) {
1699+
old_slot_value = old_co_extra->ce_extras[index];
1700+
FT_ATOMIC_STORE_PTR_RELEASE(old_co_extra->ce_extras[index], extra);
1701+
goto done;
16341702
}
16351703

1636-
if (co_extra->ce_extras[index] != NULL) {
1637-
freefunc free = interp->co_extra_freefuncs[index];
1638-
if (free != NULL) {
1639-
free(co_extra->ce_extras[index]);
1704+
// Slow path: buffer needs to grow.
1705+
Py_ssize_t new_ce_size = user_count;
1706+
#ifdef Py_GIL_DISABLED
1707+
// FT build: allocate new buffer and swap; QSBR reclaims the old one.
1708+
result = code_extra_grow_ft(
1709+
co, old_co_extra, old_ce_size, new_ce_size, index, extra);
1710+
#else
1711+
// GIL build: grow with realloc.
1712+
result = code_extra_grow_gil(
1713+
co, old_co_extra, old_ce_size, new_ce_size, index, extra);
1714+
#endif
1715+
1716+
done:;
1717+
Py_END_CRITICAL_SECTION();
1718+
if (old_slot_value != NULL) {
1719+
// Free the old slot value if a free function was registered.
1720+
// The caller must ensure no other thread can still access the old
1721+
// value after this overwrite.
1722+
freefunc free_extra = interp->co_extra_freefuncs[index];
1723+
if (free_extra != NULL) {
1724+
free_extra(old_slot_value);
16401725
}
16411726
}
16421727

1643-
co_extra->ce_extras[index] = extra;
1644-
return 0;
1728+
return result;
16451729
}
16461730

16471731

Python/ceval.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3486,11 +3486,27 @@ PyUnstable_Eval_RequestCodeExtraIndex(freefunc free)
34863486
PyInterpreterState *interp = _PyInterpreterState_GET();
34873487
Py_ssize_t new_index;
34883488

3489-
if (interp->co_extra_user_count == MAX_CO_EXTRA_USERS - 1) {
3489+
#ifdef Py_GIL_DISABLED
3490+
struct _py_code_state *state = &interp->code_state;
3491+
FT_MUTEX_LOCK(&state->mutex);
3492+
#endif
3493+
3494+
if (interp->co_extra_user_count >= MAX_CO_EXTRA_USERS - 1) {
3495+
#ifdef Py_GIL_DISABLED
3496+
FT_MUTEX_UNLOCK(&state->mutex);
3497+
#endif
34903498
return -1;
34913499
}
3492-
new_index = interp->co_extra_user_count++;
3500+
3501+
new_index = interp->co_extra_user_count;
34933502
interp->co_extra_freefuncs[new_index] = free;
3503+
3504+
// Publish freefuncs[new_index] before making the index visible.
3505+
FT_ATOMIC_STORE_SSIZE_RELEASE(interp->co_extra_user_count, new_index + 1);
3506+
3507+
#ifdef Py_GIL_DISABLED
3508+
FT_MUTEX_UNLOCK(&state->mutex);
3509+
#endif
34943510
return new_index;
34953511
}
34963512

0 commit comments

Comments
 (0)