
New low-level proxying C API #15737

Merged

merged 25 commits into main from new-proxying-api on Feb 2, 2022

Commits (25)
6584c16  [WIP] New low-level proxying C API (tlively, Dec 10, 2021)
99f14f5  Remove normalize_thread (tlively, Dec 11, 2021)
59a7bb3  Make header internal (tlively, Dec 11, 2021)
faa6423  Assert that q is not null (tlively, Dec 11, 2021)
f241353  Simplify em_proyxing_queue_create (tlively, Dec 11, 2021)
c3732d5  goto failed (tlively, Dec 11, 2021)
be9db82  Nicer struct initialization syntax (tlively, Dec 11, 2021)
5b6294e  Split out task_queue_grow (tlively, Dec 11, 2021)
e5ed1ba  More "Not thread safe" (tlively, Dec 11, 2021)
8e17f91  Split out task_queue_{de}init (tlively, Dec 11, 2021)
be3de8f  Add test (excluding postMessage notification) (tlively, Dec 14, 2021)
f274a3c  Test the recursion guard logic (tlively, Dec 14, 2021)
d147a5a  Fix dangling tasks pointer (tlively, Dec 20, 2021)
64c1d42  new postMessage notification (tlively, Dec 20, 2021)
4d5aa0e  Test returner thread (tlively, Dec 20, 2021)
b421b3c  test tasks_queue growth during processing (tlively, Dec 21, 2021)
2a21b12  Fix test output (tlively, Dec 21, 2021)
82ea908  Address comments (tlively, Dec 21, 2021)
45f7b97  Put the emscripten_force_exit back (tlively, Dec 21, 2021)
3b9cbd8  Expand comment and move EMSCRIPTEN_KEEPALIVE (tlively, Jan 4, 2022)
e9af913  rename to task_queue_is_empty (tlively, Jan 4, 2022)
fa0d198  Expand recursion guard comment (tlively, Jan 4, 2022)
54ae773  Merge remote-tracking branch 'origin/main' into new-proxying-api (tlively, Jan 4, 2022)
e4b05eb  Wait for `returner` to start up (tlively, Jan 4, 2022)
2e10028  Merge remote-tracking branch 'origin/main' into new-proxying-api (tlively, Feb 2, 2022)
19 changes: 19 additions & 0 deletions src/library_pthread.js
@@ -976,6 +976,25 @@ var LibraryPThread = {
      worker.postMessage({'cmd' : 'processThreadQueue'});
    }
    return 1;
  },

  _emscripten_notify_proxying_queue: function(targetThreadId, currThreadId, mainThreadId, queue) {
Collaborator

Can we relocate this to a .js file of its own, e.g. library_proxy_queue.js?

Member Author

That sounds nice. I'll look into it.

Member Author

It turned out to be pretty simple to move this into its own JS file, but given that the implementation uses internal details of the pthread library (e.g. PThread.pthreads[targetThreadId] and pthread.worker), and given that the existing _emscripten_notify_thread_queue will be removed in the next PR, I'm not sure there's much benefit to this after all.

Collaborator

I think it's small enough and closely coupled enough to belong here... and as you say, we hope it will replace _emscripten_notify_thread_queue, which is already here.

    if (targetThreadId == currThreadId) {
      setTimeout(function() { _emscripten_proxy_execute_queue(queue); });
    } else if (ENVIRONMENT_IS_PTHREAD) {
      postMessage({'targetThread' : targetThreadId, 'cmd' : 'processProxyingQueue', 'queue' : queue});
    } else {
      var pthread = PThread.pthreads[targetThreadId];
      var worker = pthread && pthread.worker;
      if (!worker) {
#if ASSERTIONS
        err('Cannot send message to thread with ID ' + targetThreadId + ', unknown thread ID!');
#endif
        return /*0*/;
      }
      worker.postMessage({'cmd' : 'processProxyingQueue', 'queue': queue});
    }
    return 1;
  }
};

4 changes: 4 additions & 0 deletions src/worker.js
@@ -285,6 +285,10 @@ self.onmessage = (e) => {
      if (Module['_pthread_self']()) { // If this thread is actually running?
        Module['_emscripten_current_thread_process_queued_calls']();
      }
    } else if (e.data.cmd === 'processProxyingQueue') {
      if (Module['_pthread_self']()) { // If this thread is actually running?
        Module['_emscripten_proxy_execute_queue'](e.data.queue);
      }
Collaborator

This part won't DCE away if the user does not utilize this proxying queue.

It might be worth adding a #if LIBRARY_PROXY_QUEUE preprocessor guard that gets enabled when the user is targeting the new proxy queue mechanism.

Member Author

IIUC, there wouldn't be any benefit to having that macro once we start using this for the system queue as well, right?

Collaborator

I think you can wrap this in hasExportedFunction('__emscripten_proxy_execute_queue') which will have the effect of only including it when needed. (note the double underscores)

    } else {
      err('worker.js received unknown command ' + e.data.cmd);
      err(e.data);
334 changes: 334 additions & 0 deletions system/lib/pthread/proxying.c
@@ -0,0 +1,334 @@
/*
 * Copyright 2021 The Emscripten Authors. All rights reserved.
 * Emscripten is available under two separate licenses, the MIT license and the
 * University of Illinois/NCSA Open Source License. Both these licenses can be
 * found in the LICENSE file.
 */

#include <assert.h>
#include <emscripten/threading.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

#include "proxying.h"

#define TASK_QUEUE_INITIAL_CAPACITY 128

extern int _emscripten_notify_proxying_queue(pthread_t target_thread,
                                             pthread_t curr_thread,
                                             pthread_t main_thread,
                                             em_proxying_queue* queue);

typedef struct task {
  void (*func)(void*);
  void* arg;
} task;

// A task queue for a particular thread. Organized into a linked list of
// task_queues for different threads.
typedef struct task_queue {
  // The target thread for this task_queue.
  pthread_t thread;
  // Recursion guard. TODO: We disallow recursive processing because that's what
  // the old proxying API does, so it is safer to start with the same behavior.
  // Experiment with relaxing this restriction once the old API uses these
  // queues as well.
  int processing;
  // Ring buffer of tasks of size `capacity`. New tasks are enqueued at
  // `tail` and dequeued at `head`.
  task* tasks;
  int capacity;
  int head;
  int tail;
} task_queue;

static int task_queue_init(task_queue* tasks, pthread_t thread) {
  task* task_buffer = malloc(sizeof(task) * TASK_QUEUE_INITIAL_CAPACITY);
  if (task_buffer == NULL) {
    return 0;
  }
  *tasks = (task_queue){.thread = thread,
Collaborator

Do you need the (task_queue) cast here?

Member Author

Yes, otherwise I get "error: expected expression". This is a necessary part of the compound literal syntax.

                        .processing = 0,
                        .tasks = task_buffer,
                        .capacity = TASK_QUEUE_INITIAL_CAPACITY,
                        .head = 0,
                        .tail = 0};
  return 1;
}

static void task_queue_deinit(task_queue* tasks) { free(tasks->tasks); }
Collaborator

task_queue_free()? (feel free to ignore suggestion)

Member Author

I would like to keep deinit because it more clearly pairs with init and because it's not tasks that's being freed, but rather one of its members.


// Not thread safe.
static int task_queue_is_empty(task_queue* tasks) {
  return tasks->head == tasks->tail;
}

// Not thread safe.
static int task_queue_full(task_queue* tasks) {
  return tasks->head == (tasks->tail + 1) % tasks->capacity;
}

// Not thread safe. Returns 1 on success and 0 on failure.
static int task_queue_grow(task_queue* tasks) {
  // Allocate a larger task queue.
  int new_capacity = tasks->capacity * 2;
  task* new_tasks = malloc(sizeof(task) * new_capacity);
  if (new_tasks == NULL) {
    return 0;
  }
  // Copy the tasks such that the head of the queue is at the beginning of the
  // buffer. There are two cases to handle: either the queue wraps around the
  // end of the old buffer or it does not.
  int queued_tasks;
  if (tasks->head <= tasks->tail) {
    // No wrap. Copy the tasks in one chunk.
    queued_tasks = tasks->tail - tasks->head;
    memcpy(new_tasks, &tasks->tasks[tasks->head], sizeof(task) * queued_tasks);
  } else {
    // Wrap. Copy `first_queued` tasks up to the end of the old buffer and
    // `last_queued` tasks at the beginning of the old buffer.
    int first_queued = tasks->capacity - tasks->head;
    int last_queued = tasks->tail;
    queued_tasks = first_queued + last_queued;
    memcpy(new_tasks, &tasks->tasks[tasks->head], sizeof(task) * first_queued);
    memcpy(new_tasks + first_queued, tasks->tasks, sizeof(task) * last_queued);
  }
  free(tasks->tasks);
  tasks->tasks = new_tasks;
  tasks->capacity = new_capacity;
  tasks->head = 0;
  tasks->tail = queued_tasks;
  return 1;
}
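To make the wrap-around branch above concrete, here is an illustrative example (not from the PR): suppose capacity is 4, head is 2, and tail is 1, so the queue is full and wraps around the end of the buffer. Then first_queued = 4 - 2 = 2 (the tasks at indices 2 and 3), last_queued = 1 (the task at index 0), and queued_tasks = 3. The three tasks are copied in order to the start of the new 8-slot buffer, and the indices become head = 0 and tail = 3.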

// Not thread safe. Returns 1 on success and 0 on failure.
static int task_queue_enqueue(task_queue* tasks, task t) {
  if (task_queue_full(tasks) && !task_queue_grow(tasks)) {
    return 0;
  }
  tasks->tasks[tasks->tail] = t;
  tasks->tail = (tasks->tail + 1) % tasks->capacity;
  return 1;
}

// Not thread safe. Assumes the queue is not empty.
static task task_queue_dequeue(task_queue* tasks) {
Collaborator

Do we need a check here for queue being empty? Or is that a precondition at all the call sites?

Member Author

It's a precondition at the call site. I'll add that to the comment.

  task t = tasks->tasks[tasks->head];
  tasks->head = (tasks->head + 1) % tasks->capacity;
  return t;
}

struct em_proxying_queue {
  // Protects all accesses to all task_queues.
  pthread_mutex_t mutex;
  // `size` task queues stored in an array of size `capacity`.
  task_queue* task_queues;
  int size;
  int capacity;
};

static em_proxying_queue system_proxying_queue = {.mutex =
                                                    PTHREAD_MUTEX_INITIALIZER,
                                                  .task_queues = NULL,
                                                  .size = 0,
                                                  .capacity = 0};

em_proxying_queue* emscripten_proxy_get_system_queue(void) {
  return &system_proxying_queue;
}

em_proxying_queue* em_proxying_queue_create(void) {
  em_proxying_queue* q = malloc(sizeof(em_proxying_queue));
  if (q == NULL) {
    return NULL;
  }
  *q = (em_proxying_queue){.mutex = PTHREAD_MUTEX_INITIALIZER,
Collaborator

cast needed?

Member Author

Yes, this is another compound literal expression.

Collaborator

Interesting... I don't remember using the casts when I've used that syntax in the past... it seems like the compiler should be able to know.

For example, in this case no cast is needed:

typedef struct {
  int a;
  int b;
} Foo;

Foo f = {.a = 1, .b = 2};

Member Author

Yeah, that should be simple type inference since C doesn't have any sort of struct subtyping. I'm not sure what the rationale for requiring the type to be repeated is.

                           .task_queues = NULL,
                           .size = 0,
                           .capacity = 0};
  return q;
}
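To make the compound-literal point from the thread above concrete, here is a minimal standalone sketch (illustration only, not part of the PR; example_queue is a hypothetical type). Plain initialization needs no parenthesized type name, but assignment to an existing object does, because a compound literal is defined as "( type-name ) { initializer-list }":

typedef struct {
  pthread_mutex_t mutex;
  int size;
} example_queue;

void example(example_queue* q) {
  // Initialization: not a compound literal, so no type name is required.
  example_queue local = {.mutex = PTHREAD_MUTEX_INITIALIZER, .size = 0};
  // Assignment: the right-hand side must be a compound literal, so the
  // (example_queue) type name cannot be omitted.
  *q = (example_queue){.mutex = PTHREAD_MUTEX_INITIALIZER, .size = 0};
  (void)local;
}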

void em_proxying_queue_destroy(em_proxying_queue* q) {
  assert(q != NULL);
  assert(q != &system_proxying_queue && "cannot destroy system proxying queue");
  // No need to acquire the lock; no one should be racing with the destruction
  // of the queue.
  pthread_mutex_destroy(&q->mutex);
  for (int i = 0; i < q->size; i++) {
    task_queue_deinit(&q->task_queues[i]);
  }
  free(q->task_queues);
  free(q);
}

// Not thread safe. Returns -1 if there are no tasks for the thread.
static int get_tasks_index_for_thread(em_proxying_queue* q, pthread_t thread) {
  assert(q != NULL);
  for (int i = 0; i < q->size; i++) {
    if (pthread_equal(q->task_queues[i].thread, thread)) {
Collaborator

Why not use TLS here to avoid the linear search? I.e. can't task_queues be accessed by pthread_getspecific?

Member Author

That would work for looking up the queue for the current thread, but is there a way to make that work for looking up the queue for another thread? I guess we could have both TLS and the array, but that seems more complicated for unclear gain. I expect the linear search to be very fast anyway, since I don't imagine there would ever be more than a small handful of target threads.

      return i;
    }
  }
  return -1;
}
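For reference, a minimal sketch of the TLS alternative discussed above (hypothetical key name, illustration only, not part of the PR). As noted in the thread, pthread_getspecific only helps a thread find its own task_queue; it does not help when looking up the queue for a different target thread:

// Hypothetical: a key created once, e.g. with pthread_key_create, elsewhere.
static pthread_key_t task_queue_key;

static task_queue* get_own_tasks_tls(void) {
  // Returns the task_queue this thread previously registered, or NULL.
  return (task_queue*)pthread_getspecific(task_queue_key);
}

static void set_own_tasks_tls(task_queue* tasks) {
  pthread_setspecific(task_queue_key, tasks);
}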

// Not thread safe.
static task_queue* get_or_add_tasks_for_thread(em_proxying_queue* q,
                                               pthread_t thread) {
  int tasks_index = get_tasks_index_for_thread(q, thread);
  if (tasks_index != -1) {
    return &q->task_queues[tasks_index];
  }
  // There were no tasks for the thread; initialize a new task_queue. If there
  // are not enough queues, allocate more.
  if (q->size == q->capacity) {
    int new_capacity = q->capacity == 0 ? 1 : q->capacity * 2;
    task_queue* new_task_queues =
      realloc(q->task_queues, sizeof(task_queue) * new_capacity);
    if (new_task_queues == NULL) {
      return NULL;
    }
    q->task_queues = new_task_queues;
    q->capacity = new_capacity;
  }
  // Initialize the next available task queue.
  task_queue* tasks = &q->task_queues[q->size];
  if (!task_queue_init(tasks, thread)) {
    return NULL;
  }
  q->size++;
  return tasks;
}

// Exported for use in worker.js.
EMSCRIPTEN_KEEPALIVE
void emscripten_proxy_execute_queue(em_proxying_queue* q) {
  assert(q != NULL);
  pthread_mutex_lock(&q->mutex);
  int tasks_index = get_tasks_index_for_thread(q, pthread_self());
  task_queue* tasks = tasks_index == -1 ? NULL : &q->task_queues[tasks_index];
  if (tasks == NULL || tasks->processing) {
    // No tasks for this thread or they are already being processed.
    pthread_mutex_unlock(&q->mutex);
Collaborator

Given that tasks are dequeued before being run and the mutex is unlocked... why not allow recursive running of the queue? It certainly seems safe enough.

If we are going to fail due to a recursive call, perhaps we should return an error code so that we can detect the case where we have queued items but are refusing to run them.

Member Author

The old proxying API prevents recursive queue processing, which makes sense given how frequently it is processed, although I don't know whether it's necessary or not. Since we want to rewrite the old API in terms of the new one, I think it makes sense to be conservative and match the old behavior here, then see what happens if we try to relax it by allowing recursive processing in the future. Another option would be to make the system queue special in the new API and allow recursive processing of all queues except the system queue, but I would rather not special case the system queue like that.

Member Author

I'll add a TODO to investigate this, though.

Collaborator

What about leaving out this logic and adding it instead to the old API when it calls into this one? (I guess it could be as simple as a CAS with an is_processing global?) That would make this API slightly simpler and lower level, which seems in line with what you are going for, maybe?

Member Author

Unfortunately this can't be an entirely separate layer. As part of the rewrite of the old API I would like to remove the postMessage handler for the old system and have everything go through the new handler. But the new handler would then need to call into a wrapper function for handling the recursion avoidance logic specific to the system queue. Since this recursion avoidance can't be entirely layered on top of the new system, it seems better to treat it uniformly across all queues. WDYT?

Member Author

I think it's possible that removing the recursion guard in a follow-up might Just Work, though. Here's the relevant comment from the old system:

// It is possible that when processing a queued call, the control flow leads back to calling this
// function in a nested fashion! Therefore this scenario must explicitly be detected, and
// processing the queue must be avoided if we are nesting, or otherwise the same queued calls
// would be processed again and again.

You're right that the new system is different in that it dequeues before calling, so it won't run into this specific problem. I wouldn't be surprised if there are other reasons we currently depend on this behavior, so I would rather investigate as a follow-up.

Collaborator

Yes, I even had a PR up for a while to remove the nesting limitation... but I agree there could be other semantic reasons why recursive running is not desirable. I can't think of them... but I suspect they exist.

lgtm to this change with or without the recursion restriction (perhaps mention in the docs that recursion doesn't currently work, and return failure when we fail to run for this reason).

    return;
  }
  // Found the task queue; process the tasks.
  tasks->processing = 1;
  while (!task_queue_is_empty(tasks)) {
    task t = task_queue_dequeue(tasks);
    // Unlock while the task is running to allow more work to be queued in
    // parallel.
    pthread_mutex_unlock(&q->mutex);
    t.func(t.arg);
    pthread_mutex_lock(&q->mutex);
    // The tasks might have been reallocated, so recalculate the pointer.
    tasks = &q->task_queues[tasks_index];
  }
  tasks->processing = 0;
  pthread_mutex_unlock(&q->mutex);
}
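To illustrate the recursion guard discussed in the thread above, a small sketch of hypothetical user code (not from the PR): a task that re-enters the queue it is currently running on returns immediately instead of processing the queue again, because processing is still set for this thread's task_queue.

// Hypothetical illustration only.
static void reentrant_task(void* arg) {
  em_proxying_queue* q = (em_proxying_queue*)arg;
  // This thread is already inside emscripten_proxy_execute_queue for `q`, so
  // tasks->processing is 1 and this nested call returns without running any
  // queued tasks.
  emscripten_proxy_execute_queue(q);
}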

int emscripten_proxy_async(em_proxying_queue* q,
                           pthread_t target_thread,
                           void (*func)(void*),
                           void* arg) {
  assert(q != NULL);
  pthread_mutex_lock(&q->mutex);
  task_queue* tasks = get_or_add_tasks_for_thread(q, target_thread);
  if (tasks == NULL) {
    goto failed;
  }
  int empty = task_queue_is_empty(tasks);
  if (!task_queue_enqueue(tasks, (task){func, arg})) {
Collaborator

goto failed and put the unlock and return at the end of the function?

    goto failed;
  }
  pthread_mutex_unlock(&q->mutex);
  // If the queue was previously empty, notify the target thread to process it.
  // Otherwise, the target thread was already notified when the existing work
  // was enqueued so we don't need to notify it again.
  if (empty) {
    _emscripten_notify_proxying_queue(
      target_thread, pthread_self(), emscripten_main_browser_thread_id(), q);
  }
  return 1;

failed:
  pthread_mutex_unlock(&q->mutex);
  return 0;
}

struct em_proxying_ctx {
  // The user-provided function and argument.
  void (*func)(em_proxying_ctx*, void*);
  void* arg;
  // Set `done` to 1 and signal the condition variable once the proxied task is
  // done.
  int done;
  pthread_mutex_t mutex;
  pthread_cond_t cond;
};

static void em_proxying_ctx_init(em_proxying_ctx* ctx,
                                 void (*func)(em_proxying_ctx*, void*),
                                 void* arg) {
  *ctx = (em_proxying_ctx){.func = func,
                           .arg = arg,
                           .done = 0,
                           .mutex = PTHREAD_MUTEX_INITIALIZER,
                           .cond = PTHREAD_COND_INITIALIZER};
}

static void em_proxying_ctx_deinit(em_proxying_ctx* ctx) {
  pthread_mutex_destroy(&ctx->mutex);
  pthread_cond_destroy(&ctx->cond);
}

void emscripten_proxy_finish(em_proxying_ctx* ctx) {
  pthread_mutex_lock(&ctx->mutex);
  ctx->done = 1;
  pthread_mutex_unlock(&ctx->mutex);
  pthread_cond_signal(&ctx->cond);
}

// Helper for wrapping the call with ctx as a `void (*)(void*)`.
static void call_with_ctx(void* p) {
  em_proxying_ctx* ctx = (em_proxying_ctx*)p;
  ctx->func(ctx, ctx->arg);
}

int emscripten_proxy_sync_with_ctx(em_proxying_queue* q,
                                   pthread_t target_thread,
                                   void (*func)(em_proxying_ctx*, void*),
                                   void* arg) {
  assert(!pthread_equal(target_thread, pthread_self()) &&
         "Cannot synchronously wait for work proxied to the current thread");
  em_proxying_ctx ctx;
  em_proxying_ctx_init(&ctx, func, arg);
  if (!emscripten_proxy_async(q, target_thread, call_with_ctx, &ctx)) {
    return 0;
  }
  pthread_mutex_lock(&ctx.mutex);
  while (!ctx.done) {
    pthread_cond_wait(&ctx.cond, &ctx.mutex);
  }
  pthread_mutex_unlock(&ctx.mutex);
  em_proxying_ctx_deinit(&ctx);
  return 1;
}
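The `_with_ctx` variant above exists so that the proxied function can complete asynchronously and signal completion later via emscripten_proxy_finish. A hedged sketch of that pattern (hypothetical names, illustration only, not from the PR):

// Hypothetical illustration: stash the context, return, and finish later.
static em_proxying_ctx* pending_ctx; // illustration only; not thread safe

static void start_async_work(em_proxying_ctx* ctx, void* arg) {
  // Begin some asynchronous operation on the target thread, then return
  // without calling emscripten_proxy_finish yet.
  pending_ctx = ctx;
}

static void on_async_work_done(void) {
  // Called later on the target thread; only now is the caller blocked in
  // emscripten_proxy_sync_with_ctx released.
  emscripten_proxy_finish(pending_ctx);
}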

// Helper for signaling the end of the task after the user function returns.
static void call_then_finish(em_proxying_ctx* ctx, void* arg) {
  task* t = (task*)arg;
  t->func(t->arg);
  emscripten_proxy_finish(ctx);
}

int emscripten_proxy_sync(em_proxying_queue* q,
                          pthread_t target_thread,
                          void (*func)(void*),
                          void* arg) {
  task t = {func, arg};
  return emscripten_proxy_sync_with_ctx(q, target_thread, call_then_finish, &t);
}
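Finally, as a quick orientation to the API this file adds, a hedged usage sketch based only on the signatures above (hypothetical function and variable names, not part of the PR). Note that for the sync variants to return, the target thread must eventually execute the queue, which normally happens via the postMessage notification added in library_pthread.js and worker.js:

#include <stdio.h>

static void print_msg(void* arg) {
  printf("running on target thread: %s\n", (const char*)arg);
}

static void proxy_examples(pthread_t target_thread) {
  em_proxying_queue* q = em_proxying_queue_create();
  if (q == NULL) {
    return;
  }
  // Fire-and-forget: returns as soon as the task is enqueued and the target
  // thread has been notified.
  emscripten_proxy_async(q, target_thread, print_msg, (void*)"async");
  // Blocking: returns only after the task has run on the target thread.
  emscripten_proxy_sync(q, target_thread, print_msg, (void*)"sync");
  em_proxying_queue_destroy(q);
}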