Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

R callbacks and promises #28

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ SystemRequirements: 'libnng' >= 1.5 and 'libmbedtls' >= 2.5, or 'cmake' to
compile NNG and/or Mbed TLS included in package sources
Depends:
R (>= 3.5)
Imports:
later
LinkingTo:
later
Suggests:
knitr,
markdown
Expand Down
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ S3method(print,recvAio)
S3method(print,sendAio)
S3method(print,tlsConfig)
S3method(print,unresolvedValue)
S3method(promises::as.promise,recvAio)
S3method(promises::is.promising,recvAio)
S3method(start,nanoDialer)
S3method(start,nanoListener)
export("%~>%")
Expand Down Expand Up @@ -96,6 +98,7 @@ export(until_)
export(wait)
export(wait_)
export(write_cert)
importFrom(later,later)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed just to force the later package to load before nanonext does; there's a R_RegisterCCallable/R_GetCCallable dance that needs to happen, otherwise you get an error when the later_api.h static initializer loads.

importFrom(stats,start)
importFrom(tools,md5sum)
importFrom(utils,.DollarNames)
Expand Down
62 changes: 60 additions & 2 deletions R/aio.R
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,14 @@ recv_aio <- function(con,
"integer", "logical", "numeric", "raw", "string"),
timeout = NULL,
n = 65536L)
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment())
data <- .Call(rnng_recv_aio, con, mode, timeout, n, environment(),
function() {
cb <- data$callback
if (!is.null(cb)) {
cb(data)
}
}
)

#' Receive Async and Signal a Condition
#'
Expand Down Expand Up @@ -157,7 +164,58 @@ recv_aio_signal <- function(con,
"integer", "logical", "numeric", "raw", "string"),
timeout = NULL,
n = 65536L)
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment())
data <- .Call(rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(),
function() {
cb <- data$callback
if (!is.null(cb)) {
cb(data)
}
}
)

#' @exportS3Method promises::is.promising
is.promising.recvAio <- function(x) {
TRUE
}

#' @exportS3Method promises::as.promise
as.promise.recvAio <- function(x) {
prom <- x$promise

if (is.null(prom)) {
prom <- promises::promise(function(resolve, reject) {
assign("callback", function(...) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to use assign here because $<-.recvAio is a no-op. (Took me quite some time to realize that! 😅)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, sorry about that! :p


# WARNING: x$data is heavily side-effecty!
value <- x$data

if (is_error_value(value)) {
reject(simpleError(nng_error(value)))
} else {
resolve(value)
}
}, x)
})

# WARNING: x$data is heavily side-effecty!
value <- x$data

if (!inherits(value, "unresolvedValue")) {
if (is_error_value(value)) {
prom <- promises::promise_reject(simpleError(nng_error(value)))
} else {
prom <- promises::promise_resolve(value)
}
}

# Save for next time. This is not just an optimization but essential for
# correct behavior if as.promise is called multiple times, because only one
# `callback` can exist on the recvAio object at a time.
assign("promise", prom, x)
}

prom
}

# Core aio functions -----------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions R/nanonext-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
#' @importFrom stats start
#' @importFrom tools md5sum
#' @importFrom utils .DollarNames
#' @importFrom later later
#' @useDynLib nanonext, .registration = TRUE
#'
"_PACKAGE"
34 changes: 29 additions & 5 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define NANONEXT_SUPPLEMENTALS
#define NANONEXT_SIGNALS
#include "nanonext.h"
#include "later_shim.h"

// internals -------------------------------------------------------------------

Expand Down Expand Up @@ -195,6 +196,21 @@ static void isaio_complete(void *arg) {

}


static void raio_invoke_cb(void* arg) {
nano_aio *raio = (nano_aio *) arg;
if (raio->cb == NULL || Rf_isNull(raio->cb)) return;
SEXP func = (SEXP)raio->cb;
SEXP callExpr, result;
if (!Rf_isNull(func)) {
PROTECT(callExpr = Rf_lcons(func, R_NilValue)); // Prepare call
PROTECT(result = Rf_eval(callExpr, R_GlobalEnv)); // Execute call

UNPROTECT(2);
R_ReleaseObject(func);
Comment on lines +206 to +210
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not super confident in this part, co-wrote with ChatGPT. Maybe should be written not to PROTECT the result since we don't use it?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Time to start using ChatGPT! You're right we don't need to protect the result, or we could even cast the Rf_eval to void. More importantly, we shouldn't be adding/removing potentially large numbers of objects to/from the R precious list - but that's a detail that can be solved.

Copy link
Owner

@shikokuchuo shikokuchuo Apr 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I don't think we need to protect the callback functions in the Precious List, which simplifies things greatly. EDIT: this wasn't the case, but I've made this evaluation safe through R_UnwindProtect. I've continued your work in the 'dev' branch as I don't seem to have permissions to modify this PR. I'm still thinking through what an optimal API would be.

}
}

