
gh-115103: Implement delayed free mechanism for free-threaded builds #115367

Merged: 2 commits, Feb 20, 2024
1 change: 1 addition & 0 deletions Include/internal/pycore_interp.h
@@ -231,6 +231,7 @@ struct _is {

    struct _Py_dict_state dict_state;
    struct _Py_exc_state exc_state;
    struct _Py_mem_interp_free_queue mem_free_queue;

    struct ast_state ast;
    struct types_state types;
19 changes: 19 additions & 0 deletions Include/internal/pycore_pymem.h
@@ -1,6 +1,7 @@
#ifndef Py_INTERNAL_PYMEM_H
#define Py_INTERNAL_PYMEM_H

#include "pycore_llist.h" // struct llist_node
#include "pycore_lock.h" // PyMutex

#ifdef __cplusplus
@@ -48,6 +49,11 @@ struct _pymem_allocators {
    PyObjectArenaAllocator obj_arena;
};

struct _Py_mem_interp_free_queue {
    int has_work;            // true if the queue is not empty
    PyMutex mutex;           // protects the queue
    struct llist_node head;  // queue of _mem_work_chunk items
};

/* Set the memory allocator of the specified domain to the default.
   Save the old allocator into *old_alloc if it's non-NULL.
@@ -110,6 +116,19 @@ extern int _PyMem_SetupAllocators(PyMemAllocatorName allocator);
/* Is the debug allocator enabled? */
extern int _PyMem_DebugEnabled(void);

// Enqueue a pointer to be freed possibly after some delay.
extern void _PyMem_FreeDelayed(void *ptr);

// Periodically process delayed free requests.
extern void _PyMem_ProcessDelayed(PyThreadState *tstate);

// Abandon all thread-local delayed free requests and push them to the
// interpreter's queue.
extern void _PyMem_AbandonDelayed(PyThreadState *tstate);

// On interpreter shutdown, frees all delayed free requests.
extern void _PyMem_FiniDelayed(PyInterpreterState *interp);

#ifdef __cplusplus
}
#endif
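For context, a minimal sketch of how a caller might use this API. The `shared_table` struct and `table_replace` function below are hypothetical illustrations, not part of this PR:

    // A structure whose `entries` pointer may be read by other threads
    // without locking (hypothetical example).
    struct shared_table {
        void *entries;
    };

    static void
    table_replace(struct shared_table *t, void *new_entries)
    {
        // Publish the new array, then defer freeing the old one: readers in
        // other threads may still hold the old pointer, so it must not be
        // freed until every thread has passed a QSBR quiescent state.
        void *old = _Py_atomic_exchange_ptr(&t->entries, new_entries);
        _PyMem_FreeDelayed(old);
    }

A thread would then call _PyMem_ProcessDelayed() at periodic safe points to actually free any queued pointers whose QSBR goal has been reached.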
5 changes: 5 additions & 0 deletions Include/internal/pycore_pymem_init.h
@@ -92,6 +92,11 @@ extern void _PyMem_ArenaFree(void *, void *, size_t);
    { NULL, _PyMem_ArenaAlloc, _PyMem_ArenaFree }


#define _Py_mem_free_queue_INIT(queue) \
    { \
        .head = LLIST_INIT(queue.head), \
    }

#ifdef __cplusplus
}
#endif
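For reference, assuming LLIST_INIT(h) expands to the usual self-referential pair { &h, &h }, the initializer above yields an empty circular list, with `has_work` and `mutex` implicitly zero-initialized:

    // _Py_mem_free_queue_INIT(INTERP.mem_free_queue) expands roughly to:
    // { .head = { .next = &INTERP.mem_free_queue.head,
    //             .prev = &INTERP.mem_free_queue.head } }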
1 change: 1 addition & 0 deletions Include/internal/pycore_runtime_init.h
@@ -176,6 +176,7 @@ extern PyTypeObject _PyExc_MemoryError;
        }, \
        .dtoa = _dtoa_state_INIT(&(INTERP)), \
        .dict_state = _dict_state_INIT, \
        .mem_free_queue = _Py_mem_free_queue_INIT(INTERP.mem_free_queue), \
        .func_state = { \
            .next_version = 1, \
        }, \
1 change: 1 addition & 0 deletions Include/internal/pycore_tstate.h
@@ -29,6 +29,7 @@ typedef struct _PyThreadStateImpl {
    PyThreadState base;

    struct _qsbr_thread_state *qsbr;   // only used by free-threaded build
    struct llist_node mem_free_queue;  // delayed free queue

#ifdef Py_GIL_DISABLED
    struct _gc_thread_state gc;
190 changes: 190 additions & 0 deletions Objects/obmalloc.c
@@ -948,6 +948,196 @@ _PyMem_Strdup(const char *str)
    return copy;
}

/***********************************************/
/* Delayed freeing support for Py_GIL_DISABLED */
/***********************************************/

// So that sizeof(struct _mem_work_chunk) is 4096 bytes on 64-bit platforms.
#define WORK_ITEMS_PER_CHUNK 254
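// (Assuming a 64-bit layout: a 16-byte llist_node, two 8-byte indices, and
// 254 items of 16 bytes each gives 32 + 254 * 16 = 4096 bytes.)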

// A pointer to be freed once the QSBR read sequence reaches qsbr_goal.
struct _mem_work_item {
    void *ptr;
    uint64_t qsbr_goal;
};
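// The QSBR contract assumed here: _Py_qsbr_deferred_advance() returns a goal
// just ahead of the global write sequence, and _Py_qsbr_poll() reports true
// only once every attached thread has passed a quiescent state at or beyond
// that goal, i.e. once no thread can still hold a stale reference to `ptr`.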

// A fixed-size buffer of pointers to be freed
struct _mem_work_chunk {
    // Linked list node of chunks in queue
    struct llist_node node;

    Py_ssize_t rd_idx;  // index of next item to read
    Py_ssize_t wr_idx;  // index of next item to write
    struct _mem_work_item array[WORK_ITEMS_PER_CHUNK];
};

void
_PyMem_FreeDelayed(void *ptr)
{
#ifndef Py_GIL_DISABLED
    PyMem_Free(ptr);
#else
    if (_PyRuntime.stoptheworld.world_stopped) {
        // Free immediately if the world is stopped, including during
        // interpreter shutdown.
        PyMem_Free(ptr);
        return;
    }

    _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)_PyThreadState_GET();
    struct llist_node *head = &tstate->mem_free_queue;

    struct _mem_work_chunk *buf = NULL;
    if (!llist_empty(head)) {
        // Try to re-use the last buffer
        buf = llist_data(head->prev, struct _mem_work_chunk, node);
        if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
            // already full
            buf = NULL;
        }
    }

    if (buf == NULL) {
        buf = PyMem_Calloc(1, sizeof(*buf));
        if (buf != NULL) {
            llist_insert_tail(head, &buf->node);
        }
    }

    if (buf == NULL) {
        // failed to allocate a buffer, free immediately
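        // (With the world stopped, no other thread can be reading `ptr`,
        // so an immediate free is safe.)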
        _PyEval_StopTheWorld(tstate->base.interp);
        PyMem_Free(ptr);
        _PyEval_StartTheWorld(tstate->base.interp);
        return;
    }

    assert(buf != NULL && buf->wr_idx < WORK_ITEMS_PER_CHUNK);
    uint64_t seq = _Py_qsbr_deferred_advance(tstate->qsbr);
    buf->array[buf->wr_idx].ptr = ptr;
    buf->array[buf->wr_idx].qsbr_goal = seq;
    buf->wr_idx++;

    if (buf->wr_idx == WORK_ITEMS_PER_CHUNK) {
        _PyMem_ProcessDelayed((PyThreadState *)tstate);
    }
#endif
}

static struct _mem_work_chunk *
work_queue_first(struct llist_node *head)
{
    return llist_data(head->next, struct _mem_work_chunk, node);
}

static void
process_queue(struct llist_node *head, struct _qsbr_thread_state *qsbr,
              bool keep_empty)
{
    while (!llist_empty(head)) {
        struct _mem_work_chunk *buf = work_queue_first(head);
Review comment (Contributor):

Is there a reason we're not just doing something like:

    for (Py_ssize_t i = buf->rd_idx; i < buf->wr_idx; i++) {
        ...
    }

Or a while loop like _PyMem_FiniDelayed's, with the polling from here added in, instead of going back through the head of the list each time?

Reply (Contributor, Author):

I changed the loop to avoid grabbing the head every iteration.
        while (buf->rd_idx < buf->wr_idx) {
            struct _mem_work_item *item = &buf->array[buf->rd_idx];
            if (!_Py_qsbr_poll(qsbr, item->qsbr_goal)) {
                return;
            }

            PyMem_Free(item->ptr);
            buf->rd_idx++;
        }

        assert(buf->rd_idx == buf->wr_idx);
        if (keep_empty && buf->node.next == head) {
            // Keep the last buffer in the queue to reduce re-allocations
            buf->rd_idx = buf->wr_idx = 0;
            return;
        }

        llist_remove(&buf->node);
        PyMem_Free(buf);
    }
}

static void
process_interp_queue(struct _Py_mem_interp_free_queue *queue,
                     struct _qsbr_thread_state *qsbr)
{
    if (!_Py_atomic_load_int_relaxed(&queue->has_work)) {
        return;
    }

    // Try to acquire the lock, but don't block if it's already held.
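    // (If another thread already holds it, that thread is draining the
    // queue, so there is nothing to wait for.)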
    if (_PyMutex_LockTimed(&queue->mutex, 0, 0) == PY_LOCK_ACQUIRED) {
        process_queue(&queue->head, qsbr, false);

        int more_work = !llist_empty(&queue->head);
        _Py_atomic_store_int_relaxed(&queue->has_work, more_work);

        PyMutex_Unlock(&queue->mutex);
    }
}

void
_PyMem_ProcessDelayed(PyThreadState *tstate)
{
    PyInterpreterState *interp = tstate->interp;
    _PyThreadStateImpl *tstate_impl = (_PyThreadStateImpl *)tstate;

    // Process thread-local work
    process_queue(&tstate_impl->mem_free_queue, tstate_impl->qsbr, true);

    // Process shared interpreter work
    process_interp_queue(&interp->mem_free_queue, tstate_impl->qsbr);
}

void
_PyMem_AbandonDelayed(PyThreadState *tstate)
{
    PyInterpreterState *interp = tstate->interp;
    struct llist_node *queue = &((_PyThreadStateImpl *)tstate)->mem_free_queue;

    if (llist_empty(queue)) {
        return;
    }

    // Check if the queue contains one empty buffer
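    // (a fully drained chunk has nothing worth handing off, so just free it)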
    struct _mem_work_chunk *buf = work_queue_first(queue);
    if (buf->rd_idx == buf->wr_idx) {
        llist_remove(&buf->node);
        PyMem_Free(buf);
        assert(llist_empty(queue));
        return;
    }

    // Merge the thread's work queue into the interpreter's work queue.
    PyMutex_Lock(&interp->mem_free_queue.mutex);
    llist_concat(&interp->mem_free_queue.head, queue);
    _Py_atomic_store_int_relaxed(&interp->mem_free_queue.has_work, 1);
    PyMutex_Unlock(&interp->mem_free_queue.mutex);

    assert(llist_empty(queue));  // the thread's queue is now empty
}

void
_PyMem_FiniDelayed(PyInterpreterState *interp)
{
    struct llist_node *head = &interp->mem_free_queue.head;
    while (!llist_empty(head)) {
        struct _mem_work_chunk *buf = work_queue_first(head);

        while (buf->rd_idx < buf->wr_idx) {
            // Free the remaining items immediately. There should be no other
            // threads accessing the memory at this point during shutdown.
            struct _mem_work_item *item = &buf->array[buf->rd_idx];
            PyMem_Free(item->ptr);
            buf->rd_idx++;
        }

        llist_remove(&buf->node);
        PyMem_Free(buf);
    }
}

/**************************/
/* the "object" allocator */
3 changes: 3 additions & 0 deletions Python/pylifecycle.c
@@ -1834,6 +1834,9 @@ finalize_interp_clear(PyThreadState *tstate)

    finalize_interp_types(tstate->interp);

    /* Free any delayed free requests immediately */
    _PyMem_FiniDelayed(tstate->interp);

    /* finalize_interp_types may allocate Python objects so we may need to
       abandon mimalloc segments again */
    _PyThreadState_ClearMimallocHeaps(tstate);
6 changes: 6 additions & 0 deletions Python/pystate.c
@@ -617,6 +617,7 @@ init_interpreter(PyInterpreterState *interp,
#ifdef Py_GIL_DISABLED
    _Py_brc_init_state(interp);
#endif
    llist_init(&interp->mem_free_queue.head);
    for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
        interp->monitors.tools[i] = 0;
    }
@@ -1351,6 +1352,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
    // Initialize biased reference counting inter-thread queue
    _Py_brc_init_thread(tstate);
#endif
    llist_init(&_tstate->mem_free_queue);

    if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
        // Start in the suspended state if there is an ongoing stop-the-world.
@@ -1572,6 +1574,7 @@ PyThreadState_Clear(PyThreadState *tstate)
        // don't call _PyInterpreterState_SetNotRunningMain() yet.
        tstate->on_delete(tstate->on_delete_data);
    }

#ifdef Py_GIL_DISABLED
    // Each thread should clear own freelists in free-threading builds.
    struct _Py_object_freelists *freelists = _Py_object_freelists_GET();
@@ -1581,6 +1584,9 @@ PyThreadState_Clear(PyThreadState *tstate)
    _Py_brc_remove_thread(tstate);
#endif

    // Merge our queue of pointers to be freed into the interpreter queue.
    _PyMem_AbandonDelayed(tstate);

    _PyThreadState_ClearMimallocHeaps(tstate);

    tstate->_status.cleared = 1;