Skip to content

Commit

Permalink
Merge pull request #10930 from omoerbeek/rec-control-stream
Browse files Browse the repository at this point in the history
rec: Move to a stream based socket for the control channel
  • Loading branch information
omoerbeek authored Nov 3, 2021
2 parents 90ec8eb + c25917c commit afd2b96
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 110 deletions.
21 changes: 9 additions & 12 deletions pdns/pdns_recursor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4207,21 +4207,18 @@ template ThreadTimes broadcastAccFunction(const boost::function<ThreadTimes*()>&
static void handleRCC(int fd, FDMultiplexer::funcparam_t& var)
{
try {
string remote;
string msg = s_rcc.recv(&remote).d_str;
FDWrapper clientfd = accept(fd, nullptr, nullptr);
if (clientfd == -1) {
throw PDNSException("accept failed");
}
string msg = s_rcc.recv(clientfd).d_str;
g_log << Logger::Info << "Received rec_control command '" << msg << "' via controlsocket" << endl;

RecursorControlParser rcp;
RecursorControlParser::func_t* command;
auto answer = rcp.getAnswer(clientfd, msg, &command);

g_log << Logger::Info << "Received rec_control command '" << msg << "' from control socket" << endl;
auto answer = rcp.getAnswer(fd, msg, &command);

// If we are inside a chroot, we need to strip
if (!arg()["chroot"].empty()) {
size_t len = arg()["chroot"].length();
remote = remote.substr(len);
}

s_rcc.send(answer, &remote);
s_rcc.send(clientfd, answer);
command();
}
catch(const std::exception& e) {
Expand Down
132 changes: 38 additions & 94 deletions pdns/rec_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,9 @@ RecursorControlChannel::~RecursorControlChannel()
unlink(d_local.sun_path);
}

static void setSocketBuffer(int fd, int optname, uint32_t size)
{
uint32_t psize = 0;
socklen_t len = sizeof(psize);

if (getsockopt(fd, SOL_SOCKET, optname, (void*)&psize, &len))
throw PDNSException("Unable to getsocket buffer size: " + stringerror());

if (psize > size)
return;

// failure to raise is not fatal
(void)setsockopt(fd, SOL_SOCKET, optname, (const void*)&size, sizeof(size));
}

static void setSocketReceiveBuffer(int fd, uint32_t size)
{
setSocketBuffer(fd, SO_RCVBUF, size);
}

static void setSocketSendBuffer(int fd, uint32_t size)
{
setSocketBuffer(fd, SO_SNDBUF, size);
}

int RecursorControlChannel::listen(const string& fname)
{
d_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
d_fd = socket(AF_UNIX, SOCK_STREAM, 0);
setCloseOnExec(d_fd);

if (d_fd < 0)
Expand All @@ -80,19 +55,17 @@ int RecursorControlChannel::listen(const string& fname)

if (bind(d_fd, (sockaddr*)&d_local, sizeof(d_local)) < 0)
throw PDNSException("Unable to bind to controlsocket '" + fname + "': " + stringerror());

// receive buf should be size of max datagram plus address size
setSocketReceiveBuffer(d_fd, 60 * 1024);
setSocketSendBuffer(d_fd, 64 * 1024);

if (::listen(d_fd, 0) == -1) {
throw PDNSException("Unable to listen on controlsocket '" + fname + "': " + stringerror());
}
return d_fd;
}

void RecursorControlChannel::connect(const string& path, const string& fname)
{
struct sockaddr_un remote;

d_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
d_fd = socket(AF_UNIX, SOCK_STREAM, 0);
setCloseOnExec(d_fd);

if (d_fd < 0)
Expand All @@ -103,24 +76,6 @@ void RecursorControlChannel::connect(const string& path, const string& fname)
if (setsockopt(d_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&tmp, sizeof tmp) < 0)
throw PDNSException("Setsockopt failed: " + stringerror());

string localname = path + "/lsockXXXXXX";
*d_local.sun_path = 0;
if (makeUNsockaddr(localname, &d_local))
throw PDNSException("Unable to bind to local temporary file, path '" + localname + "' is not a valid UNIX socket path.");

if (mkstemp(d_local.sun_path) < 0)
throw PDNSException("Unable to generate local temporary file in directory '" + path + "': " + stringerror());

int err = unlink(d_local.sun_path);
if (err < 0 && errno != ENOENT)
throw PDNSException("Unable to remove local controlsocket: " + stringerror());

if (bind(d_fd, (sockaddr*)&d_local, sizeof(d_local)) < 0)
throw PDNSException("Unable to bind to local temporary file: " + stringerror());

if (chmod(d_local.sun_path, 0666) < 0) // make sure that pdns can reply!
throw PDNSException("Unable to chmod local temporary socket: " + stringerror());

string remotename = path + "/" + fname;
if (makeUNsockaddr(remotename, &remote))
throw PDNSException("Unable to connect to controlsocket, path '" + remotename + "' is not a valid UNIX socket path.");
Expand All @@ -130,10 +85,6 @@ void RecursorControlChannel::connect(const string& path, const string& fname)
unlink(d_local.sun_path);
throw PDNSException("Unable to connect to remote '" + string(remote.sun_path) + "': " + stringerror());
}

// receive buf should be size of max datagram plus address size
setSocketReceiveBuffer(d_fd, 60 * 1024);
setSocketSendBuffer(d_fd, 64 * 1024);
}
catch (...) {
close(d_fd);
Expand All @@ -143,7 +94,7 @@ void RecursorControlChannel::connect(const string& path, const string& fname)
}
}

