From f9e54ca561e40cb61398534c3b069c800c537a41 Mon Sep 17 00:00:00 2001 From: Bernardo Ramos Date: Sun, 21 May 2017 17:32:53 +0000 Subject: [PATCH] preserve order of calls --- test/test.c | 23 +++++++++++---------- uv_callback.c | 56 +++++++++++++++++++++++++++++++-------------------- uv_callback.h | 8 +++++--- 3 files changed, 51 insertions(+), 36 deletions(-) diff --git a/test/test.c b/test/test.c index d296f74..3528958 100644 --- a/test/test.c +++ b/test/test.c @@ -4,9 +4,9 @@ #include "../uv_callback.c" #include -uv_thread_t worker_thread; -uv_barrier_t barrier; -uv_async_t stop_worker; +uv_thread_t worker_thread; +uv_barrier_t barrier; +uv_callback_t stop_worker; int progress_called = 0; int static_call_counter = 0; @@ -60,26 +60,25 @@ void * on_sum(uv_callback_t *callback, void *data) { struct numbers *request = (struct numbers *)data; struct numbers *response = malloc(sizeof(struct numbers)); assert(response != 0); - printf("sum (%p) number1: %d number2: %d\n", data, request->number1, request->number2); response->number1 = request->number1; response->number2 = request->number2; response->result = request->number1 + request->number2; + printf("sum1 (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, response->result); free(request); - printf("sum result: %d\n", response->result); return response; } void * on_sum2(uv_callback_t *callback, void *data) { struct numbers *request = (struct numbers *)data; int result = request->number1 + request->number2; - printf("sum (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, result); + printf("sum2 (%p) number1: %d number2: %d result: %d\n", data, request->number1, request->number2, result); free(request); return (void*)result; } -void stop_worker_cb(uv_async_t *handle) { +void * stop_worker_cb(uv_callback_t *handle, void *arg) { puts("signal received to stop worker thread"); - uv_stop(handle->loop); + uv_stop(((uv_handle_t*)handle)->loop); } void worker_start(void *arg) { @@ -108,7 +107,9 @@ void worker_start(void *arg) { printf("uv_callback_init rc=%d\n", rc); assert(rc == 0); - uv_async_init(&loop, &stop_worker, stop_worker_cb); + rc = uv_callback_init(&loop, &stop_worker, stop_worker_cb, UV_COALESCE); + printf("uv_callback_init rc=%d\n", rc); + assert(rc == 0); /* signal to the main thread the the listening socket is ready */ uv_barrier_wait(&barrier); @@ -130,7 +131,7 @@ uv_callback_t cb_result; void * on_result(uv_callback_t *callback, void *data) { struct numbers *response = (struct numbers *)data; - printf("sum result (%p) number1: %d number2: %d result=%d\n", data, response->number1, response->number2, response->result); + printf("on sum result (%p) number1: %d number2: %d result=%d\n", data, response->number1, response->number2, response->result); assert(response->number1 == 123); assert(response->number2 == 456); assert(response->result == 579); @@ -241,7 +242,7 @@ int main() { assert(result == 333); /* send a signal to the worker thread to exit */ - uv_async_send(&stop_worker); + uv_callback_fire(&stop_worker, NULL, NULL); /* wait the worker thread to exit */ uv_thread_join(&worker_thread); diff --git a/uv_callback.c b/uv_callback.c index fc30ec6..91d6aa2 100644 --- a/uv_callback.c +++ b/uv_callback.c @@ -14,6 +14,20 @@ void uv_callback_idle_cb(uv_idle_t* handle); +/* Master Callback ***********************************************************/ + +void master_on_walk(uv_handle_t *handle, void *arg) { + if (handle->type == UV_ASYNC && ((uv_callback_t*)handle)->usequeue) { + *(uv_callback_t**)arg = (uv_callback_t *) handle; + } +} + +uv_callback_t * get_master_callback(uv_loop_t *loop) { + uv_callback_t *callback=0; + uv_walk(loop, master_on_walk, &callback); + return callback; +} + /* Dequeue *******************************************************************/ void * dequeue_call(uv_callback_t* callback) { @@ -45,7 +59,7 @@ void uv_callback_async_cb(uv_async_t* handle) { if (callback->usequeue) { uv_call_t *call = dequeue_call(callback); if (call) { - void *result = callback->function(callback, call->data); + void *result = call->callback->function(call->callback, call->data); if (call->notify) uv_callback_fire(call->notify, result, NULL); free(call); /* don't check for new calls now to prevent the loop from blocking @@ -77,25 +91,26 @@ int uv_callback_init(uv_loop_t* loop, uv_callback_t* callback, uv_callback_func if (!loop || !callback || !function) return UV_EINVAL; + memset(callback, 0, sizeof(uv_callback_t)); callback->function = function; - callback->queue = NULL; switch(callback_type) { case UV_DEFAULT: callback->usequeue = 1; - uv_mutex_init(&callback->mutex); - break; + callback->master = get_master_callback(loop); + if (callback->master) { + return 0; + } else { + uv_mutex_init(&callback->mutex); + rc = uv_idle_init(loop, &callback->idle); + if (rc) return rc; + } case UV_COALESCE: - callback->usequeue = 0; break; default: return UV_EINVAL; } - callback->idle_active = 0; - rc = uv_idle_init(loop, &callback->idle); - if (rc) return rc; - return uv_async_init(loop, (uv_async_t*) callback, uv_callback_async_cb); } @@ -119,6 +134,9 @@ int uv_callback_fire(uv_callback_t* callback, void *data, uv_callback_t* notify) /* save the call info */ call->data = data; call->notify = notify; + call->callback = callback; + /* if there is a master callback, use it */ + if (callback->master) callback = callback->master; /* add the call to the queue */ uv_mutex_lock(&callback->mutex); call->next = callback->queue; @@ -168,24 +186,17 @@ int uv_callback_fire_sync(uv_callback_t* callback, void *data, void** presult, i if (!callback || callback->usequeue==0) return UV_EINVAL; - /* allocate a new call info */ - uv_call_t *call = malloc(sizeof(uv_call_t)); - if (!call) return UV_ENOMEM; /* set the call result */ uv_loop_init(&loop); uv_callback_init(&loop, ¬ify, on_call_result, UV_DEFAULT); loop.data = &result; - /* save the call info */ - call->data = data; - call->notify = ¬ify; - /* add the call to the queue */ - uv_mutex_lock(&callback->mutex); - call->next = callback->queue; - callback->queue = call; - uv_mutex_unlock(&callback->mutex); - /* call uv_async_send to fire the callback on the other thread */ - uv_async_send((uv_async_t*)callback); + /* fire the callback on the other thread */ + rc = uv_callback_fire(callback, data, ¬ify); + if (rc) { + uv_close((uv_handle_t *) ¬ify, NULL); + goto loc_exit; + } /* if a timeout is supplied, set a timer */ if (timeout > 0) { @@ -199,6 +210,7 @@ int uv_callback_fire_sync(uv_callback_t* callback, void *data, void** presult, i /* exited the event loop */ uv_walk(&loop, callback_on_walk, NULL); uv_run(&loop, UV_RUN_DEFAULT); +loc_exit: uv_loop_close(&loop); /* store the result */ diff --git a/uv_callback.h b/uv_callback.h index f0b3068..9636e7b 100644 --- a/uv_callback.h +++ b/uv_callback.h @@ -47,12 +47,14 @@ struct uv_callback_s { void *arg; /* data argument for coalescing calls (when not using queue) */ uv_idle_t idle; /* idle handle used to drain the queue if new async request was sent while an old one was being processed */ int idle_active; /* flags if the idle handle is active */ + uv_callback_t *master; /* master callback handle */ }; struct uv_call_s { - uv_call_t *next; /* pointer to the next call in the queue */ - void *data; /* data argument for this call */ - uv_callback_t *notify; /* callback to be fired with the result of this one */ + uv_call_t *next; /* pointer to the next call in the queue */ + uv_callback_t *callback; /* callback linked to this call */ + void *data; /* data argument for this call */ + uv_callback_t *notify; /* callback to be fired with the result of this one */ };