Skip to content

Commit

Permalink
Backported the improvements from the pandemonium engine. Also fixes #1
Browse files Browse the repository at this point in the history
…. Thanks.
  • Loading branch information
Relintai committed Sep 17, 2022
1 parent 0917511 commit b76e803
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 104 deletions.
222 changes: 131 additions & 91 deletions thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,59 +59,95 @@ bool ThreadPool::get_use_threads() const {
return _use_threads;
}
void ThreadPool::set_use_threads(const bool value) {
_use_threads = value;
// Will be applied later in update, so current jobs can be finished first
_use_threads_new = value;
_dirty = true;
}

int ThreadPool::get_thread_count() const {
return _thread_count;
}
void ThreadPool::set_thread_count(const bool value) {
void ThreadPool::set_thread_count(const int value) {
_thread_count = value;
_dirty = true;
}

int ThreadPool::get_thread_fallback_count() const {
return _thread_fallback_count;
}
void ThreadPool::set_thread_fallback_count(const bool value) {
void ThreadPool::set_thread_fallback_count(const int value) {
_thread_fallback_count = value;
_dirty = true;
}

float ThreadPool::get_max_work_per_frame_percent() const {
return _max_work_per_frame_percent;
}
void ThreadPool::set_max_work_per_frame_percent(const bool value) {
void ThreadPool::set_max_work_per_frame_percent(const float value) {
_max_work_per_frame_percent = value;
_dirty = true;
}

float ThreadPool::get_max_time_per_frame() const {
return _max_time_per_frame;
}
void ThreadPool::set_max_time_per_frame(const bool value) {
void ThreadPool::set_max_time_per_frame(const float value) {
_max_time_per_frame = value;
_dirty = true;
}

bool ThreadPool::has_job(const Ref<ThreadPoolJob> &job) {
bool ThreadPool::is_working() const {
_THREAD_SAFE_LOCK_

if (_queue.size() > 0) {
_THREAD_SAFE_UNLOCK_

return true;
}

for (int i = 0; i < _threads.size(); ++i) {
ThreadPoolContext *context = _threads.get(i);
if (_threads[i]->job.is_valid()) {
_THREAD_SAFE_UNLOCK_
return true;
}
}

if (context->job == job) {
_THREAD_SAFE_UNLOCK_

return false;
}

bool ThreadPool::is_working_no_lock() const {
if (_queue.size() > 0) {
return true;
}

for (int i = 0; i < _threads.size(); ++i) {
if (_threads[i]->job.is_valid()) {
return true;
}
}

return false;
}

bool ThreadPool::has_job(const Ref<ThreadPoolJob> &job) {
_THREAD_SAFE_LOCK_

for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
if (_queue[i] == job) {
_THREAD_SAFE_UNLOCK_
for (int i = 0; i < _threads.size(); ++i) {
ThreadPoolContext *context = _threads.get(i);

if (context->job == job) {
_THREAD_SAFE_UNLOCK_
return true;
}
}

List<Ref<ThreadPoolJob>>::Element *E = _queue.find(job);

_THREAD_SAFE_UNLOCK_

return false;
return E;
}

void ThreadPool::add_job(const Ref<ThreadPoolJob> &job) {
Expand All @@ -130,22 +166,7 @@ void ThreadPool::add_job(const Ref<ThreadPoolJob> &job) {
}
}

if (_current_queue_tail == _queue.size()) {
if (_current_queue_head == 0) {
_queue.resize(_queue.size() + _queue_grow_size);
} else {
int j = 0;

for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
_queue.write[j++] = _queue[i];
}

_current_queue_head = 0;
_current_queue_tail = j;
}
}

_queue.write[_current_queue_tail++] = job;
_queue.push_back(job);

_THREAD_SAFE_UNLOCK_
}
Expand All @@ -157,23 +178,7 @@ void ThreadPool::cancel_job(Ref<ThreadPoolJob> job) {

_THREAD_SAFE_LOCK_

//it it's in the queue remove it
for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
Ref<ThreadPoolJob> cjob = _queue[i];

if (cjob == job) {
_queue.write[i].unref();

for (int j = i; j + 1 < _current_queue_tail; ++j) {
_queue.write[j] = _queue[j + 1];
}

--_current_queue_tail;

_THREAD_SAFE_UNLOCK_
return;
}
}
_queue.erase(job);

_THREAD_SAFE_UNLOCK_
}
Expand All @@ -185,14 +190,9 @@ void ThreadPool::cancel_job_wait(Ref<ThreadPoolJob> job) {

_THREAD_SAFE_LOCK_

for (int i = _current_queue_head; i < _current_queue_tail; ++i) {
Ref<ThreadPoolJob> j = _queue[i];

if (j == job) {
_queue.write[i].unref();
_THREAD_SAFE_UNLOCK_
return;
}
if (_queue.erase(job)) {
_THREAD_SAFE_UNLOCK_
return;
}

_THREAD_SAFE_UNLOCK_
Expand All @@ -216,19 +216,13 @@ void ThreadPool::_thread_finished(ThreadPoolContext *context) {

context->job.unref();

while (_current_queue_head != _current_queue_tail) {
context->job = _queue.get(_current_queue_head);

if (!context->job.is_valid()) {
++_current_queue_head;
continue;
}
while (_queue.size() > 0 && !context->job.is_valid()) {
context->job = _queue.front()->get();
_queue.pop_front();
}

if (context->job.is_valid()) {
context->semaphore->post();
_queue.write[_current_queue_head].unref();

++_current_queue_head;
break;
}

_THREAD_SAFE_UNLOCK_
Expand Down Expand Up @@ -261,16 +255,25 @@ void ThreadPool::register_update() {
}

void ThreadPool::update() {
if (_current_queue_head == _current_queue_tail)
if (_dirty) {
apply_settings();
}

if (_use_threads) {
return;
}

if (_queue.size() == 0) {
return;
}

float remaining_time = _max_time_per_frame;

while (remaining_time > 0 && _current_queue_head != _current_queue_tail) {
Ref<ThreadPoolJob> job = _queue.get(_current_queue_head);
while (remaining_time > 0 && _queue.size() > 0) {
Ref<ThreadPoolJob> job = _queue.front()->get();

if (!job.is_valid()) {
++_current_queue_head;
_queue.pop_front();
continue;
}

Expand All @@ -280,17 +283,66 @@ void ThreadPool::update() {
remaining_time -= job->get_current_execution_time();

if (job->get_complete() || job->get_cancelled()) {
_queue.write[_current_queue_head++].unref();
_queue.pop_front();
}
}
}

void ThreadPool::apply_settings() {
if (!_dirty) {
return;
}

_THREAD_SAFE_LOCK_

if (is_working_no_lock()) {
_THREAD_SAFE_UNLOCK_
return;
}

_dirty = false;

for (int i = 0; i < _threads.size(); ++i) {
ThreadPoolContext *context = _threads[i];

CRASH_COND(context->job.is_valid());

context->running = false;
context->semaphore->post();
context->thread->wait_to_finish();
memdelete(context->thread);
memdelete(context->semaphore);
memdelete(context);
}

_threads.resize(0);

_use_threads = _use_threads_new;

if (_use_threads) {
_threads.resize(_thread_count);

for (int i = 0; i < _threads.size(); ++i) {
ThreadPoolContext *context = memnew(ThreadPoolContext);

context->running = true;
context->semaphore = memnew(Semaphore);

context->thread = memnew(Thread());
context->thread->start(ThreadPool::_worker_thread_func, context);

_threads.write[i] = context;
}
} else {
call_deferred("register_update");
}

_THREAD_SAFE_UNLOCK_
}

ThreadPool::ThreadPool() {
_instance = this;

_current_queue_head = 0;
_current_queue_tail = 0;

_use_threads = GLOBAL_DEF("thread_pool/use_threads", true);
_thread_count = GLOBAL_DEF("thread_pool/thread_count", -1);
_thread_fallback_count = GLOBAL_DEF("thread_pool/thread_fallback_count", 4);
Expand All @@ -304,10 +356,6 @@ ThreadPool::ThreadPool() {
//Todo Add help text, as this will only come into play if threading is disabled, or not available
_max_work_per_frame_percent = GLOBAL_DEF("thread_pool/max_work_per_frame_percent", 25);

_queue_start_size = GLOBAL_DEF("thread_pool/queue_start_size", 20);
_queue_grow_size = GLOBAL_DEF("thread_pool/queue_grow_size", 10);
_queue.resize(_queue_start_size);

//Todo this should be recalculated constantly to smooth out performance better
_max_time_per_frame = (1 / 60.0) * (_max_work_per_frame_percent / 100.0);

Expand All @@ -324,23 +372,13 @@ ThreadPool::ThreadPool() {
if (_thread_count <= 0) {
_thread_count = _thread_fallback_count;
}
}

_threads.resize(_thread_count);

for (int i = 0; i < _threads.size(); ++i) {
ThreadPoolContext *context = memnew(ThreadPoolContext);
_use_threads_new = _use_threads;

context->running = true;
context->semaphore = memnew(Semaphore);
_dirty = true;

context->thread = memnew(Thread());
context->thread->start(ThreadPool::_worker_thread_func, context);

_threads.write[i] = context;
}
} else {
call_deferred("register_update");
}
apply_settings();
}

ThreadPool::~ThreadPool() {
Expand All @@ -365,7 +403,6 @@ ThreadPool::~ThreadPool() {
_threads.clear();

_queue.clear();
//_job_pool.clear();
}

void ThreadPool::_bind_methods() {
Expand All @@ -389,6 +426,9 @@ void ThreadPool::_bind_methods() {
ClassDB::bind_method(D_METHOD("set_max_time_per_frame", "value"), &ThreadPool::set_max_time_per_frame);
ADD_PROPERTY(PropertyInfo(Variant::REAL, "max_time_per_frame"), "set_max_time_per_frame", "get_max_time_per_frame");

ClassDB::bind_method(D_METHOD("is_working"), &ThreadPool::is_working);
ClassDB::bind_method(D_METHOD("is_working_no_lock"), &ThreadPool::is_working_no_lock);

ClassDB::bind_method(D_METHOD("has_job", "job"), &ThreadPool::has_job);
ClassDB::bind_method(D_METHOD("add_job", "job"), &ThreadPool::add_job);

Expand Down
Loading

0 comments on commit b76e803

Please sign in to comment.