Skip to content

Commit

Permalink
IPC connections in tcpmuxd are properly cleaned up.
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
  • Loading branch information
sustrik committed Nov 21, 2014
1 parent 14f4559 commit 3acb492
Showing 1 changed file with 68 additions and 14 deletions.
82 changes: 68 additions & 14 deletions src/devices/tcpmuxd.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ int nn_tcpmuxd (int port)
#include "../utils/mutex.h"
#include "../utils/closefd.h"

#include <stddef.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
Expand All @@ -61,6 +62,9 @@ struct nn_tcpmuxd_ctx {
int tcp_listener;
int ipc_listener;
struct nn_list conns;
struct pollfd *pfd;
size_t pfd_size;
size_t pfd_capacity;
struct nn_thread thread;
};

Expand All @@ -72,7 +76,8 @@ struct nn_tcpmuxd_conn {

/* Forward declarations. */
static void nn_tcpmuxd_routine (void *arg);
static int send_fd (int s, int fd);
static void nn_tcpmuxd_disconnect (struct nn_tcpmuxd_ctx *ctx, int i);
static int nn_tcpmuxd_send_fd (int s, int fd);

int nn_tcpmuxd (int port)
{
Expand Down Expand Up @@ -118,7 +123,15 @@ int nn_tcpmuxd (int port)
ctx->tcp_listener = tcp_listener;
ctx->ipc_listener = ipc_listener;
nn_list_init (&ctx->conns);

ctx->pfd = nn_alloc (sizeof (struct pollfd) * 16, "tcpmuxd pollfd");
alloc_assert (ctx->pfd);
ctx->pfd_capacity = 16;
ctx->pfd [0].fd = tcp_listener;
ctx->pfd [0].events = POLLIN;
ctx->pfd [1].fd = ipc_listener;
ctx->pfd [1].events = POLLIN;
ctx->pfd_size = 2;

/* Run the daemon in a dedicated thread. */
nn_thread_init (&ctx->thread, nn_tcpmuxd_routine, ctx);

Expand All @@ -130,7 +143,6 @@ static void nn_tcpmuxd_routine (void *arg)
{
int rc;
struct nn_tcpmuxd_ctx *ctx;
struct pollfd pfd [2];
int conn;
int pos;
char service [256];
Expand All @@ -144,20 +156,15 @@ static void nn_tcpmuxd_routine (void *arg)

ctx = (struct nn_tcpmuxd_ctx*) arg;

pfd [0].fd = ctx->tcp_listener;
pfd [0].events = POLLIN;
pfd [1].fd = ctx->ipc_listener;
pfd [1].events = POLLIN;

while (1) {

/* Wait for events. */
rc = poll (pfd, 2, -1);
rc = poll (ctx->pfd, ctx->pfd_size, -1);
errno_assert (rc >= 0);
nn_assert (rc != 0);

/* There's an incoming TCP connection. */
if (pfd [0].revents & POLLIN) {
if (ctx->pfd [0].revents & POLLIN) {

/* Accept the connection. */
conn = accept (ctx->tcp_listener, NULL, NULL);
Expand Down Expand Up @@ -225,12 +232,12 @@ static void nn_tcpmuxd_routine (void *arg)
nn_assert (ssz == 3);

/* Pass the file descriptor to the listening process. */
rc = send_fd (tc->fd, conn);
rc = nn_tcpmuxd_send_fd (tc->fd, conn);
errno_assert (rc == 0);
}

/* There's an incoming IPC connection. */
if (pfd [1].revents & POLLIN) {
if (ctx->pfd [1].revents & POLLIN) {

/* Accept the connection. */
conn = accept (ctx->ipc_listener, NULL, NULL);
Expand All @@ -242,7 +249,18 @@ static void nn_tcpmuxd_routine (void *arg)
tc = nn_alloc (sizeof (struct nn_tcpmuxd_conn), "tcpmuxd_conn");
nn_assert (tc);
tc->fd = conn;
nn_list_item_init (&tc->item);
nn_list_item_init (&tc->item);

/* Adjust the pollset. We will poll for errors only. */
ctx->pfd_size++;
if (ctx->pfd_size > ctx->pfd_capacity) {
ctx->pfd_capacity *= 2;
ctx->pfd = nn_realloc (ctx->pfd,
sizeof (struct pollfd) * ctx->pfd_capacity);
alloc_assert (ctx->pfd);
}
ctx->pfd [ctx->pfd_size - 1].fd = conn;
ctx->pfd [ctx->pfd_size - 1].events = 0;

/* Read the connection header. */
ssz = recv (conn, buf, 2, 0);
Expand All @@ -261,11 +279,47 @@ static void nn_tcpmuxd_routine (void *arg)
/* Add the entry to the IPC connections list. */
nn_list_insert (&ctx->conns, &tc->item, nn_list_end (&ctx->conns));
}

for (i = 2; i < ctx->pfd_size; ++i) {
if (ctx->pfd [i].revents & POLLERR ||
ctx->pfd [i].revents & POLLHUP) {
nn_tcpmuxd_disconnect (ctx, i);
i--;
}
}
}
}

/* Tear down the IPC connection with index i in the pollset. */
static void nn_tcpmuxd_disconnect (struct nn_tcpmuxd_ctx *ctx, int i)
{
int fd;
struct nn_list_item *it;
struct nn_tcpmuxd_conn *conn;

fd = ctx->pfd [i].fd;

/* Remove the descriptor from the pollset. */
if (ctx->pfd_size > 3)
ctx->pfd [i] = ctx->pfd [ctx->pfd_size - 1];
ctx->pfd_size--;

/* Remove the connection entry. */
for (it = nn_list_begin (&ctx->conns);
it != nn_list_end (&ctx->conns);
it = nn_list_next (&ctx->conns, it)) {
conn = nn_cont (it, struct nn_tcpmuxd_conn, item);
if (conn->fd == fd) {
nn_list_erase (&ctx->conns, it);
nn_free (conn->service);
nn_free (conn);
break;
}
}
}

/* Send file descriptor fd to IPC socket s. */
static int send_fd (int s, int fd)
static int nn_tcpmuxd_send_fd (int s, int fd)
{
int rc;
struct iovec iov;
Expand Down

0 comments on commit 3acb492

Please sign in to comment.