diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h
index cff2b1f7114793..f32fcb57a016cd 100644
--- a/Include/internal/pycore_ceval.h
+++ b/Include/internal/pycore_ceval.h
@@ -48,12 +48,8 @@ extern void _PyEval_SignalReceived(void);
 #define _Py_PENDING_MAINTHREADONLY 1
 #define _Py_PENDING_RAWFREE 2
 
-typedef int _Py_add_pending_call_result;
-#define _Py_ADD_PENDING_SUCCESS 0
-#define _Py_ADD_PENDING_FULL -1
-
 // Export for '_testinternalcapi' shared extension
-PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
+PyAPI_FUNC(int) _PyEval_AddPendingCall(
     PyInterpreterState *interp,
     _Py_pending_call_func func,
     void *arg,
diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h
index 009a1ea41eb985..1391d8454d3559 100644
--- a/Include/internal/pycore_ceval_state.h
+++ b/Include/internal/pycore_ceval_state.h
@@ -14,49 +14,42 @@ extern "C" {
 
 typedef int (*_Py_pending_call_func)(void *);
 
-struct _pending_call {
+typedef struct _pending_call {
     _Py_pending_call_func func;
     void *arg;
     int flags;
-};
-
-#define PENDINGCALLSARRAYSIZE 300
+    struct _pending_call *next;
+} _pending_call;
 
-#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
 /* For interpreter-level pending calls, we want to avoid spending too
    much time on pending calls in any one thread, so we apply a limit. */
-#if MAXPENDINGCALLS > 100
-#  define MAXPENDINGCALLSLOOP 100
-#else
-#  define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
-#endif
+#define MAXPENDINGCALLSLOOP 100
 
-/* We keep the number small to preserve as much compatibility
-   as possible with earlier versions. */
-#define MAXPENDINGCALLS_MAIN 32
 /* For the main thread, we want to make sure all pending calls are
    run at once, for the sake of prompt signal handling.  This is
    unlikely to cause any problems since there should be very few
    pending calls for the main thread. */
-#define MAXPENDINGCALLSLOOP_MAIN 0
+#define MAXPENDINGCALLSLOOP_MAIN INT32_MAX
 
 struct _pending_calls {
     PyThreadState *handling_thread;
     PyMutex mutex;
-    /* Request for running pending calls. */
+    /* The number of pending calls. */
     int32_t npending;
-    /* The maximum allowed number of pending calls.
-       If the queue fills up to this point then _PyEval_AddPendingCall()
-       will return _Py_ADD_PENDING_FULL. */
-    int32_t max;
     /* We don't want a flood of pending calls to interrupt any one thread
        for too long, so we keep a limit on the number handled per pass.
-       A value of 0 means there is no limit (other than the maximum
-       size of the list of pending calls). */
+       This is MAXPENDINGCALLSLOOP for an interpreter's own pending calls
+       and MAXPENDINGCALLSLOOP_MAIN, which is effectively no limit, for
+       the main thread's.  Zero is not a valid value. */
     int32_t maxloop;
-    struct _pending_call calls[PENDINGCALLSARRAYSIZE];
-    int first;
-    int next;
+    /* The linked list of pending calls. */
+    _pending_call *head;
+
+    /* Nodes kept for reuse by later pending calls.  freelist_num is used
+       to cap how many are kept; see _Py_PENDING_CALLS_FREELIST_SIZE. */
+#define _Py_PENDING_CALLS_FREELIST_SIZE 100
+    _pending_call *freelist;
+    size_t freelist_num;
 };
 
 
@@ -97,8 +90,6 @@ struct _ceval_runtime_state {
     /* Pending calls to be made only on the main thread. */
     // The signal machinery falls back on this
     // so it must be especially stable and efficient.
-    // For example, we use a preallocated array
-    // for the list of pending calls.
struct _pending_calls pending_mainthread; PyMutex sys_trace_profile_mutex; }; diff --git a/Include/internal/pycore_runtime_init.h b/Include/internal/pycore_runtime_init.h index a17ba46966daa1..943437d8e111d1 100644 --- a/Include/internal/pycore_runtime_init.h +++ b/Include/internal/pycore_runtime_init.h @@ -163,7 +163,6 @@ extern PyTypeObject _PyExc_MemoryError; .parser = _parser_runtime_state_INIT, \ .ceval = { \ .pending_mainthread = { \ - .max = MAXPENDINGCALLS_MAIN, \ .maxloop = MAXPENDINGCALLSLOOP_MAIN, \ }, \ .perf = _PyEval_RUNTIME_PERF_INIT, \ @@ -223,7 +222,6 @@ extern PyTypeObject _PyExc_MemoryError; .ceval = { \ .recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \ .pending = { \ - .max = MAXPENDINGCALLS, \ .maxloop = MAXPENDINGCALLSLOOP, \ }, \ }, \ diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py index 5c6faa1626d380..20f6dd12d945e3 100644 --- a/Lib/test/test_capi/test_misc.py +++ b/Lib/test/test_capi/test_misc.py @@ -1557,14 +1557,9 @@ def callback(): for i in range(n): time.sleep(random.random()*0.02) #0.01 secs on average - #try submitting callback until successful. - #rely on regular interrupt to flush queue if we are - #unsuccessful. 
- while True: - if _testcapi._pending_threadfunc(callback): - break + self.assertTrue(_testcapi._pending_threadfunc(callback)) - def pendingcalls_submit(self, l, n, *, main=True, ensure=False): + def pendingcalls_submit(self, l, n, *, main=True): def callback(): #this function can be interrupted by thread switching so let's #use an atomic operation @@ -1572,12 +1567,10 @@ def callback(): if main: return _testcapi._pending_threadfunc(callback, n, - blocking=False, - ensure_added=ensure) + blocking=False) else: return _testinternalcapi.pending_threadfunc(callback, n, - blocking=False, - ensure_added=ensure) + blocking=False) def pendingcalls_wait(self, l, numadded, context = None): #now, stick around until l[0] has grown to 10 @@ -1635,51 +1628,11 @@ def test_main_pendingcalls_non_threaded(self): #again, just using the main thread, likely they will all be dispatched at #once. It is ok to ask for too many, because we loop until we find a slot. #the loop can be interrupted to dispatch. - #there are only 32 dispatch slots, so we go for twice that! l = [] n = 64 self.main_pendingcalls_submit(l, n) self.pendingcalls_wait(l, n) - def test_max_pending(self): - with self.subTest('main-only'): - maxpending = 32 - - l = [] - added = self.pendingcalls_submit(l, 1, main=True) - self.pendingcalls_wait(l, added) - self.assertEqual(added, 1) - - l = [] - added = self.pendingcalls_submit(l, maxpending, main=True) - self.pendingcalls_wait(l, added) - self.assertEqual(added, maxpending) - - l = [] - added = self.pendingcalls_submit(l, maxpending+1, main=True) - self.pendingcalls_wait(l, added) - self.assertEqual(added, maxpending) - - with self.subTest('not main-only'): - # Per-interpreter pending calls has a much higher limit - # on how many may be pending at a time. 
- maxpending = 300 - - l = [] - added = self.pendingcalls_submit(l, 1, main=False) - self.pendingcalls_wait(l, added) - self.assertEqual(added, 1) - - l = [] - added = self.pendingcalls_submit(l, maxpending, main=False) - self.pendingcalls_wait(l, added) - self.assertEqual(added, maxpending) - - l = [] - added = self.pendingcalls_submit(l, maxpending+1, main=False) - self.pendingcalls_wait(l, added) - self.assertEqual(added, maxpending) - class PendingTask(types.SimpleNamespace): _add_pending = _testinternalcapi.pending_threadfunc @@ -1713,10 +1666,10 @@ def callback(): # the eval breaker, so we take a naive approach to # make sure. if threading.get_ident() not in worker_tids: - self._add_pending(callback, ensure_added=True) + self._add_pending(callback) return self.run() - self._add_pending(callback, ensure_added=True) + self._add_pending(callback) def create_thread(self, worker_tids): return threading.Thread( diff --git a/Modules/_testcapimodule.c b/Modules/_testcapimodule.c index ea26295cca49d4..7fe3e381c623df 100644 --- a/Modules/_testcapimodule.c +++ b/Modules/_testcapimodule.c @@ -837,14 +837,13 @@ static PyObject * pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs) { static char *kwlist[] = {"callback", "num", - "blocking", "ensure_added", NULL}; + "blocking", NULL}; PyObject *callable; unsigned int num = 1; int blocking = 0; - int ensure_added = 0; if (!PyArg_ParseTupleAndKeywords(arg, kwargs, - "O|I$pp:_pending_threadfunc", kwlist, - &callable, &num, &blocking, &ensure_added)) + "O|I$p:_pending_threadfunc", kwlist, + &callable, &num, &blocking)) { return NULL; } @@ -861,16 +860,9 @@ pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs) unsigned int num_added = 0; for (; num_added < num; num_added++) { - if (ensure_added) { - int r; - do { - r = Py_AddPendingCall(&_pending_callback, callable); - } while (r < 0); - } - else { - if (Py_AddPendingCall(&_pending_callback, callable) < 0) { - break; - } + if 
(Py_AddPendingCall(&_pending_callback, callable) < 0) { + // out of memory and freelist is empty + break; } } diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c index c403075fbb2501..d766cb0b0b396c 100644 --- a/Modules/_testinternalcapi.c +++ b/Modules/_testinternalcapi.c @@ -1072,18 +1072,17 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs) PyObject *callable; unsigned int num = 1; int blocking = 0; - int ensure_added = 0; static char *kwlist[] = {"callback", "num", - "blocking", "ensure_added", NULL}; + "blocking", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwargs, - "O|I$pp:pending_threadfunc", kwlist, - &callable, &num, &blocking, &ensure_added)) + "O|I$p:pending_threadfunc", kwlist, + &callable, &num, &blocking)) { return NULL; } PyInterpreterState *interp = _PyInterpreterState_GET(); - /* create the reference for the callbackwhile we hold the lock */ + /* create the reference for the callback while we hold the lock */ for (unsigned int i = 0; i < num; i++) { Py_INCREF(callable); } @@ -1095,18 +1094,9 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs) unsigned int num_added = 0; for (; num_added < num; num_added++) { - if (ensure_added) { - _Py_add_pending_call_result r; - do { - r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0); - assert(r == _Py_ADD_PENDING_SUCCESS - || r == _Py_ADD_PENDING_FULL); - } while (r == _Py_ADD_PENDING_FULL); - } - else { - if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) { - break; - } + if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) { + // out of memory and freelist is empty + break; } } @@ -1162,16 +1152,14 @@ pending_identify(PyObject *self, PyObject *args) PyThread_acquire_lock(mutex, WAIT_LOCK); /* It gets released in _pending_identify_callback(). 
*/
-    _Py_add_pending_call_result r;
-    do {
-        Py_BEGIN_ALLOW_THREADS
-        r = _PyEval_AddPendingCall(interp,
-                                   &_pending_identify_callback, (void *)mutex,
-                                   0);
-        Py_END_ALLOW_THREADS
-        assert(r == _Py_ADD_PENDING_SUCCESS
-               || r == _Py_ADD_PENDING_FULL);
-    } while (r == _Py_ADD_PENDING_FULL);
+
+    Py_BEGIN_ALLOW_THREADS
+    int r = _PyEval_AddPendingCall(interp,
+                                   &_pending_identify_callback, (void *)mutex,
+                                   0);
+    (void)r;
+    assert(r == 0);
+    Py_END_ALLOW_THREADS
 
     /* Wait for the pending call to complete. */
     PyThread_acquire_lock(mutex, WAIT_LOCK);
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c
index 4c9f59f837e11b..36aba5fcf1762a 100644
--- a/Python/ceval_gil.c
+++ b/Python/ceval_gil.c
@@ -709,59 +709,105 @@ signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
 
-/* Push one item onto the queue while holding the lock. */
-static int
-_push_pending_call(struct _pending_calls *pending,
-                   _Py_pending_call_func func, void *arg, int flags)
-{
-    if (pending->npending == pending->max) {
-        return _Py_ADD_PENDING_FULL;
-    }
-    assert(pending->npending < pending->max);
-
-    int i = pending->next;
-    assert(pending->calls[i].func == NULL);
-
-    pending->calls[i].func = func;
-    pending->calls[i].arg = arg;
-    pending->calls[i].flags = flags;
-
-    assert(pending->npending < PENDINGCALLSARRAYSIZE);
-    _Py_atomic_add_int32(&pending->npending, 1);
-
-    pending->next = (i + 1) % PENDINGCALLSARRAYSIZE;
-    assert(pending->next != pending->first
-            || pending->npending == pending->max);
-
-    return _Py_ADD_PENDING_SUCCESS;
-}
-
-static int
-_next_pending_call(struct _pending_calls *pending,
-                   int (**func)(void *), void **arg, int *flags)
-{
-    int i = pending->first;
-    if (pending->npending == 0) {
-        /* Queue empty */
-        assert(i == pending->next);
-        assert(pending->calls[i].func == NULL);
-        return -1;
-    }
-    *func = pending->calls[i].func;
-    *arg = pending->calls[i].arg;
-    *flags = pending->calls[i].flags;
-    return i;
-}
-
-/* Pop one item off the queue while holding the lock. */
-static void
-_pop_pending_call(struct _pending_calls *pending,
-                  int (**func)(void *), void **arg, int *flags)
-{
-    int i = _next_pending_call(pending, func, arg, flags);
-    if (i >= 0) {
-        pending->calls[i] = (struct _pending_call){0};
-        pending->first = (i + 1) % PENDINGCALLSARRAYSIZE;
-        assert(pending->npending > 0);
-        _Py_atomic_add_int32(&pending->npending, -1);
-    }
-}
+/* Push one item onto the tail of the queue while holding the lock.
+
+   Calls are popped from the head, so appending keeps them running in the
+   order they were added.  The walk to the tail is O(npending); a tail
+   pointer in struct _pending_calls would make it O(1).
+
+   Returns 0 on success, or -1 if the freelist is empty and a new node
+   cannot be allocated. */
+static int
+push_pending_call(struct _pending_calls *pending,
+                  _Py_pending_call_func func, void *arg, int flags)
+{
+    assert(PyMutex_IsLocked(&pending->mutex));
+    /* Test the pointer, not freelist_num, so that a stale counter can
+       never lead to a NULL dereference. */
+    _pending_call *call = pending->freelist;
+    if (call != NULL) {
+        assert(pending->freelist_num > 0);
+        pending->freelist = call->next;
+        pending->freelist_num--;
+    }
+    else {
+        call = PyMem_RawMalloc(sizeof(*call));
+        if (call == NULL) {
+            return -1;
+        }
+    }
+    call->func = func;
+    call->arg = arg;
+    call->flags = flags;
+    call->next = NULL;
+
+    _pending_call **link = &pending->head;
+    while (*link != NULL) {
+        link = &(*link)->next;
+    }
+    *link = call;
+    _Py_atomic_add_int32(&pending->npending, 1);
+    return 0;
+}
+
+/* Pop the oldest item off the queue while holding the lock.
+
+   If the queue is empty, *func, *arg and *flags are left untouched. */
+static void
+pop_pending_call(struct _pending_calls *pending,
+                 int (**func)(void *), void **arg, int *flags)
+{
+    assert(PyMutex_IsLocked(&pending->mutex));
+    _pending_call *call = pending->head;
+    if (call == NULL) {
+        return;
+    }
+    assert(pending->npending > 0);
+    pending->head = call->next;
+    *func = call->func;
+    *arg = call->arg;
+    *flags = call->flags;
+    /* Recycle the node unless the freelist is already full. */
+    if (pending->freelist_num < _Py_PENDING_CALLS_FREELIST_SIZE) {
+        call->next = pending->freelist;
+        pending->freelist = call;
+        pending->freelist_num++;
+    }
+    else {
+        PyMem_RawFree(call);
+    }
+    _Py_atomic_add_int32(&pending->npending, -1);
+}
+
+/* Free every node on the freelist and reset its counter.  The pending
+   calls themselves (pending->head) are not touched. */
+static void
+clear_pending_freelist(struct _pending_calls *pending)
+{
+    _pending_call *call = pending->freelist;
+    while (call != NULL) {
+        _pending_call *next = call->next;
+        PyMem_RawFree(call);
+        call = next;
+    }
+    pending->freelist = NULL;
+    pending->freelist_num = 0;
+}
+
+/* Preallocate up to _Py_PENDING_CALLS_FREELIST_SIZE nodes on an empty
+   freelist.  Running out of memory is not an error here: the freelist
+   is simply left shorter. */
+static void
+fill_pending_freelist(struct _pending_calls *pending)
+{
+    assert(pending->freelist == NULL);
+    assert(pending->freelist_num == 0);
+    while (pending->freelist_num < _Py_PENDING_CALLS_FREELIST_SIZE) {
+        _pending_call *call = PyMem_RawMalloc(sizeof(*call));
+        if (call == NULL) {
+            return;
+        }
+        call->next = pending->freelist;
+        pending->freelist = call;
+        pending->freelist_num++;
+    }
+}
 
@@ -770,7 +816,7 @@ _pop_pending_call(struct _pending_calls *pending,
    callback.
  */
 
-_Py_add_pending_call_result
+int
 _PyEval_AddPendingCall(PyInterpreterState *interp,
                        _Py_pending_call_func func, void *arg, int flags)
 {
@@ -783,8 +829,7 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
     }
 
     PyMutex_Lock(&pending->mutex);
-    _Py_add_pending_call_result result =
-        _push_pending_call(pending, func, arg, flags);
+    int result = push_pending_call(pending, func, arg, flags);
     PyMutex_Unlock(&pending->mutex);
 
     if (main_only) {
@@ -807,15 +852,7 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
     /* Legacy users of this API will continue to target the main thread
        (of the main interpreter).
*/
     PyInterpreterState *interp = _PyInterpreterState_Main();
-    _Py_add_pending_call_result r =
-        _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
-    if (r == _Py_ADD_PENDING_FULL) {
-        return -1;
-    }
-    else {
-        assert(r == _Py_ADD_PENDING_SUCCESS);
-        return 0;
-    }
+    return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
 }
 
 static int
@@ -839,24 +851,18 @@ _make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)
 {
     int res = 0;
     int32_t npending = -1;
-
-    assert(sizeof(pending->max) <= sizeof(size_t)
-            && ((size_t)pending->max) <= Py_ARRAY_LENGTH(pending->calls));
     int32_t maxloop = pending->maxloop;
-    if (maxloop == 0) {
-        maxloop = pending->max;
-    }
-    assert(maxloop > 0 && maxloop <= pending->max);
+    assert(maxloop == MAXPENDINGCALLSLOOP || maxloop == MAXPENDINGCALLSLOOP_MAIN);
 
     /* perform a bounded number of calls, in case of recursion */
     for (int i=0; i<maxloop; i++) {
         _Py_pending_call_func func = NULL;
         void *arg = NULL;
         int flags = 0;
 
         /* pop one item off the queue while holding the lock */
         PyMutex_Lock(&pending->mutex);
-        _pop_pending_call(pending, &func, &arg, &flags);
+        pop_pending_call(pending, &func, &arg, &flags);
         npending = pending->npending;
         PyMutex_Unlock(&pending->mutex);
 
@@ -1032,6 +1038,12 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
         npending_prev = npending;
 #endif
     } while (npending > 0);
+
+    /* Adding and popping pending calls use the freelist under the mutex,
+       so hold it while the spare nodes are freed. */
+    PyMutex_Lock(&pending->mutex);
+    clear_pending_freelist(pending);
+    PyMutex_Unlock(&pending->mutex);
 }
 
 int
@@ -1077,6 +1089,15 @@ Py_MakePendingCalls(void)
 void
 _PyEval_InitState(PyInterpreterState *interp)
 {
+    // Preallocate spare nodes for this interpreter's pending calls so
+    // that adding one can normally reuse a node instead of allocating.
+    // push_pending_call() still falls back to PyMem_RawMalloc() once the
+    // freelist is empty.
+    struct _pending_calls *pending = &interp->ceval.pending;
+    PyMutex_Lock(&pending->mutex);
+    fill_pending_freelist(pending);
+    PyMutex_Unlock(&pending->mutex);
+
     _gil_initialize(&interp->_gil);
 }
 