Skip to content

Commit

Permalink
preserve order of calls
Browse files Browse the repository at this point in the history
  • Loading branch information
kroggen committed May 21, 2017
1 parent f19aba8 commit f9e54ca
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 36 deletions.
23 changes: 12 additions & 11 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
#include "../uv_callback.c"
#include <assert.h>

uv_thread_t worker_thread;
uv_barrier_t barrier;
uv_async_t stop_worker;
uv_thread_t worker_thread;
uv_barrier_t barrier;
uv_callback_t stop_worker;

int progress_called = 0;
int static_call_counter = 0;
Expand Down Expand Up @@ -60,26 +60,25 @@ void * on_sum(uv_callback_t *callback, void *data) {
struct numbers *request = (struct numbers *)data;
struct numbers *response = malloc(sizeof(struct numbers));
assert(response != 0);
printf("sum (%p) number1: %d number2: %d\n", data, request->number1, request->number2);
response->number1 = request->number1;
response->number2 = request->number2;
response->result = request->number1 + request->number2;
printf("sum1 (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, response->result);
free(request);
printf("sum result: %d\n", response->result);
return response;
}

void * on_sum2(uv_callback_t *callback, void *data) {
struct numbers *request = (struct numbers *)data;
int result = request->number1 + request->number2;
printf("sum (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, result);
printf("sum2 (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, result);
free(request);
return (void*)result;
}

void stop_worker_cb(uv_async_t *handle) {
void * stop_worker_cb(uv_callback_t *handle, void *arg) {
puts("signal received to stop worker thread");
uv_stop(handle->loop);
uv_stop(((uv_handle_t*)handle)->loop);
}

void worker_start(void *arg) {
Expand Down Expand Up @@ -108,7 +107,9 @@ void worker_start(void *arg) {
printf("uv_callback_init rc=%d\n", rc);
assert(rc == 0);

uv_async_init(&loop, &stop_worker, stop_worker_cb);
rc = uv_callback_init(&loop, &stop_worker, stop_worker_cb, UV_COALESCE);
printf("uv_callback_init rc=%d\n", rc);
assert(rc == 0);

/* signal to the main thread the the listening socket is ready */
uv_barrier_wait(&barrier);
Expand All @@ -130,7 +131,7 @@ uv_callback_t cb_result;

void * on_result(uv_callback_t *callback, void *data) {
struct numbers *response = (struct numbers *)data;
printf("sum result (%p) number1: %d number2: %d result=%d\n", data, response->number1, response->number2, response->result);
printf("on sum result (%p) number1: %d number2: %d result=%d\n", data, response->number1, response->number2, response->result);
assert(response->number1 == 123);
assert(response->number2 == 456);
assert(response->result == 579);
Expand Down Expand Up @@ -241,7 +242,7 @@ int main() {
assert(result == 333);

/* send a signal to the worker thread to exit */
uv_async_send(&stop_worker);
uv_callback_fire(&stop_worker, NULL, NULL);

/* wait the worker thread to exit */
uv_thread_join(&worker_thread);
Expand Down
56 changes: 34 additions & 22 deletions uv_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@

void uv_callback_idle_cb(uv_idle_t* handle);

/* Master Callback ***********************************************************/

void master_on_walk(uv_handle_t *handle, void *arg) {
if (handle->type == UV_ASYNC && ((uv_callback_t*)handle)->usequeue) {
*(uv_callback_t**)arg = (uv_callback_t *) handle;
}
}

uv_callback_t * get_master_callback(uv_loop_t *loop) {
uv_callback_t *callback=0;
uv_walk(loop, master_on_walk, &callback);
return callback;
}

/* Dequeue *******************************************************************/

void * dequeue_call(uv_callback_t* callback) {
Expand Down Expand Up @@ -45,7 +59,7 @@ void uv_callback_async_cb(uv_async_t* handle) {
if (callback->usequeue) {
uv_call_t *call = dequeue_call(callback);
if (call) {
void *result = callback->function(callback, call->data);
void *result = call->callback->function(call->callback, call->data);
if (call->notify) uv_callback_fire(call->notify, result, NULL);
free(call);
/* don't check for new calls now to prevent the loop from blocking
Expand Down Expand Up @@ -77,25 +91,26 @@ int uv_callback_init(uv_loop_t* loop, uv_callback_t* callback, uv_callback_func

if (!loop || !callback || !function) return UV_EINVAL;

memset(callback, 0, sizeof(uv_callback_t));
callback->function = function;
callback->queue = NULL;

switch(callback_type) {
case UV_DEFAULT:
callback->usequeue = 1;
uv_mutex_init(&callback->mutex);
break;
callback->master = get_master_callback(loop);
if (callback->master) {
return 0;
} else {
uv_mutex_init(&callback->mutex);
rc = uv_idle_init(loop, &callback->idle);
if (rc) return rc;
}
case UV_COALESCE:
callback->usequeue = 0;
break;
default:
return UV_EINVAL;
}

callback->idle_active = 0;
rc = uv_idle_init(loop, &callback->idle);
if (rc) return rc;

return uv_async_init(loop, (uv_async_t*) callback, uv_callback_async_cb);
}

Expand All @@ -119,6 +134,9 @@ int uv_callback_fire(uv_callback_t* callback, void *data, uv_callback_t* notify)
/* save the call info */
call->data = data;
call->notify = notify;
call->callback = callback;
/* if there is a master callback, use it */
if (callback->master) callback = callback->master;
/* add the call to the queue */
uv_mutex_lock(&callback->mutex);
call->next = callback->queue;
Expand Down Expand Up @@ -168,24 +186,17 @@ int uv_callback_fire_sync(uv_callback_t* callback, void *data, void** presult, i

if (!callback || callback->usequeue==0) return UV_EINVAL;

/* allocate a new call info */
uv_call_t *call = malloc(sizeof(uv_call_t));
if (!call) return UV_ENOMEM;
/* set the call result */
uv_loop_init(&loop);
uv_callback_init(&loop, &notify, on_call_result, UV_DEFAULT);
loop.data = &result;
/* save the call info */
call->data = data;
call->notify = &notify;
/* add the call to the queue */
uv_mutex_lock(&callback->mutex);
call->next = callback->queue;
callback->queue = call;
uv_mutex_unlock(&callback->mutex);

/* call uv_async_send to fire the callback on the other thread */
uv_async_send((uv_async_t*)callback);
/* fire the callback on the other thread */
rc = uv_callback_fire(callback, data, &notify);
if (rc) {
uv_close((uv_handle_t *) &notify, NULL);
goto loc_exit;
}

/* if a timeout is supplied, set a timer */
if (timeout > 0) {
Expand All @@ -199,6 +210,7 @@ int uv_callback_fire_sync(uv_callback_t* callback, void *data, void** presult, i
/* exited the event loop */
uv_walk(&loop, callback_on_walk, NULL);
uv_run(&loop, UV_RUN_DEFAULT);
loc_exit:
uv_loop_close(&loop);

/* store the result */
Expand Down
8 changes: 5 additions & 3 deletions uv_callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ struct uv_callback_s {
void *arg; /* data argument for coalescing calls (when not using queue) */
uv_idle_t idle; /* idle handle used to drain the queue if new async request was sent while an old one was being processed */
int idle_active; /* flags if the idle handle is active */
uv_callback_t *master; /* master callback handle */
};

struct uv_call_s {
uv_call_t *next; /* pointer to the next call in the queue */
void *data; /* data argument for this call */
uv_callback_t *notify; /* callback to be fired with the result of this one */
uv_call_t *next; /* pointer to the next call in the queue */
uv_callback_t *callback; /* callback linked to this call */
void *data; /* data argument for this call */
uv_callback_t *notify; /* callback to be fired with the result of this one */
};


Expand Down

0 comments on commit f9e54ca

Please sign in to comment.