Skip to content

Commit 0d92d40

Browse files
kwvgUdjinM6
andcommitted
net: implement WaitMany variants for {epoll, kqueue}
Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
1 parent 0a8b8a6 commit 0d92d40

File tree

9 files changed

+149
-119
lines changed

9 files changed

+149
-119
lines changed

src/i2p.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ bool Session::Accept(Connection& conn)
160160

161161
while (!*m_interrupt) {
162162
Sock::Event occurred;
163-
if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SEM_LT_DEFAULT, &occurred)) {
163+
if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SocketEventsParams(), &occurred)) {
164164
errmsg = "wait on socket failed";
165165
break;
166166
}

src/net.cpp

Lines changed: 1 addition & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,6 @@
5656
#include <ifaddrs.h>
5757
#endif
5858

59-
#ifdef USE_EPOLL
60-
#include <sys/epoll.h>
61-
#endif
62-
63-
#ifdef USE_KQUEUE
64-
#include <sys/event.h>
65-
#endif
66-
6759
#include <algorithm>
6860
#include <array>
6961
#include <cstdint>
@@ -2375,79 +2367,6 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
23752367
return events_per_sock;
23762368
}
23772369

2378-
#ifdef USE_KQUEUE
2379-
void CConnman::SocketEventsKqueue(std::set<SOCKET>& recv_set,
2380-
std::set<SOCKET>& send_set,
2381-
std::set<SOCKET>& error_set,
2382-
bool only_poll)
2383-
{
2384-
std::array<struct kevent, MAX_EVENTS> events{};
2385-
struct timespec timeout = MillisToTimespec(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
2386-
2387-
int ret{-1};
2388-
ToggleWakeupPipe([&](){
2389-
ret = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events.data(), events.size(), &timeout);
2390-
});
2391-
if (ret == SOCKET_ERROR) {
2392-
LogPrintf("kevent wait error\n");
2393-
return;
2394-
}
2395-
2396-
for (int i = 0; i < ret; i++) {
2397-
auto& event = events[i];
2398-
if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) {
2399-
error_set.insert((SOCKET)event.ident);
2400-
continue;
2401-
}
2402-
2403-
if (event.filter == EVFILT_READ) {
2404-
recv_set.insert((SOCKET)event.ident);
2405-
}
2406-
2407-
if (event.filter == EVFILT_WRITE) {
2408-
send_set.insert((SOCKET)event.ident);
2409-
}
2410-
}
2411-
}
2412-
#endif
2413-
2414-
#ifdef USE_EPOLL
2415-
void CConnman::SocketEventsEpoll(std::set<SOCKET>& recv_set,
2416-
std::set<SOCKET>& send_set,
2417-
std::set<SOCKET>& error_set,
2418-
bool only_poll)
2419-
{
2420-
std::array<epoll_event, MAX_EVENTS> events{};
2421-
2422-
int ret{-1};
2423-
ToggleWakeupPipe([&](){
2424-
ret = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events.data(), events.size(),
2425-
only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);
2426-
});
2427-
2428-
if (ret == SOCKET_ERROR) {
2429-
LogPrintf("epoll_wait error\n");
2430-
return;
2431-
}
2432-
2433-
for (int i = 0; i < ret; i++) {
2434-
auto& e = events[i];
2435-
if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) {
2436-
error_set.insert((SOCKET)e.data.fd);
2437-
continue;
2438-
}
2439-
2440-
if (e.events & EPOLLIN) {
2441-
recv_set.insert((SOCKET)e.data.fd);
2442-
}
2443-
2444-
if (e.events & EPOLLOUT) {
2445-
send_set.insert((SOCKET)e.data.fd);
2446-
}
2447-
}
2448-
}
2449-
#endif
2450-
24512370
void CConnman::SocketHandler(CMasternodeSync& mn_sync)
24522371
{
24532372
AssertLockNotHeld(m_total_bytes_sent_mutex);
@@ -2480,7 +2399,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
24802399
// select(2)). If none are ready, wait for a short while and return
24812400
// empty sets.
24822401
events_per_sock = GenerateWaitSockets(snap.Nodes());
2483-
if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, socketEventsMode)) {
2402+
if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor()})) {
24842403
if (is_lt) {
24852404
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
24862405
}

src/net.h

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1623,19 +1623,6 @@ friend class CNode;
16231623
*/
16241624
Sock::EventsPerSock GenerateWaitSockets(Span<CNode* const> nodes);
16251625

1626-
#ifdef USE_KQUEUE
1627-
void SocketEventsKqueue(std::set<SOCKET>& recv_set,
1628-
std::set<SOCKET>& send_set,
1629-
std::set<SOCKET>& error_set,
1630-
bool only_poll);
1631-
#endif
1632-
#ifdef USE_EPOLL
1633-
void SocketEventsEpoll(std::set<SOCKET>& recv_set,
1634-
std::set<SOCKET>& send_set,
1635-
std::set<SOCKET>& error_set,
1636-
bool only_poll);
1637-
#endif
1638-
16391626
/**
16401627
* Check connected and listening sockets for IO readiness and process them accordingly.
16411628
*/
@@ -1867,6 +1854,14 @@ friend class CNode;
18671854
std::unique_ptr<EdgeTriggeredEvents> m_edge_trig_events{nullptr};
18681855
std::unique_ptr<WakeupPipe> m_wakeup_pipe{nullptr};
18691856

1857+
SOCKET GetModeFileDescriptor()
1858+
{
1859+
if (m_edge_trig_events) {
1860+
return static_cast<SOCKET>(m_edge_trig_events->GetFileDescriptor());
1861+
}
1862+
return INVALID_SOCKET;
1863+
}
1864+
18701865
template <typename Callable>
18711866
void ToggleWakeupPipe(Callable&& func)
18721867
{

src/netbase.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ static bool ConnectToSocket(const Sock& sock, struct sockaddr* sockaddr, socklen
572572
// synchronously to check for successful connection with a timeout.
573573
const Sock::Event requested = Sock::RECV | Sock::SEND;
574574
Sock::Event occurred;
575-
if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SEM_LT_DEFAULT, &occurred)) {
575+
if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SocketEventsParams(), &occurred)) {
576576
LogPrintf("wait for connect to %s failed: %s\n",
577577
dest_str,
578578
NetworkErrorString(WSAGetLastError()));

src/test/fuzz/util.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ bool FuzzedSock::IsSelectable(bool is_select) const
270270
return m_selectable;
271271
}
272272

