Rewrite the old proxying API in terms of the new API #15880


Merged Mar 11, 2022 (36 commits)

Commits
6584c16
[WIP] New low-level proxying C API
tlively Dec 10, 2021
99f14f5
Remove normalize_thread
tlively Dec 11, 2021
59a7bb3
Make header internal
tlively Dec 11, 2021
faa6423
Assert that q is not null
tlively Dec 11, 2021
f241353
Simplify em_proyxing_queue_create
tlively Dec 11, 2021
c3732d5
goto failed
tlively Dec 11, 2021
be9db82
Nicer struct initialization syntax
tlively Dec 11, 2021
5b6294e
Split out task_queue_grow
tlively Dec 11, 2021
e5ed1ba
More "Not thread safe"
tlively Dec 11, 2021
8e17f91
Split out task_queue_{de}init
tlively Dec 11, 2021
be3de8f
Add test (excluding postMessage notification)
tlively Dec 14, 2021
f274a3c
Test the recursion guard logic
tlively Dec 14, 2021
d147a5a
Fix dangling tasks pointer
tlively Dec 20, 2021
64c1d42
new postMessage notification
tlively Dec 20, 2021
4d5aa0e
Test returner thread
tlively Dec 20, 2021
b421b3c
test tasks_queue growth during processing
tlively Dec 21, 2021
2a21b12
Fix test output
tlively Dec 21, 2021
82ea908
Address comments
tlively Dec 21, 2021
45f7b97
Put the emscripten_force_exit back
tlively Dec 21, 2021
3b9cbd8
Expand comment and move EMSCRIPTEN_KEEPALIVE
tlively Jan 4, 2022
e9af913
rename to task_queue_is_empty
tlively Jan 4, 2022
fa0d198
Expand recursion guard comment
tlively Jan 4, 2022
54ae773
Merge remote-tracking branch 'origin/main' into new-proxying-api
tlively Jan 4, 2022
e4b05eb
Wait for `returner` to start up
tlively Jan 4, 2022
28be33e
Rewrite the old proxying API in terms of the new API
tlively Dec 21, 2021
0fa13af
Merge remote-tracking branch 'origin/main' into rewrite-old-proxying-api
tlively Mar 9, 2022
af9d2ce
Remove pthread_create from deps_info.py to fix test
tlively Mar 9, 2022
d03c3da
Rebaseline size test
tlively Mar 9, 2022
9a89e69
Recursion guard on emscripten_main_thread_process_queued_calls
tlively Mar 9, 2022
f67a82e
[ci skip] improve comment
tlively Mar 9, 2022
18eebf5
Remove duplicate function
tlively Mar 9, 2022
67aa5ef
Feedback
tlively Mar 9, 2022
1db6681
Merge and rebaseline
tlively Mar 9, 2022
72d7457
Move system recursion guard to emscripten_proxy_execute_queue
tlively Mar 10, 2022
47fbd08
Fix build after previous commit
tlively Mar 10, 2022
a5a151b
Remove duplicated worker JS
tlively Mar 10, 2022
27 changes: 2 additions & 25 deletions src/library_pthread.js
@@ -263,9 +263,9 @@ var LibraryPThread = {
return;
}

