Skip to content

Commit

Permalink
use linked list
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaraditya303 authored Oct 15, 2024
1 parent 9c2bb7d commit 74ac0c7
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 100 deletions.
6 changes: 1 addition & 5 deletions Include/internal/pycore_ceval.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ extern void _PyEval_SignalReceived(void);
#define _Py_PENDING_MAINTHREADONLY 1
#define _Py_PENDING_RAWFREE 2

typedef int _Py_add_pending_call_result;
#define _Py_ADD_PENDING_SUCCESS 0
#define _Py_ADD_PENDING_FULL -1

// Export for '_testinternalcapi' shared extension
PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
PyAPI_FUNC(int) _PyEval_AddPendingCall(
PyInterpreterState *interp,
_Py_pending_call_func func,
void *arg,
Expand Down
35 changes: 11 additions & 24 deletions Include/internal/pycore_ceval_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,38 @@ extern "C" {

typedef int (*_Py_pending_call_func)(void *);

struct _pending_call {
typedef struct _pending_call {
_Py_pending_call_func func;
void *arg;
int flags;
};

#define PENDINGCALLSARRAYSIZE 300
struct _pending_call *next;
} _pending_call;

#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
/* For interpreter-level pending calls, we want to avoid spending too
much time on pending calls in any one thread, so we apply a limit. */
#if MAXPENDINGCALLS > 100
# define MAXPENDINGCALLSLOOP 100
#else
# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
#endif
#define MAXPENDINGCALLSLOOP 100

/* We keep the number small to preserve as much compatibility
as possible with earlier versions. */
#define MAXPENDINGCALLS_MAIN 32
/* For the main thread, we want to make sure all pending calls are
run at once, for the sake of prompt signal handling. This is
unlikely to cause any problems since there should be very few
pending calls for the main thread. */
#define MAXPENDINGCALLSLOOP_MAIN 0
#define MAXPENDINGCALLSLOOP_MAIN INT32_MAX

struct _pending_calls {
PyThreadState *handling_thread;
PyMutex mutex;
/* Request for running pending calls. */
/* The number of pending calls. */
int32_t npending;
/* The maximum allowed number of pending calls.
If the queue fills up to this point then _PyEval_AddPendingCall()
will return _Py_ADD_PENDING_FULL. */
int32_t max;
/* We don't want a flood of pending calls to interrupt any one thread
for too long, so we keep a limit on the number handled per pass.
A value of 0 means there is no limit (other than the maximum
size of the list of pending calls). */
int32_t maxloop;
struct _pending_call calls[PENDINGCALLSARRAYSIZE];
int first;
int next;
_pending_call *head;

#define _Py_PENDING_CALLS_FREELIST_SIZE 100
_pending_call *freelist;
size_t freelist_num;
};


Expand Down Expand Up @@ -97,8 +86,6 @@ struct _ceval_runtime_state {
/* Pending calls to be made only on the main thread. */
// The signal machinery falls back on this
// so it must be especially stable and efficient.
// For example, we use a preallocated array
// for the list of pending calls.
struct _pending_calls pending_mainthread;
PyMutex sys_trace_profile_mutex;
};
Expand Down
2 changes: 0 additions & 2 deletions Include/internal/pycore_runtime_init.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ extern PyTypeObject _PyExc_MemoryError;
.parser = _parser_runtime_state_INIT, \
.ceval = { \
.pending_mainthread = { \
.max = MAXPENDINGCALLS_MAIN, \
.maxloop = MAXPENDINGCALLSLOOP_MAIN, \
}, \
.perf = _PyEval_RUNTIME_PERF_INIT, \
Expand Down Expand Up @@ -223,7 +222,6 @@ extern PyTypeObject _PyExc_MemoryError;
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
.pending = { \
.max = MAXPENDINGCALLS, \
.maxloop = MAXPENDINGCALLSLOOP, \
}, \
}, \
Expand Down
1 change: 1 addition & 0 deletions Lib/test/test_capi/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1678,6 +1678,7 @@ def test_max_pending(self):
l = []
added = self.pendingcalls_submit(l, maxpending+1, main=False)
self.pendingcalls_wait(l, added)
breakpoint()
self.assertEqual(added, maxpending)

class PendingTask(types.SimpleNamespace):
Expand Down
14 changes: 5 additions & 9 deletions Modules/_testinternalcapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
}
PyInterpreterState *interp = _PyInterpreterState_GET();

/* create the reference for the callbackwhile we hold the lock */
/* create the reference for the callback while we hold the lock */
for (unsigned int i = 0; i < num; i++) {
Py_INCREF(callable);
}
Expand All @@ -1096,12 +1096,10 @@ pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
unsigned int num_added = 0;
for (; num_added < num; num_added++) {
if (ensure_added) {
_Py_add_pending_call_result r;
int r;
do {
r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
assert(r == _Py_ADD_PENDING_SUCCESS
|| r == _Py_ADD_PENDING_FULL);
} while (r == _Py_ADD_PENDING_FULL);
} while (r < 0);
}
else {
if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
Expand Down Expand Up @@ -1162,16 +1160,14 @@ pending_identify(PyObject *self, PyObject *args)
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* It gets released in _pending_identify_callback(). */

_Py_add_pending_call_result r;
int r;
do {
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp,
&_pending_identify_callback, (void *)mutex,
0);
Py_END_ALLOW_THREADS
assert(r == _Py_ADD_PENDING_SUCCESS
|| r == _Py_ADD_PENDING_FULL);
} while (r == _Py_ADD_PENDING_FULL);
} while (r < 0);

