Skip to content

Commit

Permalink
Message: privatized associatedFd and ad
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdelisle committed Feb 23, 2021
1 parent 7272cbd commit 897bc02
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 61 deletions.
2 changes: 1 addition & 1 deletion admin/Admin.c
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ void Admin_registerFunctionWithArgCount(char* name,
static void importFd(Dict* args, void* vAdmin, String* txid, struct Allocator* requestAlloc)
{
struct Admin_pvt* admin = Identity_check((struct Admin_pvt*) vAdmin);
int fd = admin->currentRequest->associatedFd;
int fd = Message_getAssociatedFd(admin->currentRequest);
Dict* res = Dict_new(requestAlloc);
char* error = "none";
if (fd < 0) {
Expand Down
14 changes: 8 additions & 6 deletions interface/FramingIface.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ static struct Message* mergeMessage(struct FramingIface_pvt* fi, struct Message*

struct Message* out = Message_new(0, length + REQUIRED_PADDING, fi->frameAlloc);
Er_assert(Message_epush(out, last->bytes, last->length));
out->associatedFd = last->associatedFd;
int fd = Message_getAssociatedFd(last);
for (part = fi->frameParts; part; part = part->next) {
Er_assert(Message_epush(out, part->msg->bytes, part->msg->length));
if (!out->associatedFd) {
out->associatedFd = part->msg->associatedFd;
if (fd == -1) {
fd = Message_getAssociatedFd(part->msg);
}
}

if (fd > -1) {
Message_setAssociatedFd(out, fd);
}
Assert_true(length <= out->length);
return out;
}
Expand Down Expand Up @@ -129,7 +131,7 @@ static Iface_DEFUN receiveMessage(struct Message* msg, struct Iface* streamIf)
} else if (fi->bytesRemaining < (uint32_t)msg->length) {
struct Allocator* alloc = Allocator_child(msg->alloc);
struct Message* m = Message_new(fi->bytesRemaining, REQUIRED_PADDING, alloc);
m->associatedFd = msg->associatedFd;
Message_setAssociatedFd(m, Message_getAssociatedFd(msg));
Bits_memcpy(m->bytes, msg->bytes, fi->bytesRemaining);
Er_assert(Message_eshift(msg, -fi->bytesRemaining));
fi->bytesRemaining = 0;
Expand All @@ -140,7 +142,7 @@ static Iface_DEFUN receiveMessage(struct Message* msg, struct Iface* streamIf)
} else {
fi->frameAlloc = Allocator_child(fi->alloc);
struct Message* m = Allocator_calloc(fi->frameAlloc, sizeof(struct Message), 1);
m->associatedFd = msg->associatedFd;
Message_setAssociatedFd(m, Message_getAssociatedFd(msg));
m->capacity = m->length = msg->length + 4;
m->bytes = Allocator_calloc(fi->frameAlloc, m->length, 1);
m->alloc = fi->frameAlloc;
Expand Down
34 changes: 13 additions & 21 deletions util/events/libuv/Pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ static void sendMessage2(struct Pipe_WriteRequest_pvt* req)
};

int ret = -1;
if (pipe->ipc && m->associatedFd && !Defined(win32)) {
int fd = Message_getAssociatedFd(m);
int fd = Message_getAssociatedFd(m);
if (pipe->ipc && fd > -1 && !Defined(win32)) {
uv_stream_t* fake_handle = Allocator_calloc(req->alloc, sizeof(uv_stream_t), 1);
#ifndef win32
fake_handle->io_watcher.fd = fd;
Expand Down Expand Up @@ -194,10 +194,10 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) stream->data);

// Grab out the allocator which was placed there by allocate()
struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
struct Message* msg = buf->base ? ALLOC(buf->base) : NULL;
pipe->isInCallback = 1;

Assert_true(!alloc || alloc->fileName == pipe->alloc->fileName);
Assert_true(!msg || msg->alloc->fileName == pipe->alloc->fileName);

if (nread < 0) {
if (pipe->pub.onClose) {
Expand All @@ -209,24 +209,18 @@ static void incoming(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
//Log_debug(pipe->log, "Pipe 0 length read [%s]", pipe->pub.fullName);

} else {
Assert_true(alloc);
struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
m->length = nread;
m->padding = Pipe_PADDING_AMOUNT;
m->capacity = buf->len;
m->bytes = (uint8_t*)buf->base;
m->ad = &m->bytes[-Pipe_PADDING_AMOUNT];
m->alloc = alloc;
Assert_true(msg);
msg->length = nread;
if (pipe->ipc) {
#ifndef win32
Message_setAssociatedFd(m, stream->accepted_fd);
Message_setAssociatedFd(msg, stream->accepted_fd);
#endif
}
Iface_send(&pipe->pub.iface, m);
Iface_send(&pipe->pub.iface, msg);
}

if (alloc) {
Allocator_free(alloc);
if (msg) {
Allocator_free(msg->alloc);
}

pipe->isInCallback = 0;
Expand All @@ -244,15 +238,13 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
{
struct Pipe_pvt* pipe = Identity_check((struct Pipe_pvt*) handle->data);
size = Pipe_BUFFER_CAP;
size_t fullSize = size + Pipe_PADDING_AMOUNT;

struct Allocator* child = Allocator_child(pipe->alloc);
char* buff = Allocator_malloc(child, fullSize);
buff += Pipe_PADDING_AMOUNT;
struct Message* msg = Message_new(size, Pipe_PADDING_AMOUNT, child);

ALLOC(buff) = child;
ALLOC(msg->bytes) = msg;

buf->base = buff;
buf->base = msg->bytes;
buf->len = size;
}

Expand Down
33 changes: 15 additions & 18 deletions util/events/libuv/UDPAddrIface.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,38 +151,32 @@ static void incoming(uv_udp_t* handle,
context->inCallback = 1;

// Grab out the allocator which was placed there by allocate()
struct Allocator* alloc = buf->base ? ALLOC(buf->base) : NULL;
struct Message* msg = buf->base ? ALLOC(buf->base) : NULL;

// if nread < 0, we used to log uv_last_error, which doesn't exist anymore.
if (nread == 0) {
// Happens constantly
//Log_debug(context->logger, "0 length read");

} else {
struct Message* m = Allocator_calloc(alloc, sizeof(struct Message), 1);
m->length = nread;
m->padding = UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
m->capacity = buf->len;
m->bytes = (uint8_t*)buf->base;
m->ad = &m->bytes[-m->padding];
m->alloc = alloc;
Er_assert(Message_epush(m, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD));
msg->length = nread;
Er_assert(Message_epush(msg, addr, context->pub.generic.addr->addrLen - Sockaddr_OVERHEAD));

// make sure the sockaddr doesn't have crap in it which will
// prevent it from being used as a lookup key
Sockaddr_normalizeNative((struct sockaddr*) m->bytes);
Sockaddr_normalizeNative((struct sockaddr*) msg->bytes);

Er_assert(Message_epush(m, context->pub.generic.addr, Sockaddr_OVERHEAD));
Er_assert(Message_epush(msg, context->pub.generic.addr, Sockaddr_OVERHEAD));

/*uint8_t buff[256] = {0};
Assert_true(Hex_encode(buff, 255, m->bytes, context->pub.generic.addr->addrLen));
Log_debug(context->logger, "Message from [%s]", buff);*/

Iface_send(&context->pub.generic.iface, m);
Iface_send(&context->pub.generic.iface, msg);
}

if (alloc) {
Allocator_free(alloc);
if (msg) {
Allocator_free(msg->alloc);
}

context->inCallback = 0;
Expand All @@ -199,12 +193,15 @@ static void allocate(uv_handle_t* handle, size_t size, uv_buf_t* buf)
size_t fullSize = size + UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;

struct Allocator* child = Allocator_child(context->allocator);
char* buff = Allocator_malloc(child, fullSize);
buff += UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen;
struct Message* msg = Message_new(
UDPAddrIface_BUFFER_CAP,
UDPAddrIface_PADDING_AMOUNT + context->pub.generic.addr->addrLen,
child
);

ALLOC(buff) = child;
ALLOC(msg->bytes) = msg;

buf->base = buff;
buf->base = msg->bytes;
buf->len = size;
}

Expand Down
16 changes: 8 additions & 8 deletions wire/Message.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct Message* Message_new(uint32_t messageLength,
{
uint8_t* buff = Allocator_malloc(alloc, messageLength + amountOfPadding);
struct Message* out = Allocator_calloc(alloc, sizeof(struct Message), 1);
out->ad = buff;
out->_ad = buff;
out->adLen = 0;
out->bytes = &buff[amountOfPadding];
out->length = out->capacity = messageLength;
Expand All @@ -33,22 +33,22 @@ struct Message* Message_new(uint32_t messageLength,
void Message_setAssociatedFd(struct Message* msg, int fd)
{
if (fd == -1) {
msg->associatedFd = 0;
msg->_associatedFd = 0;
} else if (fd == 0) {
msg->associatedFd = -1;
msg->_associatedFd = -1;
} else {
msg->associatedFd = fd;
msg->_associatedFd = fd;
}
}

int Message_getAssociatedFd(struct Message* msg)
{
if (msg->associatedFd == -1) {
if (msg->_associatedFd == -1) {
return 0;
} else if (msg->associatedFd == 0) {
} else if (msg->_associatedFd == 0) {
return -1;
} else {
return msg->associatedFd;
return msg->_associatedFd;
}
}

Expand All @@ -65,7 +65,7 @@ struct Message* Message_clone(struct Message* toClone, struct Allocator* alloc)
.length = toClone->length,
.padding = toClone->padding,
.bytes = allocation + toClone->adLen + toClone->padding,
.ad = allocation + toClone->adLen,
._ad = allocation + toClone->adLen,
.adLen = toClone->adLen,
.capacity = toClone->capacity,
.alloc = alloc
Expand Down
14 changes: 7 additions & 7 deletions wire/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ typedef struct Message
int32_t adLen;

// Pointer to associated data
uint8_t* ad;
uint8_t* _ad;

/**
* When sending/receiving a Message on a unix socket, a file descriptor to attach.
* Caviat: In order to maintain backward compatibility with a Message which is
* allocated using calloc, file descriptor 0 is referred to by -1
*/
int associatedFd;
int _associatedFd;

#ifdef PARANOIA
/** This is used inside of Iface.h to support Iface_next() */
Expand Down Expand Up @@ -99,13 +99,13 @@ static inline Er_DEFUN(void Message_epushAd(struct Message* restrict msg,
Er_raise(msg->alloc, "not enough padding to push ad");
}
if (object) {
Bits_memcpy(msg->ad, object, size);
Bits_memcpy(msg->_ad, object, size);
} else {
Bits_memset(msg->ad, 0x00, size);
Bits_memset(msg->_ad, 0x00, size);
}
msg->adLen += size;
msg->padding -= size;
msg->ad = &msg->ad[size];
msg->_ad = &msg->_ad[size];
Er_ret();
}

Expand All @@ -118,9 +118,9 @@ static inline Er_DEFUN(void Message_epopAd(struct Message* restrict msg,
}
msg->adLen -= size;
msg->padding += size;
msg->ad = &msg->ad[-((int)size)];
msg->_ad = &msg->_ad[-((int)size)];
if (object) {
Bits_memcpy(object, msg->ad, size);
Bits_memcpy(object, msg->_ad, size);
}
Er_ret();
}
Expand Down

0 comments on commit 897bc02

Please sign in to comment.