Skip to content

Commit

Permalink
Merge pull request #23 from shikokuchuo/dev
Browse files Browse the repository at this point in the history
%~>% signal forwarder
  • Loading branch information
shikokuchuo authored Nov 16, 2023
2 parents a8f48b4 + 43c30be commit 1512036
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 0 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ S3method(print,tlsConfig)
S3method(print,unresolvedValue)
S3method(start,nanoDialer)
S3method(start,nanoListener)
export("%~>%")
export("opt<-")
export(.context)
export(.unresolved)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#### New Features

* Introduces `wait_aio()`, a user-interruptible version of `call_aio()`.
* Implements `%~>%` signal forwarder from one 'conditionVariable' to another.

#### Updates

Expand Down
34 changes: 34 additions & 0 deletions R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,37 @@ lock <- function(socket, cv = NULL) invisible(.Call(rnng_socket_lock, socket, cv
#' @export
#'
unlock <- function(socket) invisible(.Call(rnng_socket_unlock, socket))

#' Signal Forwarder
#'
#' Forwards signals from one 'conditionVariable' to another.
#'
#' @param cv a 'conditionVariable' object, from which to forward the signal.
#' @param cv2 a 'conditionVariable' object, to which the signal is forwarded.
#'
#' @return Invisibly, 'cv2'.
#'
#' @details This is an experimental operator.
#'
#' The condition value of 'cv' is initially reset to zero when this operator
#' returns.
#'
#' Changes in the condition value of 'cv' are forwarded to 'cv2', but only
#' on each occassion 'cv' is signalled. This means that waiting on 'cv'
#' will cause a temporary divergence between the actual condition value of
#' 'cv' and that recorded at 'cv2', until the next time 'cv' is signalled.
#'
#' @examples
#' cva <- cv(); cvb <- cv(); cv1 <- cv(); cv2 <- cv()
#'
#' cva %~>% cv1 %~>% cv2
#' cvb %~>% cv2
#'
#' cv_signal(cva)
#' cv_signal(cvb)
#' cv_value(cv1)
#' cv_value(cv2)
#'
#' @export
#'
`%~>%` <- function(cv, cv2) invisible(.Call(rnng_signal_thread_create, cv, cv2))
42 changes: 42 additions & 0 deletions man/grapes-twiddle-greater-than-grapes.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_sha256", (DL_FUNC) &rnng_sha256, 3},
{"rnng_sha384", (DL_FUNC) &rnng_sha384, 3},
{"rnng_sha512", (DL_FUNC) &rnng_sha512, 3},
{"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2},
{"rnng_sleep", (DL_FUNC) &rnng_sleep, 1},
{"rnng_socket_lock", (DL_FUNC) &rnng_socket_lock, 2},
{"rnng_socket_unlock", (DL_FUNC) &rnng_socket_unlock, 1},
Expand Down
1 change: 1 addition & 0 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ extern SEXP rnng_sha224(SEXP, SEXP, SEXP);
extern SEXP rnng_sha256(SEXP, SEXP, SEXP);
extern SEXP rnng_sha384(SEXP, SEXP, SEXP);
extern SEXP rnng_sha512(SEXP, SEXP, SEXP);
extern SEXP rnng_signal_thread_create(SEXP, SEXP);
extern SEXP rnng_sleep(SEXP);
extern SEXP rnng_socket_lock(SEXP, SEXP);
extern SEXP rnng_socket_unlock(SEXP);
Expand Down
102 changes: 102 additions & 0 deletions src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ typedef struct nano_thread_aio_s {
nng_aio *aio;
} nano_thread_aio;

typedef struct nano_thread_duo_s {
nng_thread *thr;
nano_cv *cv;
nano_cv *cv2;
} nano_thread_duo;

static void thread_finalizer(SEXP xptr) {

if (R_ExternalPtrAddr(xptr) == NULL)
Expand All @@ -58,6 +64,25 @@ static void thread_aio_finalizer(SEXP xptr) {

}

static void thread_duo_finalizer(SEXP xptr) {

if (R_ExternalPtrAddr(xptr) == NULL)
return;
nano_thread_duo *xp = (nano_thread_duo *) R_ExternalPtrAddr(xptr);
nano_cv *ncv = xp->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;

nng_mtx_lock(mtx);
ncv->condition = -1;
nng_cv_wake(cv);
nng_mtx_unlock(mtx);

nng_thread_destroy(xp->thr);
R_Free(xp);

}

static void rnng_messenger_thread(void *args) {

SEXP plist = (SEXP) args;
Expand Down Expand Up @@ -275,3 +300,80 @@ SEXP rnng_wait_thread_create(SEXP aio) {
return aio;

}

static void rnng_signal_thread(void *args) {

nano_thread_duo *duo = (nano_thread_duo *) args;
nano_cv *ncv = duo->cv;
nng_mtx *mtx = ncv->mtx;
nng_cv *cv = ncv->cv;
nano_cv *ncv2 = duo->cv2;
nng_mtx *mtx2 = ncv2->mtx;
nng_cv *cv2 = ncv2->cv;

int incr, cond = 0;

nng_mtx_lock(mtx);
while (ncv->condition == cond)
nng_cv_wait(cv);
if (ncv->condition < 0) {
ncv->condition = cond;
nng_mtx_unlock(mtx);
return;
}
incr = ncv->condition - cond;
cond = cond + incr;
nng_mtx_unlock(mtx);

while (1) {

nng_mtx_lock(mtx2);
ncv2->condition = ncv2->condition + incr;
nng_cv_wake(cv2);
nng_mtx_unlock(mtx2);

nng_mtx_lock(mtx);
while (ncv->condition == cond)
nng_cv_wait(cv);
if (ncv->condition < 0) {
ncv->condition = cond;
nng_mtx_unlock(mtx);
break;
}
incr = ncv->condition - cond;
cond = cond + incr;
nng_mtx_unlock(mtx);

}

}

SEXP rnng_signal_thread_create(SEXP cv, SEXP cv2) {

if (R_ExternalPtrTag(cv) != nano_CvSymbol)
Rf_error("'cv' is not a valid Condition Variable");

if (R_ExternalPtrTag(cv2) != nano_CvSymbol)
Rf_error("'cv2' is not a valid Condition Variable");

nano_thread_duo *duo = R_Calloc(1, nano_thread_duo);
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cv);
nano_cv *ncv2 = (nano_cv *) R_ExternalPtrAddr(cv2);
duo->cv = ncv;
duo->cv2 = ncv2;

nng_mtx *dmtx = ncv->mtx;
nng_mtx_lock(dmtx);
ncv->condition = 0;
nng_mtx_unlock(dmtx);

nng_thread_create(&duo->thr, rnng_signal_thread, duo);

SEXP xptr = R_MakeExternalPtr(duo, R_NilValue, R_NilValue);
Rf_setAttrib(cv, nano_CvSymbol, xptr);
R_RegisterCFinalizerEx(xptr, thread_duo_finalizer, TRUE);
R_MakeWeakRef(xptr, cv, R_NilValue, FALSE);

return cv2;

}
3 changes: 3 additions & 0 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,11 @@ nanotestz(sub$close())
nanotestz(pub$close())
nanotest(wait(cv))
nanotest(wait(cv2))
nanotestxp(cv3 <- cv())
nanotestxp(cv %~>% cv2 %~>% cv3)
nanotestz(cv_signal(cv))
nanotest(cv_value(cv) == 1L)
nanotest(wait(cv3))

nanotestnano(surv <- nano(protocol = "surveyor", listen = "inproc://sock1", dial = "inproc://sock2"))
nanotestp(surv)
Expand Down

0 comments on commit 1512036

Please sign in to comment.