static void sendfd(int s, int fd, const string* remote)
static void sendfd(int s, int fd)
{
struct msghdr msg;
struct cmsghdr* cmsg;
Expand All @@ -159,10 +110,6 @@ static void sendfd(int s, int fd, const string* remote)
io_vector[0].iov_len = 1;

memset(&msg, 0, sizeof(msg));
if (remote) {
msg.msg_name = const_cast<char*>(remote->c_str());
msg.msg_namelen = remote->length();
}
msg.msg_control = &cmsgbuf.buf;
msg.msg_controllen = sizeof(cmsgbuf.buf);
msg.msg_iov = io_vector;
Expand All @@ -179,62 +126,59 @@ static void sendfd(int s, int fd, const string* remote)
}
}

void RecursorControlChannel::send(const Answer& msg, const std::string* remote, unsigned int timeout, int fd)
void RecursorControlChannel::send(int fd, const Answer& msg, unsigned int timeout, int fd_to_pass)
{
int ret = waitForRWData(d_fd, false, timeout, 0);
int ret = waitForRWData(fd, false, timeout, 0);
if (ret == 0) {
throw PDNSException("Timeout sending message over control channel");
}
else if (ret < 0) {
throw PDNSException("Error sending message over control channel:" + stringerror());
}

if (remote) {
struct sockaddr_un remoteaddr;
memset(&remoteaddr, 0, sizeof(remoteaddr));

remoteaddr.sun_family = AF_UNIX;
strncpy(remoteaddr.sun_path, remote->c_str(), sizeof(remoteaddr.sun_path) - 1);
remoteaddr.sun_path[sizeof(remoteaddr.sun_path) - 1] = '\0';

if (::sendto(d_fd, &msg.d_ret, sizeof(msg.d_ret), 0, (struct sockaddr*)&remoteaddr, sizeof(remoteaddr)) < 0)
throw PDNSException("Unable to send message over control channel '" + string(remoteaddr.sun_path) + "': " + stringerror());
if (::sendto(d_fd, msg.d_str.c_str(), msg.d_str.length(), 0, (struct sockaddr*)&remoteaddr, sizeof(remoteaddr)) < 0)
throw PDNSException("Unable to send message over control channel '" + string(remoteaddr.sun_path) + "': " + stringerror());
if (::send(fd, &msg.d_ret, sizeof(msg.d_ret), 0) < 0) {
throw PDNSException("Unable to send return code over control channel: " + stringerror());
}
else {
if (::send(d_fd, &msg.d_ret, sizeof(msg.d_ret), 0) < 0)
throw PDNSException("Unable to send message over control channel: " + stringerror());
if (::send(d_fd, msg.d_str.c_str(), msg.d_str.length(), 0) < 0)
throw PDNSException("Unable to send message over control channel: " + stringerror());
size_t len = msg.d_str.length();
if (::send(fd, &len, sizeof(len), 0) < 0) {
throw PDNSException("Unable to send length over control channel: " + stringerror());
}
if (fd != -1) {
sendfd(d_fd, fd, remote);
if (::send(fd, msg.d_str.c_str(), len, 0) != static_cast<ssize_t>(len)) {
throw PDNSException("Unable to send message over control channel: " + stringerror());
}

if (fd_to_pass != -1) {
sendfd(fd, fd_to_pass);
}
}

RecursorControlChannel::Answer RecursorControlChannel::recv(std::string* remote, unsigned int timeout)
RecursorControlChannel::Answer RecursorControlChannel::recv(int fd, unsigned int timeout)
{
char buffer[16384];
ssize_t len;
struct sockaddr_un remoteaddr;
socklen_t addrlen = sizeof(remoteaddr);

int ret = waitForData(d_fd, timeout, 0);
int ret = waitForData(fd, timeout, 0);
if (ret == 0) {
throw PDNSException("Timeout waiting for answer from control channel");
}
int err;
if (::recvfrom(d_fd, &err, sizeof(err), 0, (struct sockaddr*)&remoteaddr, &addrlen) != sizeof(err)) {
throw PDNSException("Unable to receive return status over control channel1: " + stringerror());
if (::recv(fd, &err, sizeof(err), 0) != sizeof(err)) {
throw PDNSException("Unable to receive return status over control channel: " + stringerror());
}
if ((len = ::recvfrom(d_fd, buffer, sizeof(buffer), 0, (struct sockaddr*)&remoteaddr, &addrlen)) < 0) {
throw PDNSException("Unable to receive message over control channel2: " + stringerror());
size_t len;
if (::recv(fd, &len, sizeof(len), 0) != sizeof(len)) {
throw PDNSException("Unable to receive length over control channel: " + stringerror());
}

if (remote) {
*remote = remoteaddr.sun_path;
string str;
str.reserve(len);
while (str.length() < len) {
char buffer[1024];
size_t toRead = std::min(len - str.length(), sizeof(buffer));
ssize_t recvd = ::recv(fd, buffer, toRead, 0);
if (recvd <= 0) {
// EOF means we have a length error
throw PDNSException("Unable to receive message over control channel: " + stringerror());
}
str.append(buffer, recvd);
}

return {err, string(buffer, buffer + len)};
return {err, str};
}
4 changes: 2 additions & 2 deletions pdns/rec_channel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public:
std::string d_str;
};

void send(const Answer&, const std::string* remote = nullptr, unsigned int timeout = 5, int fd = -1);
RecursorControlChannel::Answer recv(std::string* remote = nullptr, unsigned int timeout = 5);
void send(int remote, const Answer&, unsigned int timeout = 5, int fd_to_pass = -1);
RecursorControlChannel::Answer recv(int fd, unsigned int timeout = 5);

int d_fd;
static std::atomic<bool> stop;
Expand Down
4 changes: 2 additions & 2 deletions pdns/rec_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ int main(int argc, char** argv)
auto timeout = arg().asNum("timeout");
RecursorControlChannel rccS;
rccS.connect(arg()["socket-dir"], sockname);
rccS.send({0, command}, nullptr, timeout, fd);
rccS.send(rccS.d_fd, {0, command}, timeout, fd);

auto receive = rccS.recv(0, timeout);
auto receive = rccS.recv(rccS.d_fd, timeout);
if (receive.d_ret != 0) {
cerr << receive.d_str;
}
Expand Down

0 comments on commit afd2b96

Please sign in to comment.