273-
bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const
273+
bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const
274274
{
275275
constexpr std::array wait_errnos{
276276
EBADF,
@@ -287,7 +287,7 @@ bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Socket
287287
return true;
288288
}
289289

290-
bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const
290+
bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const
291291
{
292292
for (auto& [sock, events] : events_per_sock) {
293293
(void)sock;

src/test/fuzz/util.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ class FuzzedSock : public Sock
8989

9090
bool IsSelectable(bool is_select) const override;
9191

92-
bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override;
92+
bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params = SocketEventsParams(), Event* occurred = nullptr) const override;
9393

94-
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override;
94+
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params = SocketEventsParams()) const override;
9595

9696
bool IsConnected(std::string& errmsg) const override;
9797
};

src/test/util/net.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ class StaticContentsSock : public Sock
202202

203203
bool Wait(std::chrono::milliseconds timeout,
204204
Event requested,
205-
SocketEventsMode event_mode = SEM_LT_DEFAULT,
205+
SocketEventsParams event_params = SocketEventsParams(),
206206
Event* occurred = nullptr) const override
207207
{
208208
if (occurred != nullptr) {
@@ -211,7 +211,9 @@ class StaticContentsSock : public Sock
211211
return true;
212212
}
213213

214-
bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override
214+
bool WaitMany(std::chrono::milliseconds timeout,
215+
EventsPerSock& events_per_sock,
216+
SocketEventsParams event_params = SocketEventsParams()) const override
215217
{
216218
for (auto& [sock, events] : events_per_sock) {
217219
(void)sock;

src/util/sock.cpp

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,14 @@
1515
#include <stdexcept>
1616
#include <string>
1717

18+
#ifdef USE_EPOLL
19+
#include <sys/epoll.h>
20+
#endif
21+
22+
#ifdef USE_KQUEUE
23+
#include <sys/event.h>
24+
#endif
25+
1826
#ifdef USE_POLL
1927
#include <poll.h>
2028
#endif
@@ -145,11 +153,16 @@ bool Sock::IsSelectable(bool is_select) const
145153
return IsSelectableSocket(m_socket, is_select);
146154
}
147155

148-
bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const
156+
bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const
149157
{
150158
EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})};
151159

