Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle backlog on UNIX socket too #2493

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Handle backlog of UNIX socket too
The cheaper subsystem specifies that "backlog is only available on Linux
and only on TCP sockets (not UNIX domain sockets)."
This commit specifically implement this: UNIX domain socket support for
backlog on Linux, using Netlink to call the kernel and get the queue status.
  • Loading branch information
Pierre Ducroquet committed Oct 17, 2022
commit 77ee4ba3be35f8402a48d9a49097a47a610d8e39
152 changes: 147 additions & 5 deletions core/master.c
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,13 @@ static void get_tcp_info(struct uwsgi_socket *uwsgi_sock) {

#ifdef UNBIT
#define SIOBKLGQ 0x8908
#else

#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <linux/unix_diag.h>
#include <linux/sock_diag.h>

#endif

#ifdef SIOBKLGQ
Expand All @@ -272,7 +279,141 @@ static void get_linux_unbit_SIOBKLGQ(struct uwsgi_socket *uwsgi_sock) {
uwsgi_sock->max_queue = (uint64_t) uwsgi.listen_queue;
}
}
#else

static int get_socket_inode_from_fd(int fd) {
int inode = -1;
char source_link_path[32];
char link_target[32];
sprintf(source_link_path, "/proc/self/fd/%i", fd);
if (readlink(source_link_path, link_target, 32) > 0)
sscanf(link_target, "socket:[%i]", &inode);
return inode;
}

static void send_netlink_query_for_inode(int fd, int target_ino) {
struct sockaddr_nl nladdr = {.nl_family = AF_NETLINK};
struct {
struct nlmsghdr nlh;
struct unix_diag_req udr;
} req = {
.nlh = {
.nlmsg_len = sizeof(req),
.nlmsg_type = SOCK_DIAG_BY_FAMILY,
.nlmsg_flags = NLM_F_REQUEST
},
.udr = {
.sdiag_family = AF_UNIX,
.udiag_show = UDIAG_SHOW_RQLEN,
.udiag_ino = target_ino,
.udiag_cookie = {-1, -1}
}
};
struct iovec iov = {.iov_base = &req, .iov_len = sizeof(req)};
struct msghdr msg = {
.msg_name = &nladdr,
.msg_namelen = sizeof(nladdr),
.msg_iov = &iov,
.msg_iovlen = 1
};

for (;;) {
if (sendmsg(fd, &msg, 0) < 0) {
if (errno == EINTR)
continue;

perror("sendmsg");
return;
}

return;
}
}

static int receive_netlink_answers(int fd, uint64_t *current_queue, uint64_t *max_queue) {
long buf[8192 / sizeof(long)];
struct sockaddr_nl nladdr;
struct iovec iov = {.iov_base = buf, .iov_len = sizeof(buf)};
int flags = 0;

for (;;) {
struct msghdr msg = {
.msg_name = &nladdr,
.msg_namelen = sizeof(nladdr),
.msg_iov = &iov,
.msg_iovlen = 1
};

ssize_t ret = recvmsg(fd, &msg, flags);

if (ret < 0) {
if (errno == EINTR)
continue;

perror("recvmsg");
return -1;
}
if (ret == 0)
return 0;

if (nladdr.nl_family != AF_NETLINK) {
fputs("!AF_NETLINK\n", stderr);
return -1;
}

const struct nlmsghdr *h = (struct nlmsghdr *)buf;

if (!NLMSG_OK(h, ret)) {
fputs("!NLMSG_OK\n", stderr);
return -1;
}

for (; NLMSG_OK(h, ret); h = NLMSG_NEXT(h, ret)) {
if (h->nlmsg_type == NLMSG_DONE)
return 0;
if (h->nlmsg_type == NLMSG_ERROR) {
const struct nlmsgerr *err = NLMSG_DATA(h);
if (h->nlmsg_len < NLMSG_LENGTH(sizeof(*err))) {
fputs("NLMSG_ERROR\n", stderr);
} else {
errno = -err->error;
perror("NLMSG_ERROR");
}
return -1;
}
if (h->nlmsg_type != SOCK_DIAG_BY_FAMILY) {
fprintf(stderr, "unexpected nlmsg_type %u\n", (unsigned)h->nlmsg_type);
return -1;
}

// Now extract queue len from results
const struct unix_diag_msg *diag = NLMSG_DATA(h);
unsigned int rta_len = h->nlmsg_len - NLMSG_LENGTH(sizeof(*diag));

for (struct rtattr *attr = (struct rtattr *)(diag + 1); RTA_OK(attr, rta_len); attr = RTA_NEXT(attr, rta_len)) {
switch (attr->rta_type) {
case UNIX_DIAG_RQLEN:
*current_queue = ((struct unix_diag_rqlen *) RTA_DATA(attr))->udiag_rqueue;
*max_queue = ((struct unix_diag_rqlen *) RTA_DATA(attr))->udiag_wqueue;
break;
}
}
}
return 0;
}
}

static void get_linux_unix_socket_queue(struct uwsgi_socket *uwsgi_sock) {
int fd = uwsgi_sock->fd;
int inode = get_socket_inode_from_fd(fd);
int diag_socket = socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
send_netlink_query_for_inode(diag_socket, inode);
receive_netlink_answers(diag_socket, &uwsgi_sock->queue, &uwsgi_sock->max_queue);
close(diag_socket);
}

#endif

#endif

static void master_check_listen_queue() {
Expand All @@ -282,15 +423,16 @@ static void master_check_listen_queue() {
while(uwsgi_sock) {
if (uwsgi_sock->family == AF_INET) {
get_tcp_info(uwsgi_sock);
}
}
else if (uwsgi_sock->family == AF_UNIX) {
#ifdef __linux__
#ifdef SIOBKLGQ
else if (uwsgi_sock->family == AF_UNIX) {
get_linux_unbit_SIOBKLGQ(uwsgi_sock);
}
get_linux_unbit_SIOBKLGQ(uwsgi_sock);
#else
get_linux_unix_socket_queue(uwsgi_sock);
#endif
#endif

}
if (uwsgi_sock->queue > backlog) {
backlog = uwsgi_sock->queue;
}
Expand Down