Skip to content

Optimize allocations #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
Type: Package
Package: nanonext
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.5.2.9012
Version: 1.5.2.9013
Authors@R: c(
person("Charlie", "Gao", , "charlie.gao@posit.co", role = c("aut", "cre"),
person("Charlie", "Gao", , "charlie.gao@shikokuchuo.net", role = c("aut", "cre"),
comment = c(ORCID = "0000-0002-0750-061X")),
person("Posit Software, PBC", role = c("cph", "fnd"),
comment = c(ROR = "03wc8by49")),
Expand Down
46 changes: 26 additions & 20 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void nano_list_do(nano_list_op listop, nano_aio *saio) {
nng_mtx_unlock(free_mtx);
nng_aio_free(saio->aio);
if (saio->data != NULL)
R_Free(saio->data);
R_Free(saio);
free(saio->data);
free(saio);
} else {
saio->mode = 0x1;
nng_mtx_unlock(free_mtx);
Expand Down Expand Up @@ -66,8 +66,8 @@ void nano_list_do(nano_list_op listop, nano_aio *saio) {
nano_aio *data = (nano_aio *) current->data;
nng_aio_free(data->aio);
if (data->data != NULL)
R_Free(data->data);
R_Free(data);
free(data->data);
free(data);
free(current);
}
nng_mtx_unlock(free_mtx);
Expand Down Expand Up @@ -199,8 +199,8 @@ static void iaio_finalizer(SEXP xptr) {
nano_aio *xp = (nano_aio *) NANO_PTR(xptr);
nng_aio_free(xp->aio);
if (xp->data != NULL)
R_Free(xp->data);
R_Free(xp);
free(xp->data);
free(xp);

}

Expand All @@ -211,7 +211,7 @@ static void raio_finalizer(SEXP xptr) {
nng_aio_free(xp->aio);
if (xp->data != NULL)
nng_msg_free((nng_msg *) xp->data);
R_Free(xp);
free(xp);

}

Expand Down Expand Up @@ -508,8 +508,9 @@ SEXP rnng_unresolved2(SEXP x) {
SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP clo) {

const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) nano_integer(timeout);
nano_aio *saio;

SEXP aio, env, fun;
nano_aio *saio;
nano_buf buf;
int sock, xc;

Expand All @@ -518,7 +519,9 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
const int pipeid = sock ? nano_integer(pipe) : 0;
nano_encodes(mode) == 2 ? nano_encode(&buf, data) : nano_serialize(&buf, data, NANO_PROT(con));
nng_msg *msg;
saio = R_Calloc(1, nano_aio);

saio = calloc(1, sizeof(nano_aio));
NANO_ENSURE_ALLOC(saio);
saio->type = SENDAIO;

if ((xc = nng_msg_alloc(&msg, 0)))
Expand Down Expand Up @@ -553,9 +556,11 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
nng_stream *sp = nst->stream;
nng_iov iov;

saio = R_Calloc(1, nano_aio);
saio = calloc(1, sizeof(nano_aio));
NANO_ENSURE_ALLOC(saio);
saio->type = IOV_SENDAIO;
saio->data = R_Calloc(buf.cur, unsigned char);
saio->data = calloc(buf.cur, sizeof(unsigned char));
NANO_ENSURE_ALLOC_FREE(saio->data, saio);
memcpy(saio->data, buf.buf, buf.cur);
iov.iov_len = buf.cur - nst->textframes;
iov.iov_buf = saio->data;
Expand Down Expand Up @@ -592,10 +597,10 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
exitlevel3:
nng_aio_free(saio->aio);
exitlevel2:
R_Free(saio->data);
free(saio->data);
exitlevel1:
R_Free(saio);
NANO_FREE(buf);
free(saio);
return mk_error_data(-xc);

}
Expand All @@ -619,11 +624,11 @@ SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEX
if ((sock = !NANO_PTR_CHECK(con, nano_SocketSymbol)) || !NANO_PTR_CHECK(con, nano_ContextSymbol)) {

const uint8_t mod = (uint8_t) nano_matcharg(mode);
raio = R_Calloc(1, nano_aio);
raio = calloc(1, sizeof(nano_aio));
NANO_ENSURE_ALLOC(raio);
raio->next = ncv;
raio->type = signal ? RECVAIOS : RECVAIO;
raio->mode = mod;
raio->cb = NULL;

if ((xc = nng_aio_alloc(&raio->aio, interrupt ? raio_complete_interrupt : raio_complete, raio)))
goto exitlevel1;
Expand All @@ -642,12 +647,13 @@ SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEX
nng_stream **sp = (nng_stream **) NANO_PTR(con);
nng_iov iov;

raio = R_Calloc(1, nano_aio);
raio = calloc(1, sizeof(nano_aio));
NANO_ENSURE_ALLOC(raio);
raio->next = ncv;
raio->type = signal ? IOV_RECVAIOS : IOV_RECVAIO;
raio->mode = mod;
raio->cb = NULL;
raio->data = R_Calloc(xlen, unsigned char);
raio->data = calloc(xlen, sizeof(unsigned char));
NANO_ENSURE_ALLOC_FREE(raio->data, raio);
iov.iov_len = xlen;
iov.iov_buf = raio->data;

Expand Down Expand Up @@ -680,9 +686,9 @@ SEXP rnng_recv_aio(SEXP con, SEXP mode, SEXP timeout, SEXP cvar, SEXP bytes, SEX
exitlevel3:
nng_aio_free(raio->aio);
exitlevel2:
R_Free(raio->data);
free(raio->data);
exitlevel1:
R_Free(raio);
free(raio);
return mk_error_data(xc);

}
Expand Down
29 changes: 17 additions & 12 deletions src/comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static void context_finalizer(SEXP xptr) {
if (NANO_PTR(xptr) == NULL) return;
nng_ctx *xp = (nng_ctx *) NANO_PTR(xptr);
nng_ctx_close(*xp);
R_Free(xp);
free(xp);

}

Expand All @@ -21,12 +21,13 @@ SEXP rnng_ctx_open(SEXP socket) {
Rf_error("'socket' is not a valid Socket");

nng_socket *sock = (nng_socket *) NANO_PTR(socket);
nng_ctx *ctx = R_Calloc(1, nng_ctx);
nng_ctx *ctx = malloc(sizeof(nng_ctx));
NANO_ENSURE_ALLOC(ctx);
SEXP context;

const int xc = nng_ctx_open(ctx, *sock);
if (xc) {
R_Free(ctx);
free(ctx);
ERROR_OUT(xc);
}

Expand All @@ -50,12 +51,13 @@ SEXP rnng_ctx_create(SEXP socket) {
Rf_error("'socket' is not a valid Socket");

nng_socket *sock = (nng_socket *) NANO_PTR(socket);
nng_ctx *ctx = R_Calloc(1, nng_ctx);
nng_ctx *ctx = malloc(sizeof(nng_ctx));
NANO_ENSURE_ALLOC(ctx);
SEXP context;

const int xc = nng_ctx_open(ctx, *sock);
if (xc) {
R_Free(ctx);
free(ctx);
ERROR_OUT(xc);
}

Expand Down Expand Up @@ -96,7 +98,8 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_socket *sock = (nng_socket *) NANO_PTR(socket);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nng_dialer *dp = R_Calloc(1, nng_dialer);
nng_dialer *dp = malloc(sizeof(nng_dialer));
NANO_ENSURE_ALLOC(dp);
SEXP dialer, attr, newattr, xp;
nng_tls_config *cfg;
nng_url *up;
Expand Down Expand Up @@ -151,7 +154,7 @@ SEXP rnng_dial(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
exitlevel2:
nng_url_free(up);
exitlevel1:
R_Free(dp);
free(dp);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
ERROR_RET(xc);

Expand All @@ -170,7 +173,8 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
nng_socket *sock = (nng_socket *) NANO_PTR(socket);
const int start = NANO_INTEGER(autostart);
const char *ur = CHAR(STRING_ELT(url, 0));
nng_listener *lp = R_Calloc(1, nng_listener);
nng_listener *lp = malloc(sizeof(nng_listener));
NANO_ENSURE_ALLOC(lp);
SEXP listener, attr, newattr, xp;
nng_tls_config *cfg;
int xc;
Expand Down Expand Up @@ -218,7 +222,7 @@ SEXP rnng_listen(SEXP socket, SEXP url, SEXP tls, SEXP autostart, SEXP error) {
return nano_success;

exitlevel1:
R_Free(lp);
free(lp);
if (NANO_INTEGER(error)) ERROR_OUT(xc);
ERROR_RET(xc);

Expand Down Expand Up @@ -477,7 +481,8 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
nng_iov iov;
nng_aio *aiop;

buf = R_Calloc(xlen, unsigned char);
buf = calloc(xlen, sizeof(unsigned char));
NANO_ENSURE_ALLOC(buf);
iov.iov_len = xlen;
iov.iov_buf = buf;

Expand All @@ -501,7 +506,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
sz = nng_aio_count(aiop);
nng_aio_free(aiop);
res = nano_decode(buf, sz, mod, NANO_PROT(con));
R_Free(buf);
free(buf);

} else {
Rf_error("'con' is not a valid Socket, Context or Stream");
Expand All @@ -510,7 +515,7 @@ SEXP rnng_recv(SEXP con, SEXP mode, SEXP block, SEXP bytes) {
return res;

exitlevel2:
R_Free(buf);
free(buf);
exitlevel1:
return mk_error(xc);

Expand Down
10 changes: 5 additions & 5 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {
size_t req = buf->cur + (size_t) len;
if (req > buf->len) {
if (req > R_XLEN_T_MAX) {
if (buf->len) R_Free(buf->buf);
if (buf->len) free(buf->buf);
Rf_error("serialization exceeds max length of raw vector");
}
do {
buf->len += buf->len > NANONEXT_SERIAL_THR ? NANONEXT_SERIAL_THR : buf->len;
} while (buf->len < req);
buf->buf = R_Realloc(buf->buf, buf->len, unsigned char);
buf->buf = realloc(buf->buf, buf->len);
}

memcpy(buf->buf + buf->cur, src, len);
Expand Down Expand Up @@ -174,7 +174,7 @@ void dialer_finalizer(SEXP xptr) {
if (NANO_PTR(xptr) == NULL) return;
nng_dialer *xp = (nng_dialer *) NANO_PTR(xptr);
nng_dialer_close(*xp);
R_Free(xp);
free(xp);

}

Expand All @@ -183,7 +183,7 @@ void listener_finalizer(SEXP xptr) {
if (NANO_PTR(xptr) == NULL) return;
nng_listener *xp = (nng_listener *) NANO_PTR(xptr);
nng_listener_close(*xp);
R_Free(xp);
free(xp);

}

Expand All @@ -192,7 +192,7 @@ void socket_finalizer(SEXP xptr) {
if (NANO_PTR(xptr) == NULL) return;
nng_socket *xp = (nng_socket *) NANO_PTR(xptr);
nng_close(*xp);
R_Free(xp);
free(xp);

}

Expand Down
7 changes: 5 additions & 2 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,22 @@ extern int R_interrupts_pending;
#define NANONEXT_CHUNK_SIZE INT_MAX // must be <= INT_MAX
#define NANONEXT_STR_SIZE 40
#define NANO_ALLOC(x, sz) \
(x)->buf = R_Calloc(sz, unsigned char); \
(x)->buf = calloc(sz, sizeof(unsigned char)); \
if ((x)->buf == NULL) Rf_error("Memory allocation failed"); \
(x)->len = sz; \
(x)->cur = 0
#define NANO_INIT(x, ptr, sz) \
(x)->buf = ptr; \
(x)->len = 0; \
(x)->cur = sz
#define NANO_FREE(x) if (x.len) R_Free(x.buf)
#define NANO_FREE(x) if (x.len) free(x.buf)
#define NANO_CLASS2(x, cls1, cls2) \
SEXP klass = Rf_allocVector(STRSXP, 2); \
Rf_classgets(x, klass); \
SET_STRING_ELT(klass, 0, Rf_mkChar(cls1)); \
SET_STRING_ELT(klass, 1, Rf_mkChar(cls2))
#define NANO_ENSURE_ALLOC(x) if (x == NULL) Rf_error("Memory allocation failed")
#define NANO_ENSURE_ALLOC_FREE(x, y) if (x == NULL) { free(y); Rf_error("Memory allocation failed"); }

typedef union nano_opt_u {
char *str;
Expand Down
Loading
Loading