if (cmd === 'processQueuedMainThreadWork') {
if (cmd === 'processProxyingQueue') {
// TODO: Must post message to main Emscripten thread in PROXY_TO_WORKER mode.
_emscripten_main_thread_process_queued_calls();
_emscripten_proxy_execute_queue(d['queue']);
Collaborator:

Could we save a little JS code size here by passing zero args and having _emscripten_proxy_execute_queue assume that NULL means the system queue? That would also save the extra postMessage packing and unpacking.

Member Author:

Yes, potentially, but I would prefer to make it a clear-cut error for NULL ever to be passed to emscripten_proxy_execute_queue. It is better for users to get an assertion failure (in debug builds) when they do that by accident than for the system queue to be executed silently.

} else if (cmd === 'spawnThread') {
spawnThread(d);
} else if (cmd === 'cleanupThread') {
@@ -1040,29 +1040,6 @@ var LibraryPThread = {
return {{{ makeDynCall('ii', 'ptr') }}}(arg);
},

// This function is called internally to notify target thread ID that it has messages it needs to
// process in its message queue inside the Wasm heap. As a helper, the caller must also pass the
// ID of the main browser thread to this function, to avoid needlessly ping-ponging between JS and
// Wasm boundaries.
_emscripten_notify_thread_queue: function(targetThreadId, mainThreadId) {
if (targetThreadId == mainThreadId) {
postMessage({'cmd' : 'processQueuedMainThreadWork'});
} else if (ENVIRONMENT_IS_PTHREAD) {
postMessage({'targetThread': targetThreadId, 'cmd': 'processThreadQueue'});
} else {
var pthread = PThread.pthreads[targetThreadId];
var worker = pthread && pthread.worker;
if (!worker) {
#if ASSERTIONS
err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
#endif
return /*0*/;
}
worker.postMessage({'cmd' : 'processThreadQueue'});
}
return 1;
},

_emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
if (targetThreadId == currThreadId) {
setTimeout(function() { _emscripten_proxy_execute_queue(queue); });
Collaborator:

Why turn this into a setTimeout?

Member Author:

Actually, this new _emscripten_notify_proxying_queue is a duplicate of the one that already exists just below. I'm not sure how I ended up with the duplicate function, but I'll remove it. The diff here should simply be a removal of the old _emscripten_notify_thread_queue.

Member Author:

This uses setTimeout so that we process the queue after returning to the event loop, similar to how queues on other threads are only processed once those threads return to their own event loops.

Collaborator:

Interesting. I didn't notice that the condition here changed from (targetThreadId == mainThreadId) to (targetThreadId == currThreadId). How was (targetThreadId == currThreadId) handled in the old code? It looks like it was not handled specially.

Member Author:

The old code eagerly executed the proxied work if the target thread was the same as the current thread, so _emscripten_notify_thread_queue was never called in that case. The new API is more flexible and does not force eager evaluation in that case, so the new JS code has to handle it.

4 changes: 0 additions & 4 deletions src/worker.js
@@ -285,10 +285,6 @@ self.onmessage = (e) => {
}
} else if (e.data.target === 'setimmediate') {
// no-op
} else if (e.data.cmd === 'processThreadQueue') {
if (Module['_pthread_self']()) { // If this thread is actually running?
Module['_emscripten_current_thread_process_queued_calls']();
}
} else if (e.data.cmd === 'processProxyingQueue') {
if (Module['_pthread_self']()) { // If this thread is actually running?
Module['_emscripten_proxy_execute_queue'](e.data.queue);
Collaborator:

Do we need to pass the queue pointer around, or can we just assume it's the system queue here?

Member Author:

Yes, we need to pass the queue pointer around because this is the same code path used for both the system queue and user queues. Actually, I see that this ended up duplicated somehow as well. Will fix.

213 changes: 30 additions & 183 deletions system/lib/pthread/library_pthread.c
@@ -35,6 +35,7 @@
#include <emscripten/stack.h>

#include "threading_internal.h"
#include "proxying.h"

int emscripten_pthread_attr_gettransferredcanvases(const pthread_attr_t* a, const char** str) {
*str = a->_a_transferredcanvases;
@@ -177,7 +178,6 @@ void emscripten_async_waitable_close(em_queued_call* call) {

extern EMSCRIPTEN_RESULT _emscripten_set_offscreencanvas_size(const char *target, int width, int height);
extern double emscripten_receive_on_main_thread_js(int functionIndex, int numCallArgs, double* args);
extern int _emscripten_notify_thread_queue(pthread_t targetThreadId, pthread_t mainThreadId);

static void _do_call(void* arg) {
em_queued_call* q = (em_queued_call*)arg;
@@ -321,60 +321,6 @@ }
}
}

#define CALL_QUEUE_SIZE 128

// Shared data synchronized by call_queue_lock.
typedef struct CallQueueEntry {
void (*func)(void*);
void* arg;
} CallQueueEntry;

typedef struct CallQueue {
void* target_thread;
CallQueueEntry* call_queue;
int call_queue_head;
int call_queue_tail;
struct CallQueue* next;
} CallQueue;

// Currently global to the queue, but this can be improved to be per-queue specific. (TODO: with
// lockfree list operations on callQueue_head, or removing the list by moving this data to
// pthread_t)
static pthread_mutex_t call_queue_lock = PTHREAD_MUTEX_INITIALIZER;
static CallQueue* callQueue_head = 0;

// Not thread safe, call while having call_queue_lock obtained.
static CallQueue* GetQueue(void* target) {
assert(target);
CallQueue* q = callQueue_head;
while (q && q->target_thread != target)
q = q->next;
return q;
}

// Not thread safe, call while having call_queue_lock obtained.
static CallQueue* GetOrAllocateQueue(void* target) {
CallQueue* q = GetQueue(target);
if (q)
return q;

q = (CallQueue*)malloc(sizeof(CallQueue));
q->target_thread = target;
q->call_queue = 0;
q->call_queue_head = 0;
q->call_queue_tail = 0;
q->next = 0;
if (callQueue_head) {
CallQueue* last = callQueue_head;
while (last->next)
last = last->next;
last->next = q;
} else {
callQueue_head = q;
}
return q;
}

EMSCRIPTEN_RESULT emscripten_wait_for_call_v(em_queued_call* call, double timeoutMSecs) {
int r;

@@ -410,85 +356,43 @@ pthread_t emscripten_main_browser_thread_id() {
return &__main_pthread;
}

int _emscripten_do_dispatch_to_thread(pthread_t target_thread, em_queued_call* call) {
assert(call);

// #if PTHREADS_DEBUG // TODO: Create a debug version of pthreads library
// EM_ASM_INT({dump('thread ' + _pthread_self() + ' (ENVIRONMENT_IS_WORKER: ' +
//ENVIRONMENT_IS_WORKER + '), queueing call of function enum=' + $0 + '/ptr=' + $1 + ' on thread '
//+ $2 + '\n' + new Error().stack)}, call->functionEnum, call->functionPtr, target_thread);
// #endif

// Can't be a null pointer here, and can't be
// EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD either.
static pthread_t normalize_thread(pthread_t target_thread) {
assert(target_thread);
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD)
target_thread = emscripten_main_browser_thread_id();

// If we are the target recipient of this message, we can just call the operation directly.
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD ||
target_thread == pthread_self()) {
_do_call(call);
return 1;
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_MAIN_BROWSER_THREAD) {
return emscripten_main_browser_thread_id();
}

// Add the operation to the call queue of the main runtime thread.
pthread_mutex_lock(&call_queue_lock);
CallQueue* q = GetOrAllocateQueue(target_thread);
if (!q->call_queue) {
// Shared data synchronized by call_queue_lock.
q->call_queue = malloc(sizeof(CallQueueEntry) * CALL_QUEUE_SIZE);
if (target_thread == EM_CALLBACK_THREAD_CONTEXT_CALLING_THREAD) {
return pthread_self();
}
return target_thread;
}

int head = q->call_queue_head;
int tail = q->call_queue_tail;
int new_tail = (tail + 1) % CALL_QUEUE_SIZE;

while (new_tail == head) { // Queue is full?
pthread_mutex_unlock(&call_queue_lock);

// If queue of the main browser thread is full, then we wait. (never drop messages for the main
// browser thread)
if (target_thread == emscripten_main_browser_thread_id()) {
emscripten_futex_wait((void*)&q->call_queue_head, head, INFINITY);
pthread_mutex_lock(&call_queue_lock);
head = q->call_queue_head;
tail = q->call_queue_tail;
new_tail = (tail + 1) % CALL_QUEUE_SIZE;
} else {
// For the queues of other threads, just drop the message.
// #if DEBUG TODO: a debug build of pthreads library?
// EM_ASM(console.error('Pthread queue overflowed, dropping queued
//message to thread. ' + new Error().stack));
// #endif
em_queued_call_free(call);
return 0;
}
// Execute `call` and return 1 only if already on the `target_thread`. Otherwise
// return 0.
static int maybe_call_on_current_thread(pthread_t target_thread,
em_queued_call* call) {
if (pthread_equal(target_thread, pthread_self())) {
_do_call(call);
return 1;
}
return 0;
}

q->call_queue[tail].func = _do_call;
q->call_queue[tail].arg = call;

// If the call queue was empty, the main runtime thread is likely idle in the browser event loop,
// so send a message to it to ensure that it wakes up to start processing the command we have
// posted.
if (head == tail) {
int success = _emscripten_notify_thread_queue(target_thread, emscripten_main_browser_thread_id());
// Failed to dispatch the thread, delete the crafted message.
if (!success) {
em_queued_call_free(call);
pthread_mutex_unlock(&call_queue_lock);
return 0;
}
// Execute or proxy `call`. Return 1 if the work was executed or otherwise
// return 0.
static int do_dispatch_to_thread(pthread_t target_thread,
em_queued_call* call) {
target_thread = normalize_thread(target_thread);
if (maybe_call_on_current_thread(target_thread, call)) {
return 1;
}

q->call_queue_tail = new_tail;
pthread_mutex_unlock(&call_queue_lock);
emscripten_proxy_async(
emscripten_proxy_get_system_queue(), target_thread, _do_call, call);
return 0;
}

void emscripten_async_run_in_main_thread(em_queued_call* call) {
_emscripten_do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
do_dispatch_to_thread(emscripten_main_browser_thread_id(), call);
}

void emscripten_sync_run_in_main_thread(em_queued_call* call) {
@@ -589,50 +493,7 @@ void* emscripten_sync_run_in_main_thread_7(int function, void* arg1,
}

void emscripten_current_thread_process_queued_calls() {
// #if PTHREADS_DEBUG == 2
// EM_ASM(console.error('thread ' + _pthread_self() + ':
//emscripten_current_thread_process_queued_calls(), ' + new Error().stack));
// #endif

static thread_local bool thread_is_processing_queued_calls = false;

// It is possible that when processing a queued call, the control flow leads back to calling this
// function in a nested fashion! Therefore this scenario must explicitly be detected, and
// processing the queue must be avoided if we are nesting, or otherwise the same queued calls
// would be processed again and again.
if (thread_is_processing_queued_calls)
return;
// This must be before pthread_mutex_lock(), since pthread_mutex_lock() can call back to this
// function.
thread_is_processing_queued_calls = true;

pthread_mutex_lock(&call_queue_lock);
CallQueue* q = GetQueue(pthread_self());
if (!q) {
pthread_mutex_unlock(&call_queue_lock);
thread_is_processing_queued_calls = false;
return;
}

int head = q->call_queue_head;
int tail = q->call_queue_tail;
while (head != tail) {
// Assume that the call is heavy, so unlock access to the call queue while it is being
// performed.
pthread_mutex_unlock(&call_queue_lock);
q->call_queue[head].func(q->call_queue[head].arg);
pthread_mutex_lock(&call_queue_lock);

head = (head + 1) % CALL_QUEUE_SIZE;
q->call_queue_head = head;
tail = q->call_queue_tail;
}
pthread_mutex_unlock(&call_queue_lock);

// If the queue was full and we had waiters pending to get to put data to queue, wake them up.
emscripten_futex_wake((void*)&q->call_queue_head, INT_MAX);

thread_is_processing_queued_calls = false;
emscripten_proxy_execute_queue(emscripten_proxy_get_system_queue());
}

// At times when we disallow the main thread to process queued calls, this will
@@ -733,17 +594,6 @@ em_queued_call* emscripten_async_waitable_run_in_main_runtime_thread_(
return q;
}

typedef struct DispatchToThreadArgs {
pthread_t target_thread;
em_queued_call* q;
} DispatchToThreadArgs;

static void dispatch_to_thread_helper(void* user_data) {
DispatchToThreadArgs* args = (DispatchToThreadArgs*)user_data;
_emscripten_do_dispatch_to_thread(args->target_thread, args->q);
free(user_data);
}

int emscripten_dispatch_to_thread_args(pthread_t target_thread,
EM_FUNC_SIGNATURE sig,
void* func_ptr,
@@ -761,7 +611,7 @@ int emscripten_dispatch_to_thread_args(pthread_t target_thread,

// `q` will not be used after it is called, so let the call clean it up.
q->calleeDelete = 1;
return _emscripten_do_dispatch_to_thread(target_thread, q);
return do_dispatch_to_thread(target_thread, q);
}

int emscripten_dispatch_to_thread_(pthread_t target_thread,
@@ -792,10 +642,7 @@ int emscripten_dispatch_to_thread_async_args(pthread_t target_thread,
q->calleeDelete = 1;

// Schedule the call to run later on this thread.
DispatchToThreadArgs* args = malloc(sizeof(DispatchToThreadArgs));
args->target_thread = target_thread;
args->q = q;
emscripten_set_timeout(dispatch_to_thread_helper, 0, args);
emscripten_set_timeout(_do_call, 0, q);
return 0;
}

22 changes: 20 additions & 2 deletions system/lib/pthread/proxying.c
@@ -206,13 +206,26 @@ static task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
EMSCRIPTEN_KEEPALIVE
void emscripten_proxy_execute_queue(em_proxying_queue* q) {
assert(q != NULL);

// Recursion guard to avoid infinite recursion when we arrive here from the
// pthread_lock call below that executes the system queue. The per-task_queue
// recursion lock can't catch these recursions because it can only be checked
// after the lock has been acquired.
static _Thread_local int executing_system_queue = 0;
int is_system_queue = q == &system_proxying_queue;
if (is_system_queue) {
if (executing_system_queue) {
return;
}
executing_system_queue = 1;
}

pthread_mutex_lock(&q->mutex);
int tasks_index = get_tasks_index_for_thread(q, pthread_self());
task_queue* tasks = tasks_index == -1 ? NULL : &q->task_queues[tasks_index];
if (tasks == NULL || tasks->processing) {
// No tasks for this thread or they are already being processed.
pthread_mutex_unlock(&q->mutex);
return;
goto end;
}
// Found the task queue; process the tasks.
tasks->processing = 1;
@@ -227,7 +240,12 @@ void emscripten_proxy_execute_queue(em_proxying_queue* q) {
tasks = &q->task_queues[tasks_index];
}
tasks->processing = 0;

end:
pthread_mutex_unlock(&q->mutex);
if (is_system_queue) {
executing_system_queue = 0;
}
}

int emscripten_proxy_async(em_proxying_queue* q,