From 61005d7f124334cf1a50fb3d54621dbbc1da79f8 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Tue, 6 Aug 2024 15:51:50 +0100 Subject: [PATCH] adds compat for existing mirai release --- DESCRIPTION | 2 +- NEWS.md | 2 +- src/aio.c | 18 +++++++++-- src/comms.c | 18 +++++++++-- src/core.c | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/nanonext.h | 1 + src/sync.c | 9 +++++- 7 files changed, 130 insertions(+), 7 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 3e7248e5f..bc5b8e824 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: nanonext Type: Package Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library -Version: 1.1.1.9016 +Version: 1.1.1.9017 Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library implementing 'Scalability Protocols', a reliable, high-performance standard for common communications patterns including diff --git a/NEWS.md b/NEWS.md index 7e12a0196..709a1829e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# nanonext 1.1.1.9016 (development) +# nanonext 1.1.1.9017 (development) #### New Features diff --git a/src/aio.c b/src/aio.c index a966f7a7c..40274d33a 100644 --- a/src/aio.c +++ b/src/aio.c @@ -444,7 +444,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { const SEXP ptrtag = NANO_TAG(con); if ((sock = ptrtag == nano_SocketSymbol) || ptrtag == nano_ContextSymbol) { - nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + switch (nano_encodes(mode)) { + case 1: + nano_serialize(&buf, data, NANO_PROT(con)); break; + case 2: + nano_encode(&buf, data); break; + default: + nano_serialize_old(&buf, data, NANO_PROT(con)); break; + } nng_msg *msg; saio = R_Calloc(1, nano_aio); saio->type = SENDAIO; @@ -500,7 +507,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP clo) { nng_pipe *p = (nng_pipe *) NANO_PTR(con); nng_socket sock = nng_pipe_socket(*p); - nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + switch (nano_encodes(mode)) { + case 1: + nano_serialize(&buf, data, NANO_PROT(con)); break; + case 2: + nano_encode(&buf, data); break; + default: + nano_serialize_old(&buf, data, NANO_PROT(con)); break; + } nng_msg *msg; saio = R_Calloc(1, nano_aio); saio->type = SENDAIO; diff --git a/src/comms.c b/src/comms.c index ecda331f4..73e0456a6 100644 --- a/src/comms.c +++ b/src/comms.c @@ -312,7 +312,14 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { const SEXP ptrtag = NANO_TAG(con); if (ptrtag == nano_SocketSymbol) { - nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + switch (nano_encodes(mode)) { + case 1: + nano_serialize(&buf, data, NANO_PROT(con)); break; + case 2: + nano_encode(&buf, data); break; + default: + nano_serialize_old(&buf, data, NANO_PROT(con)); break; + } nng_socket *sock = (nng_socket *) NANO_PTR(con); if (flags <= 0) { @@ -347,7 +354,14 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block) { } else if (ptrtag == nano_ContextSymbol) { - nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + switch (nano_encodes(mode)) { + case 1: + nano_serialize(&buf, data, NANO_PROT(con)); break; + case 2: + nano_encode(&buf, data); break; + default: + nano_serialize_old(&buf, data, NANO_PROT(con)); break; + } nng_ctx *ctxp = (nng_ctx *) NANO_PTR(con); nng_msg *msgp; diff --git a/src/core.c b/src/core.c index a380ab1f1..093e12c47 100644 --- a/src/core.c +++ b/src/core.c @@ -235,6 +235,93 @@ SEXP rawToChar(const unsigned char *buf, const size_t sz) { } +void nano_serialize_old(nano_buf *buf, const SEXP object, SEXP hook) { + + NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE); + const int reg = hook != R_NilValue; + int vec; + + vec = reg ? NANO_INTEGER(CADDDR(hook)) : 0; + buf->buf[0] = 0x7; + buf->buf[1] = (uint8_t) vec; + buf->buf[2] = special_bit; + buf->cur += 16; + + struct R_outpstream_st output_stream; + + R_InitOutPStream( + &output_stream, + (R_pstream_data_t) buf, +#ifdef WORDS_BIGENDIAN + R_pstream_xdr_format, +#else + R_pstream_binary_format, +#endif + NANONEXT_SERIAL_VER, + NULL, + nano_write_bytes, + reg ? nano_inHook : NULL, + reg ? hook : R_NilValue + ); + + R_Serialize(object, &output_stream); + + if (reg && TAG(hook) != R_NilValue) { + ((uint64_t *) (buf->buf))[1] = (uint64_t) buf->cur; + SEXP call, out; + + if (vec) { + + PROTECT(call = Rf_lcons(CADR(hook), Rf_cons(TAG(hook), R_NilValue))); + PROTECT(out = R_UnwindProtect(eval_safe, call, rl_reset, hook, NULL)); + if (TYPEOF(out) == RAWSXP) { + R_xlen_t xlen = XLENGTH(out); + if (buf->cur + xlen > buf->len) { + buf->len = buf->cur + xlen; + buf->buf = R_Realloc(buf->buf, buf->len, unsigned char); + } + memcpy(buf->buf + buf->cur, DATAPTR_RO(out), xlen); + buf->cur += xlen; + } + UNPROTECT(2); + + } else { + + SEXP refList = TAG(hook); + SEXP func = CADR(hook); + R_xlen_t llen = Rf_xlength(refList); + if (buf->cur + sizeof(R_xlen_t) > buf->len) { + buf->len = buf->cur + NANONEXT_INIT_BUFSIZE; + buf->buf = R_Realloc(buf->buf, buf->len, unsigned char); + } + memcpy(buf->buf + buf->cur, &llen, sizeof(R_xlen_t)); + buf->cur += sizeof(R_xlen_t); + + for (R_xlen_t i = 0; i < llen; i++) { + PROTECT(call = Rf_lcons(func, Rf_cons(NANO_VECTOR(refList)[i], R_NilValue))); + PROTECT(out = R_UnwindProtect(eval_safe, call, rl_reset, hook, NULL)); + if (TYPEOF(out) == RAWSXP) { + R_xlen_t xlen = XLENGTH(out); + if (buf->cur + xlen + sizeof(R_xlen_t) > buf->len) { + buf->len = buf->cur + xlen + sizeof(R_xlen_t); + buf->buf = R_Realloc(buf->buf, buf->len, unsigned char); + } + memcpy(buf->buf + buf->cur, &xlen, sizeof(R_xlen_t)); + buf->cur += sizeof(R_xlen_t); + memcpy(buf->buf + buf->cur, DATAPTR_RO(out), xlen); + buf->cur += xlen; + } + UNPROTECT(2); + } + + } + + SET_TAG(hook, R_NilValue); + + } + +} + void nano_serialize(nano_buf *buf, const SEXP object, SEXP hook) { NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE); diff --git a/src/nanonext.h b/src/nanonext.h index 6a9ccc4f7..dbf5dac99 100644 --- a/src/nanonext.h +++ b/src/nanonext.h @@ -217,6 +217,7 @@ int nano_integer(SEXP); SEXP mk_error(const int); SEXP mk_error_data(const int); SEXP rawToChar(const unsigned char *, const size_t); +void nano_serialize_old(nano_buf *, const SEXP, SEXP); void nano_serialize(nano_buf *, const SEXP, SEXP); SEXP nano_unserialize(unsigned char *, const size_t, SEXP); SEXP nano_decode(unsigned char *, const size_t, const int, SEXP); diff --git a/src/sync.c b/src/sync.c index e5886b47c..090aeaf36 100644 --- a/src/sync.c +++ b/src/sync.c @@ -456,7 +456,14 @@ SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeou nng_msg *msg; int xc; - nano_encodes(sendmode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con)); + switch (nano_encodes(sendmode)) { + case 1: + nano_serialize(&buf, data, NANO_PROT(con)); break; + case 2: + nano_encode(&buf, data); break; + default: + nano_serialize_old(&buf, data, NANO_PROT(con)); break; + } saio = R_Calloc(1, nano_aio); saio->data = NULL; saio->next = ncv;