static void raio_complete(void *arg) {

nano_aio *raio = (nano_aio *) arg;
Expand All @@ -210,6 +226,7 @@ static void raio_complete(void *arg) {
raio->result = res - !res;
#endif

later2(raio_invoke_cb, arg, 0);
}

static void raio_complete_signal(void *arg) {
Expand All @@ -229,6 +246,7 @@ static void raio_complete_signal(void *arg) {
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

later2(raio_invoke_cb, arg, 0);
}

static void request_complete_signal(void *arg) {
Expand Down Expand Up @@ -709,7 +727,7 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) {
}

SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
const SEXP bytes, const SEXP clo, nano_cv *ncv) {
const SEXP bytes, const SEXP clo, const SEXP cb, nano_cv *ncv) {

const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
const int signal = ncv != NULL;
Expand All @@ -725,6 +743,12 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,
raio->next = ncv;
raio->type = RECVAIO;
raio->mode = mod;
if (Rf_isNull(cb)) {
raio->cb = NULL;
} else {
R_PreserveObject(cb);
raio->cb = (void*)cb;
}

if ((xc = nng_aio_alloc(&raio->aio, signal ? raio_complete_signal : raio_complete, raio)))
goto exitlevel1;
Expand Down Expand Up @@ -791,19 +815,19 @@ SEXP rnng_recv_aio_impl(const SEXP con, const SEXP mode, const SEXP timeout,

}

SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {

return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, NULL);
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, NULL);

}

SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo) {
SEXP rnng_recv_aio_signal(SEXP con, SEXP cvar, SEXP mode, SEXP timeout, SEXP bytes, SEXP clo, SEXP cb) {

if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);

return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, ncv);
return rnng_recv_aio_impl(con, mode, timeout, bytes, clo, cb, ncv);

}

Expand Down
5 changes: 5 additions & 0 deletions src/later_shim.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include <later_api.h>

extern "C" void later2(void (*func)(void*), void* data, double secs) {
later::later(func, data, secs);
}
7 changes: 7 additions & 0 deletions src/later_shim.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#ifndef LATER_SHIM_H
#define LATER_SHIM_H

// This is simply a shim so that later::later can be accessed from C, not C++
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really designed later_api.h for packages that use Rcpp, luckily this little shim is all that's needed to make it work with C. (If you're aware of a better technique than this, please let me know)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Joe, I'm looking at this and in this commit 68fa503, I switch to using the usual GetCCallable on execLaterNative rather than this shim method. Is there a reason this is to be avoided? I see there is a very old comment block in later here: https://github.com/r-lib/later/blob/main/src/init.c#L57-L74 which says that this interface is to be removed, but I'm not aware of the reason. Thanks!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've switched to calling execLaterNative2 in fb14833 which is what later_api.h appears to be doing anyway. It is called once in the package init function, so should be safe as well. Unless there are pitfalls I am not aware of, I prefer this method as it (i) utilises the 'official' R linking API and (ii) does not need a C++ compiler to build the nanonext shared object.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am now initializing the GetCCallable to execLaterNative2 when the first promise is 'registered' by calling back into R to load the later namespace. This is inspired by an approach Winston Chang and I took to 'lazy-load' rlang in the later package itself. This means that there is no implementation overhead now for cases that do not use promises.

void later2(void (*func)(void*), void* data, double secs);

#endif
5 changes: 3 additions & 2 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ typedef struct nano_aio_s {
int result;
void *data;
void *next;
void *cb;
} nano_aio;

typedef struct nano_cv_s {
Expand Down Expand Up @@ -250,8 +251,8 @@ SEXP rnng_protocol_open(SEXP, SEXP);
SEXP rnng_random(SEXP, SEXP);
SEXP rnng_reap(SEXP);
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_recv_aio_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_request_signal(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP);
Expand Down