Skip to content

gh-84570: Add Threading Timeout Helpers to Internal C-API #110948

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Include/internal/pycore_pythread.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,27 @@ extern int _PyThread_at_fork_reinit(PyThread_type_lock *lock);
#endif /* HAVE_FORK */


// unset: -1 seconds, in nanoseconds
#define PyThread_UNSET_TIMEOUT ((_PyTime_t)(-1 * 1000 * 1000 * 1000))

// Exported for the _xxinterpchannels module.
PyAPI_FUNC(int) PyThread_ParseTimeoutArg(
PyObject *arg,
int blocking,
PY_TIMEOUT_T *timeout);

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
// Exported for the _xxinterpchannels module.
PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed_with_retries(
PyThread_type_lock,
PY_TIMEOUT_T microseconds);


#ifdef __cplusplus
}
#endif
Expand Down
2 changes: 2 additions & 0 deletions Modules/_queuemodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
PY_TIMEOUT_T microseconds;
PyThreadState *tstate = PyThreadState_Get();

// XXX Use PyThread_ParseTimeoutArg().

if (block == 0) {
/* Non-blocking */
microseconds = 0;
Expand Down
63 changes: 8 additions & 55 deletions Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/* Interface to Sjoerd's portable C thread library */

#include "Python.h"
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
#include "pycore_dict.h" // _PyDict_Pop()
#include "pycore_interp.h" // _PyInterpreterState.threads.count
#include "pycore_moduleobject.h" // _PyModule_GetState()
Expand Down Expand Up @@ -76,57 +75,10 @@ lock_dealloc(lockobject *self)
Py_DECREF(tp);
}

/* Helper to acquire an interruptible lock with a timeout. If the lock acquire
* is interrupted, signal handlers are run, and if they raise an exception,
* PY_LOCK_INTR is returned. Otherwise, PY_LOCK_ACQUIRED or PY_LOCK_FAILURE
* are returned, depending on whether the lock can be acquired within the
* timeout.
*/
static PyLockStatus
static inline PyLockStatus
acquire_timed(PyThread_type_lock lock, _PyTime_t timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
}

PyLockStatus r;
do {
_PyTime_t microseconds;
microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);

/* first a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 1);
Py_END_ALLOW_THREADS
}

if (r == PY_LOCK_INTR) {
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}

/* If we're using a timeout, recompute the timeout after processing
* signals, since those can take time. */
if (timeout > 0) {
timeout = _PyDeadline_Get(endtime);

/* Check for negative values, since those mean block forever.
*/
if (timeout < 0) {
r = PY_LOCK_FAILURE;
}
}
}
} while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */

return r;
return PyThread_acquire_lock_timed_with_retries(lock, timeout);
}

static int
Expand All @@ -136,14 +88,15 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
char *kwlist[] = {"blocking", "timeout", NULL};
int blocking = 1;
PyObject *timeout_obj = NULL;
const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1);

*timeout = unset_timeout ;

if (!PyArg_ParseTupleAndKeywords(args, kwds, "|pO:acquire", kwlist,
&blocking, &timeout_obj))
return -1;

// XXX Use PyThread_ParseTimeoutArg().

const _PyTime_t unset_timeout = _PyTime_FromSeconds(-1);
*timeout = unset_timeout;

if (timeout_obj
&& _PyTime_FromSecondsObject(timeout,
timeout_obj, _PyTime_ROUND_TIMEOUT) < 0)
Expand All @@ -156,7 +109,7 @@ lock_acquire_parse_args(PyObject *args, PyObject *kwds,
}
if (*timeout < 0 && *timeout != unset_timeout) {
PyErr_SetString(PyExc_ValueError,
"timeout value must be positive");
"timeout value must be a non-negative number");
return -1;
}
if (!blocking)
Expand Down
84 changes: 84 additions & 0 deletions Python/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Stuff shared by all thread_*.h files is collected here. */

#include "Python.h"
#include "pycore_ceval.h" // _PyEval_MakePendingCalls()
#include "pycore_pystate.h" // _PyInterpreterState_GET()
#include "pycore_structseq.h" // _PyStructSequence_FiniBuiltin()
#include "pycore_pythread.h" // _POSIX_THREADS
Expand Down Expand Up @@ -92,6 +93,89 @@ PyThread_set_stacksize(size_t size)
}


int
PyThread_ParseTimeoutArg(PyObject *arg, int blocking, PY_TIMEOUT_T *timeout_p)
{
assert(_PyTime_FromSeconds(-1) == PyThread_UNSET_TIMEOUT);
if (arg == NULL || arg == Py_None) {
*timeout_p = blocking ? PyThread_UNSET_TIMEOUT : 0;
return 0;
}
if (!blocking) {
PyErr_SetString(PyExc_ValueError,
"can't specify a timeout for a non-blocking call");
return -1;
}

_PyTime_t timeout;
if (_PyTime_FromSecondsObject(&timeout, arg, _PyTime_ROUND_TIMEOUT) < 0) {
return -1;
}
if (timeout < 0) {
PyErr_SetString(PyExc_ValueError,
"timeout value must be a non-negative number");
return -1;
}

if (_PyTime_AsMicroseconds(timeout,
_PyTime_ROUND_TIMEOUT) > PY_TIMEOUT_MAX) {
PyErr_SetString(PyExc_OverflowError,
"timeout value is too large");
return -1;
}
*timeout_p = timeout;
return 0;
}

PyLockStatus
PyThread_acquire_lock_timed_with_retries(PyThread_type_lock lock,
PY_TIMEOUT_T timeout)
{
PyThreadState *tstate = _PyThreadState_GET();
_PyTime_t endtime = 0;
if (timeout > 0) {
endtime = _PyDeadline_Init(timeout);
}

PyLockStatus r;
do {
_PyTime_t microseconds;
microseconds = _PyTime_AsMicroseconds(timeout, _PyTime_ROUND_CEILING);

/* first a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Py_BEGIN_ALLOW_THREADS
r = PyThread_acquire_lock_timed(lock, microseconds, 1);
Py_END_ALLOW_THREADS
}

if (r == PY_LOCK_INTR) {
/* Run signal handlers if we were interrupted. Propagate
* exceptions from signal handlers, such as KeyboardInterrupt, by
* passing up PY_LOCK_INTR. */
if (_PyEval_MakePendingCalls(tstate) < 0) {
return PY_LOCK_INTR;
}

/* If we're using a timeout, recompute the timeout after processing
* signals, since those can take time. */
if (timeout > 0) {
timeout = _PyDeadline_Get(endtime);

/* Check for negative values, since those mean block forever.
*/
if (timeout < 0) {
r = PY_LOCK_FAILURE;
}
}
}
} while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */

return r;
}


/* Thread Specific Storage (TSS) API

Cross-platform components of TSS API implementation.
Expand Down