-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
R callbacks and promises #28
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -118,7 +118,14 @@ recv_aio <- function(con, | |
"integer", "logical", "numeric", "raw", "string"), | ||
timeout = NULL, | ||
n = 65536L) | ||
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment()) | ||
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(), | ||
function() { | ||
cb <- data$callback | ||
if (!is.null(cb)) { | ||
cb(data) | ||
} | ||
} | ||
) | ||
|
||
#' Receive Async and Signal a Condition | ||
#' | ||
|
@@ -157,7 +164,58 @@ recv_aio_signal <- function(con, | |
"integer", "logical", "numeric", "raw", "string"), | ||
timeout = NULL, | ||
n = 65536L) | ||
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment()) | ||
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(), | ||
function() { | ||
cb <- data$callback | ||
if (!is.null(cb)) { | ||
cb(data) | ||
} | ||
} | ||
) | ||
|
||
#' @exportS3Method promises::is.promising | ||
is.promising.recvAio <- function(x) { | ||
TRUE | ||
} | ||
|
||
#' @exportS3Method promises::as.promise | ||
as.promise.recvAio <- function(x) { | ||
prom <- x$promise | ||
|
||
if (is.null(prom)) { | ||
prom <- promises::promise(function(resolve, reject) { | ||
assign("callback", function(...) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Had to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, sorry about that! :p |
||
|
||
# WARNING: x$data is heavily side-effecty! | ||
value <- x$data | ||
|
||
if (is_error_value(value)) { | ||
reject(simpleError(nng_error(value))) | ||
} else { | ||
resolve(value) | ||
} | ||
}, x) | ||
}) | ||
|
||
# WARNING: x$data is heavily side-effecty! | ||
value <- x$data | ||
|
||
if (!inherits(value, "unresolvedValue")) { | ||
if (is_error_value(value)) { | ||
prom <- promises::promise_reject(simpleError(nng_error(value))) | ||
} else { | ||
prom <- promises::promise_resolve(value) | ||
} | ||
} | ||
|
||
# Save for next time. This is not just an optimization but essential for | ||
# correct behavior if as.promise is called multiple times, because only one | ||
# `callback` can exist on the recvAio object at a time. | ||
assign("promise", prom, x) | ||
} | ||
|
||
prom | ||
} | ||
|
||
# Core aio functions ----------------------------------------------------------- | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
#define NANONEXT_SUPPLEMENTALS | ||
#define NANONEXT_SIGNALS | ||
#include "nanonext.h" | ||
#include "later_shim.h" | ||
|
||
// internals ------------------------------------------------------------------- | ||
|
||
|
@@ -195,6 +196,21 @@ static void isaio_complete(void *arg) { | |
|
||
} | ||
|
||
|
||
static void raio_invoke_cb(void* arg) { | ||
nano_aio *raio = (nano_aio *) arg; | ||
if (raio->cb == NULL || Rf_isNull(raio->cb)) return; | ||
SEXP func = (SEXP)raio->cb; | ||
SEXP callExpr, result; | ||
if (!Rf_isNull(func)) { | ||
PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call | ||
PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call | ||
|
||
UNPROTECT(2); | ||
R_ReleaseObject(func); | ||
Comment on lines
+206
to
+210
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not super confident in this part, co-wrote with ChatGPT. Maybe should be written not to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Time to start using ChatGPT! You're right we don't need to protect the result, or we could even cast the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
} | ||
|
||
static void raio_complete(void *arg) { | ||
|
||
nano_aio *raio = (nano_aio *) arg; | ||
|
@@ -210,6 +226,7 @@ static void raio_complete(void *arg) { | |
raio->result = res - !res; | ||
#endif | ||
|
||
later2(raio_invoke_cb, arg, 0); | ||
} | ||
|
||
static void raio_complete_signal(void *arg) { | ||
|
@@ -229,6 +246,7 @@ static void raio_complete_signal(void *arg) { | |
nng_cv_wake(cv); | ||
nng_mtx_unlock(mtx); | ||
|
||
later2(raio_invoke_cb, arg, 0); | ||
} | ||
|
||
static void request_complete_signal(void *arg) { | ||
|
@@ -709,7 +727,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { | |
} | ||
|
||
SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, | ||
const SEXP bytes, const SEXP clo, nano_cv *ncv) { | ||
const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) { | ||
|
||
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout); | ||
const int signal = ncv != NULL; | ||
|
@@ -725,6 +743,12 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, | |
raio->next = ncv; | ||
raio->type = RECVAIO; | ||
raio->mode = mod; | ||
if (Rf_isNull(cb)) { | ||
raio->cb = NULL; | ||
} else { | ||
R_PreserveObject(cb); | ||
raio->cb = (void*)cb; | ||
} | ||
|
||
if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio))) | ||
goto exitlevel1; | ||
|
@@ -791,19 +815,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout, | |
|
||
} | ||
|
||
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { | ||
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { | ||
|
||
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL); | ||
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL); | ||
|
||
} | ||
|
||
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) { | ||
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) { | ||
|
||
if (R_ExternalPtrTag(cvar) != nano_CvSymbol) | ||
Rf_error("'cv' is not a valid Condition Variable"); | ||
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar); | ||
|
||
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv); | ||
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv); | ||
|
||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#include <later_api.h> | ||
|
||
extern "C" void later2(void (*func)(void*), void* data, double secs) { | ||
later::later(func, data, secs); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
#ifndef LATER_SHIM_H | ||
#define LATER_SHIM_H | ||
|
||
// This is simply a shim so that later::later can be accessed from C, not C++ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really designed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Joe, I'm looking at this and in this commit 68fa503, I switch to using the usual There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've switched to calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am now initializing the |
||
void later2(void (*func)(void*), void* data, double secs); | ||
|
||
#endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is needed just to force the later package to load before nanonext does; there's a
R_RegisterCCallable
/R_GetCCallable
dance that needs to happen, otherwise you get an error when thelater_api.h
static initializer loads.