/* Wait for the pending call to complete. */
PyThread_acquire_lock(mutex, WAIT_LOCK);
Expand Down
Empty file.
138 changes: 78 additions & 60 deletions Python/ceval_gil.c
Original file line number Diff line number Diff line change
Expand Up @@ -709,59 +709,85 @@ signal_active_thread(PyInterpreterState *interp, uintptr_t bit)

/* Push one item onto the queue while holding the lock. */
static int
_push_pending_call(struct _pending_calls *pending,
push_pending_call(struct _pending_calls *pending,
_Py_pending_call_func func, void *arg, int flags)
{
if (pending->npending == pending->max) {
return _Py_ADD_PENDING_FULL;
assert(PyMutex_IsLocked(&pending->mutex));
_pending_call *call = NULL;
if (pending->freelist_num) {
call = pending->freelist;
assert(call != NULL);
pending->freelist = call->next;
pending->freelist_num--;
}
assert(pending->npending < pending->max);

int i = pending->next;
assert(pending->calls[i].func == NULL);

pending->calls[i].func = func;
pending->calls[i].arg = arg;
pending->calls[i].flags = flags;

assert(pending->npending < PENDINGCALLSARRAYSIZE);
else {
call = PyMem_RawMalloc(sizeof(*call));
if (call == NULL) {
return -1;
}
}
call->arg = arg;
call->func = func;
call->flags = flags;
call->next = NULL;

_pending_call **head = &pending->head;
while (*head) {
head = &(*head)->next;
}
*head = call;
_Py_atomic_add_int32(&pending->npending, 1);
return 0;
}

pending->next = (i + 1) % PENDINGCALLSARRAYSIZE;
assert(pending->next != pending->first
|| pending->npending == pending->max);

return _Py_ADD_PENDING_SUCCESS;
/* Pop one item off the queue while holding the lock. */
static void
pop_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg, int *flags)
{
assert(PyMutex_IsLocked(&pending->mutex));
_pending_call *call = pending->head;
if (call) {
assert(pending->npending > 0);
pending->head = call->next;
*func = call->func;
*arg = call->arg;
*flags = call->flags;
if (pending->freelist_num < _Py_PENDING_CALLS_FREELIST_SIZE) {
call->next = pending->freelist;
pending->freelist = call;
pending->freelist_num++;
}
else {
PyMem_RawFree(call);
}
_Py_atomic_add_int32(&pending->npending, -1);
}
}

static int
_next_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg, int *flags)
static void
clear_pending_freelist(struct _pending_calls *pending)
{
int i = pending->first;
if (pending->npending == 0) {
/* Queue empty */
assert(i == pending->next);
assert(pending->calls[i].func == NULL);
return -1;
_pending_call *call = pending->freelist;
while (call) {
_pending_call *next = call->next;
PyMem_RawFree(call);
call = next;
}
*func = pending->calls[i].func;
*arg = pending->calls[i].arg;
*flags = pending->calls[i].flags;
return i;
pending->freelist = NULL;
}

/* Pop one item off the queue while holding the lock. */
static void
_pop_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg, int *flags)
fill_pending_freelist(struct _pending_calls *pending)
{
int i = _next_pending_call(pending, func, arg, flags);
if (i >= 0) {
pending->calls[i] = (struct _pending_call){0};
pending->first = (i + 1) % PENDINGCALLSARRAYSIZE;
assert(pending->npending > 0);
_Py_atomic_add_int32(&pending->npending, -1);
assert(pending->freelist == NULL);
for (int i = 0; i < _Py_PENDING_CALLS_FREELIST_SIZE; i++) {
_pending_call *call = PyMem_RawMalloc(sizeof(*call));
if (call == NULL) {
return;
}
call->next = pending->freelist;
pending->freelist = call;
}
}

Expand All @@ -770,7 +796,7 @@ _pop_pending_call(struct _pending_calls *pending,
callback.
*/

_Py_add_pending_call_result
int
_PyEval_AddPendingCall(PyInterpreterState *interp,
_Py_pending_call_func func, void *arg, int flags)
{
Expand All @@ -783,8 +809,7 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
}

PyMutex_Lock(&pending->mutex);
_Py_add_pending_call_result result =
_push_pending_call(pending, func, arg, flags);
int result = push_pending_call(pending, func, arg, flags);
PyMutex_Unlock(&pending->mutex);

if (main_only) {
Expand All @@ -807,15 +832,7 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
/* Legacy users of this API will continue to target the main thread
(of the main interpreter). */
PyInterpreterState *interp = _PyInterpreterState_Main();
_Py_add_pending_call_result r =
_PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
if (r == _Py_ADD_PENDING_FULL) {
return -1;
}
else {
assert(r == _Py_ADD_PENDING_SUCCESS);
return 0;
}
return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
}

static int
Expand All @@ -840,14 +857,8 @@ _make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)
int res = 0;
int32_t npending = -1;

assert(sizeof(pending->max) <= sizeof(size_t)
&& ((size_t)pending->max) <= Py_ARRAY_LENGTH(pending->calls));
int32_t maxloop = pending->maxloop;
if (maxloop == 0) {
maxloop = pending->max;
}
assert(maxloop > 0 && maxloop <= pending->max);

assert(maxloop > 0);
/* perform a bounded number of calls, in case of recursion */
for (int i=0; i<maxloop; i++) {
_Py_pending_call_func func = NULL;
Expand All @@ -856,7 +867,7 @@ _make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)

/* pop one item off the queue while holding the lock */
PyMutex_Lock(&pending->mutex);
_pop_pending_call(pending, &func, &arg, &flags);
pop_pending_call(pending, &func, &arg, &flags);
npending = pending->npending;
PyMutex_Unlock(&pending->mutex);

Expand Down Expand Up @@ -1032,6 +1043,8 @@ _Py_FinishPendingCalls(PyThreadState *tstate)
npending_prev = npending;
#endif
} while (npending > 0);

clear_pending_freelist(pending);
}

int
Expand Down Expand Up @@ -1077,6 +1090,11 @@ Py_MakePendingCalls(void)
void
_PyEval_InitState(PyInterpreterState *interp)
{
struct _pending_calls *pending = &interp->ceval.pending;
PyMutex_Lock(&pending->mutex);
fill_pending_freelist(pending);
PyMutex_Unlock(&pending->mutex);

_gil_initialize(&interp->_gil);
}

Expand Down

0 comments on commit 74ac0c7

Please sign in to comment.