GDScript: Avoid deadlock possibility in multi-threaded load #93032

Merged · 2 commits · Jun 28, 2024
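In short, as reflected in the diff below: the bespoke flushing_cmd_queue thread-local, which let WorkerThreadPool release a CommandQueueMT's lock while one of its threads blocked, is generalized into "unlock allowance zones". A thread may register up to MAX_UNLOCKABLE_MUTEXES (2) mutexes that the pool is allowed to unlock before blocking and re-lock afterwards. GDScriptCache uses the new API to release its own mutex around script->reload(), removing a deadlock possibility in multi-threaded loads.

A minimal usage sketch, mirroring the CommandQueueMT and GDScriptCache call sites in this PR (the caller and its mutex are hypothetical):

    // The current thread holds `mutex` and may block inside the pool.
    uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex);
    wait_on_pool_tasks(); // hypothetical work; the pool may unlock/relock `mutex` while waiting
    WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);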
91 changes: 71 additions & 20 deletions core/object/worker_thread_pool.cpp
@@ -33,7 +33,6 @@
#include "core/object/script_language.h"
#include "core/os/os.h"
#include "core/os/thread_safe.h"
#include "core/templates/command_queue_mt.h"

WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;

@@ -46,7 +45,9 @@ void WorkerThreadPool::Task::free_template_userdata() {

WorkerThreadPool *WorkerThreadPool::singleton = nullptr;

thread_local CommandQueueMT *WorkerThreadPool::flushing_cmd_queue = nullptr;
#ifdef THREADS_ENABLED
thread_local uintptr_t WorkerThreadPool::unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES] = {};
#endif

void WorkerThreadPool::_process_task(Task *p_task) {
#ifdef THREADS_ENABLED
@@ -416,13 +417,42 @@ Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {
return OK;
}

void WorkerThreadPool::_lock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->lock();
} else {
((BinaryMutex *)(unlockable_mutexes[i] & ~1))->lock();
}
}
}
#endif
}

void WorkerThreadPool::_unlock_unlockable_mutexes() {
#ifdef THREADS_ENABLED
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlockable_mutexes[i]) {
if ((((uintptr_t)unlockable_mutexes[i]) & 1) == 0) {
((Mutex *)unlockable_mutexes[i])->unlock();
} else {
((BinaryMutex *)(unlockable_mutexes[i] & ~1))->unlock();
}
}
}
#endif
}
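The two helpers above recover the mutex type from a slot's low bit: each slot holds the mutex address, with bit 0 clear for a plain Mutex and set for a BinaryMutex (bit 0 is free to use as a tag because the objects are more than 1-byte aligned). A condensed sketch of the encoding, with hypothetical names:

    uintptr_t slot = (uintptr_t)mutex_ptr;    // Mutex: bit 0 == 0
    slot |= 1;                                // BinaryMutex: bit 0 == 1
    if (slot & 1) {
        ((BinaryMutex *)(slot & ~1))->lock(); // mask the tag off before use
    } else {
        ((Mutex *)slot)->lock();
    }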

void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
// Keep processing tasks until the condition to stop waiting is met.

#define IS_WAIT_OVER (unlikely(p_task == ThreadData::YIELDING) ? p_caller_pool_thread->yield_is_over : p_task->completed)

while (true) {
Task *task_to_process = nullptr;
bool relock_unlockables = false;
{
MutexLock lock(task_mutex);
bool was_signaled = p_caller_pool_thread->signaled;
@@ -460,20 +490,20 @@ void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {
if (!task_to_process) {
p_caller_pool_thread->awaited_task = p_task;

if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
_unlock_unlockable_mutexes();
relock_unlockables = true;
p_caller_pool_thread->cond_var.wait(lock);
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}

DEV_ASSERT(exit_threads || p_caller_pool_thread->signaled || IS_WAIT_OVER);
p_caller_pool_thread->awaited_task = nullptr;
}
}
}

if (relock_unlockables) {
_lock_unlockable_mutexes();
}

if (task_to_process) {
_process_task(task_to_process);
}
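Note the relock_unlockables flag: the registered mutexes are unlocked inside the task_mutex critical section (just before cond_var.wait releases it), but re-locked only after the MutexLock goes out of scope. The ordering, as read from the code above (the rationale is an inference):

    // 1. Holding task_mutex: unlock the registered mutexes, set relock_unlockables.
    // 2. cond_var.wait(lock) atomically releases task_mutex and blocks.
    // 3. After waking and leaving the MutexLock scope (task_mutex released),
    //    re-lock the registered mutexes.
    // Re-locking while still holding task_mutex could invert lock order against a
    // thread that takes a registered mutex first and then needs task_mutex.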
@@ -600,13 +630,9 @@ void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {
{
Group *group = *groupp;

if (flushing_cmd_queue) {
flushing_cmd_queue->unlock();
}
_unlock_unlockable_mutexes();
group->done_semaphore.wait();
if (flushing_cmd_queue) {
flushing_cmd_queue->lock();
}
_lock_unlockable_mutexes();

uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also a user. Read before the increment to avoid another thread freeing the task right after it.
uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.
@@ -630,16 +656,41 @@ int WorkerThreadPool::get_thread_index() {
return singleton->thread_ids.has(tid) ? singleton->thread_ids[tid] : -1;
}

void WorkerThreadPool::thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue) {
ERR_FAIL_COND(flushing_cmd_queue != nullptr);
flushing_cmd_queue = p_queue;
#ifdef THREADS_ENABLED
uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(Mutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, false);
}

uint32_t WorkerThreadPool::thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex) {
return _thread_enter_unlock_allowance_zone(p_mutex, true);
}

void WorkerThreadPool::thread_exit_command_queue_mt_flush() {
ERR_FAIL_NULL(flushing_cmd_queue);
flushing_cmd_queue = nullptr;
uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary) {
for (uint32_t i = 0; i < MAX_UNLOCKABLE_MUTEXES; i++) {
if (unlikely((unlockable_mutexes[i] & ~1) == (uintptr_t)p_mutex)) {
// Already registered in the current thread.
return UINT32_MAX;
}
if (!unlockable_mutexes[i]) {
unlockable_mutexes[i] = (uintptr_t)p_mutex;
if (p_is_binary) {
unlockable_mutexes[i] |= 1;
}
return i;
}
}
ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable mutex slots available. Engine bug.");
}

void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {
if (p_zone_id == UINT32_MAX) {
return;
}
DEV_ASSERT(unlockable_mutexes[p_zone_id]);
unlockable_mutexes[p_zone_id] = 0;
}
#endif
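_thread_enter_unlock_allowance_zone returns UINT32_MAX when the mutex is already registered on the current thread, and thread_exit_unlock_allowance_zone treats that value as a no-op, so nesting zones over the same mutex is safe: only the outermost enter/exit pair owns the slot. A sketch under that reading (m is a hypothetical mutex):

    Mutex m;
    uint32_t outer = WorkerThreadPool::thread_enter_unlock_allowance_zone(&m); // claims a slot
    uint32_t inner = WorkerThreadPool::thread_enter_unlock_allowance_zone(&m); // UINT32_MAX
    WorkerThreadPool::thread_exit_unlock_allowance_zone(inner);                // no-op
    WorkerThreadPool::thread_exit_unlock_allowance_zone(outer);                // frees the slot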

void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {
ERR_FAIL_COND(threads.size() > 0);
if (p_thread_count < 0) {
24 changes: 19 additions & 5 deletions core/object/worker_thread_pool.h
@@ -41,8 +41,6 @@
#include "core/templates/rid.h"
#include "core/templates/safe_refcount.h"

class CommandQueueMT;

class WorkerThreadPool : public Object {
GDCLASS(WorkerThreadPool, Object)
public:
@@ -163,7 +161,10 @@ class WorkerThreadPool : public Object {

static WorkerThreadPool *singleton;

static thread_local CommandQueueMT *flushing_cmd_queue;
#ifdef THREADS_ENABLED
static const uint32_t MAX_UNLOCKABLE_MUTEXES = 2;
static thread_local uintptr_t unlockable_mutexes[MAX_UNLOCKABLE_MUTEXES];
#endif

TaskID _add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description);
GroupID _add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description);
@@ -190,6 +191,13 @@ class WorkerThreadPool : public Object {

void _wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task);

#ifdef THREADS_ENABLED
static uint32_t _thread_enter_unlock_allowance_zone(void *p_mutex, bool p_is_binary);
#endif

void _lock_unlockable_mutexes();
void _unlock_unlockable_mutexes();

protected:
static void _bind_methods();

@@ -232,8 +240,14 @@ class WorkerThreadPool : public Object {
static WorkerThreadPool *get_singleton() { return singleton; }
static int get_thread_index();

static void thread_enter_command_queue_mt_flush(CommandQueueMT *p_queue);
static void thread_exit_command_queue_mt_flush();
#ifdef THREADS_ENABLED
static uint32_t thread_enter_unlock_allowance_zone(Mutex *p_mutex);
static uint32_t thread_enter_unlock_allowance_zone(BinaryMutex *p_mutex);
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id);
#else
static uint32_t thread_enter_unlock_allowance_zone(void *p_mutex) { return UINT32_MAX; }
static void thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {}
#endif

void init(int p_thread_count = -1, float p_low_priority_task_ratio = 0.3);
void finish();
4 changes: 2 additions & 2 deletions core/templates/command_queue_mt.h
@@ -364,7 +364,7 @@ class CommandQueueMT {

lock();

WorkerThreadPool::thread_enter_command_queue_mt_flush(this);
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&mutex);
while (flush_read_ptr < command_mem.size()) {
uint64_t size = *(uint64_t *)&command_mem[flush_read_ptr];
flush_read_ptr += 8;
@@ -383,7 +383,7 @@

flush_read_ptr += size;
}
WorkerThreadPool::thread_exit_command_queue_mt_flush();
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);

command_mem.clear();
flush_read_ptr = 0;
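With this change the queue registers its own mutex member directly instead of handing this to the removed thread_enter_command_queue_mt_flush, which is also why worker_thread_pool.cpp/.h no longer need the command_queue_mt.h include or the CommandQueueMT forward declaration (see the deletions in the first two files).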
4 changes: 4 additions & 0 deletions modules/gdscript/gdscript_cache.cpp
@@ -344,7 +344,11 @@ Ref<GDScript> GDScriptCache::get_full_script(const String &p_path, Error &r_error
}
}

// Allowing lifting of the lock might cause a script to be reloaded multiple times,
// which, as a last-resort deadlock prevention strategy, is a good tradeoff.
uint32_t allowance_id = WorkerThreadPool::thread_enter_unlock_allowance_zone(&singleton->mutex);
r_error = script->reload(true);
WorkerThreadPool::thread_exit_unlock_allowance_zone(allowance_id);
if (r_error) {
return script;
}
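The zone around script->reload(true) is what addresses the deadlock in the PR title. A plausible shape of the cycle, inferred from the call sites (thread roles are illustrative):

    // Thread A: holds GDScriptCache's mutex; reload() ends up waiting on a
    //           WorkerThreadPool task (e.g. a dependent script load).
    // Thread B: the pool thread running that task calls back into GDScriptCache
    //           and blocks on the same mutex. Neither thread can proceed.
    // With the allowance zone, the pool unlocks the cache mutex while Thread A
    // waits, Thread B finishes, and the mutex is re-locked before A resumes,
    // at the cost of possibly reloading a script more than once.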