152-
if (!WaitMany(timeout, events_per_sock)) {
160+
if (auto [sem, _] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) {
161+
// We need to ensure we are only using a level-triggered mode because we are expecting
162+
// a direct correlation between the events reported and the one socket we are querying
163+
event_params = SocketEventsParams();
164+
}
165+
if (!WaitMany(timeout, events_per_sock, event_params)) {
153166
return false;
154167
}
155168

@@ -160,16 +173,16 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents
160173
return true;
161174
}
162175

163-
bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const
176+
bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const
164177
{
165-
return WaitManyInternal(timeout, events_per_sock, event_mode);
178+
return WaitManyInternal(timeout, events_per_sock, event_params);
166179
}
167180

168-
bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode)
181+
bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params)
169182
{
170183
std::string debug_str;
171184

172-
switch (event_mode)
185+
switch (event_params.m_event_mode)
173186
{
174187
case SocketEventsMode::Poll:
175188
#ifdef USE_POLL
@@ -181,11 +194,21 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev
181194
case SocketEventsMode::Select:
182195
return WaitManySelect(timeout, events_per_sock);
183196
case SocketEventsMode::EPoll:
184-
debug_str += "Sock::Wait -- Unimplemented for epoll, falling back on ";
197+
#ifdef USE_EPOLL
198+
assert(event_params.m_event_fd != INVALID_SOCKET);
199+
return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd);
200+
#else
201+
debug_str += "Sock::Wait -- Support for epoll not compiled in, falling back on ";
185202
break;
203+
#endif /* USE_EPOLL */
186204
case SocketEventsMode::KQueue:
187-
debug_str += "Sock::Wait -- Unimplemented for kqueue, falling back on ";
205+
#ifdef USE_KQUEUE
206+
assert(event_params.m_event_fd != INVALID_SOCKET);
207+
return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd);
208+
#else
209+
debug_str += "Sock::Wait -- Support for kqueue not compiled in, falling back on ";
188210
break;
211+
#endif /* USE_KQUEUE */
189212
default:
190213
assert(false);
191214
}
@@ -202,6 +225,75 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev
202225
#endif /* USE_POLL */
203226
}
204227

228+
#ifdef USE_EPOLL
229+
bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd)
230+
{
231+
std::array<epoll_event, MAX_EVENTS> events{};
232+
233+
int ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout));
234+
if (ret == SOCKET_ERROR) {
235+
return false;
236+
}
237+
238+
// Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the
239+
// entire map before populating it with our events data.
240+
events_per_sock.clear();
241+
242+
for (int idx = 0; idx < ret; idx++) {
243+
auto& ev = events[idx];
244+
Event occurred = 0;
245+
if (ev.events & (EPOLLERR | EPOLLHUP)) {
246+
occurred |= ERR;
247+
} else {
248+
if (ev.events & EPOLLIN) {
249+
occurred |= RECV;
250+
}
251+
if (ev.events & EPOLLOUT) {
252+
occurred |= SEND;
253+
}
254+
}
255+
events_per_sock.emplace(static_cast<SOCKET>(ev.data.fd), Sock::Events{/*req=*/RECV | SEND, occurred});
256+
}
257+
258+
return true;
259+
}
260+
#endif /* USE_EPOLL */
261+
262+
#ifdef USE_KQUEUE
263+
bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd)
264+
{
265+
std::array<struct kevent, MAX_EVENTS> events{};
266+
struct timespec ts = MillisToTimespec(timeout);
267+
268+
int ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts);
269+
if (ret == SOCKET_ERROR) {
270+
return false;
271+
}
272+
273+
// Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the
274+
// entire map before populating it with our events data.
275+
events_per_sock.clear();
276+
277+
for (int idx = 0; idx < ret; idx++) {
278+
auto& ev = events[idx];
279+
Event occurred = 0;
280+
if (ev.flags & (EV_ERROR | EV_EOF)) {
281+
occurred |= ERR;
282+
} else {
283+
if (ev.filter == EVFILT_READ) {
284+
occurred |= RECV;
285+
}
286+
if (ev.filter == EVFILT_WRITE) {
287+
occurred |= SEND;
288+
}
289+
}
290+
events_per_sock.emplace(static_cast<SOCKET>(ev.ident), Sock::Events{/*req=*/RECV | SEND, occurred});
291+
}
292+
293+
return true;
294+
}
295+
#endif /* USE_KQUEUE */
296+
205297
#ifdef USE_POLL
206298
bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
207299
{

0 commit comments

Comments
 (0)