Skip to content

Commit e4cc5ac

Browse files
committed
net: implement ToggleWakeupPipe in all WaitMany variants
1 parent f01a871 commit e4cc5ac

File tree

4 files changed

+64
-25
lines changed

4 files changed

+64
-25
lines changed

src/net.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,7 +2399,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync)
23992399
// select(2)). If none are ready, wait for a short while and return
24002400
// empty sets.
24012401
events_per_sock = GenerateWaitSockets(snap.Nodes());
2402-
if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor()})) {
2402+
if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor(), ToggleWakeupPipe})) {
24032403
if (is_lt) {
24042404
interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS));
24052405
}

src/net.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,15 +1862,14 @@ friend class CNode;
18621862
return INVALID_SOCKET;
18631863
}
18641864

1865-
template <typename Callable>
1866-
void ToggleWakeupPipe(Callable&& func)
1865+
SocketEventsParams::wrap_fn ToggleWakeupPipe = [&](std::function<void()>&& func)
18671866
{
18681867
if (m_wakeup_pipe) {
18691868
m_wakeup_pipe->Toggle(func);
18701869
} else {
18711870
func();
18721871
}
1873-
}
1872+
};
18741873

18751874
Mutex cs_sendable_receivable_nodes;
18761875
std::unordered_map<NodeId, CNode*> mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes);

src/util/sock.cpp

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents
157157
{
158158
EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})};
159159

160-
if (auto [sem, _] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) {
160+
if (auto [sem, _, __] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) {
161161
// We need to ensure we are only using a level-triggered mode because we are expecting
162162
// a direct correlation between the events reported and the one socket we are querying
163163
event_params = SocketEventsParams();
@@ -186,25 +186,25 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev
186186
{
187187
case SocketEventsMode::Poll:
188188
#ifdef USE_POLL
189-
return WaitManyPoll(timeout, events_per_sock);
189+
return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func);
190190
#else
191191
debug_str += "Sock::Wait -- Support for poll not compiled in, falling back on ";
192192
break;
193193
#endif /* USE_POLL */
194194
case SocketEventsMode::Select:
195-
return WaitManySelect(timeout, events_per_sock);
195+
return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func);
196196
case SocketEventsMode::EPoll:
197197
#ifdef USE_EPOLL
198198
assert(event_params.m_event_fd != INVALID_SOCKET);
199-
return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd);
199+
return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func);
200200
#else
201201
debug_str += "Sock::Wait -- Support for epoll not compiled in, falling back on ";
202202
break;
203203
#endif /* USE_EPOLL */
204204
case SocketEventsMode::KQueue:
205205
#ifdef USE_KQUEUE
206206
assert(event_params.m_event_fd != INVALID_SOCKET);
207-
return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd);
207+
return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func);
208208
#else
209209
debug_str += "Sock::Wait -- Support for kqueue not compiled in, falling back on ";
210210
break;
@@ -219,18 +219,24 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev
219219
#endif /* USE_POLL*/
220220
LogPrint(BCLog::NET, "%s\n", debug_str);
221221
#ifdef USE_POLL
222-
return WaitManyPoll(timeout, events_per_sock);
222+
return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func);
223223
#else
224-
return WaitManySelect(timeout, events_per_sock);
224+
return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func);
225225
#endif /* USE_POLL */
226226
}
227227

228228
#ifdef USE_EPOLL
229-
bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd)
229+
bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout,
230+
EventsPerSock& events_per_sock,
231+
SOCKET epoll_fd,
232+
SocketEventsParams::wrap_fn wrap_func)
230233
{
231234
std::array<epoll_event, MAX_EVENTS> events{};
232235

233-
int ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout));
236+
int ret{SOCKET_ERROR};
237+
wrap_func([&](){
238+
ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout));
239+
});
234240
if (ret == SOCKET_ERROR) {
235241
return false;
236242
}
@@ -260,12 +266,18 @@ bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& event
260266
#endif /* USE_EPOLL */
261267

262268
#ifdef USE_KQUEUE
263-
bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd)
269+
bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout,
270+
EventsPerSock& events_per_sock,
271+
SOCKET kqueue_fd,
272+
SocketEventsParams::wrap_fn wrap_func)
264273
{
265274
std::array<struct kevent, MAX_EVENTS> events{};
266275
struct timespec ts = MillisToTimespec(timeout);
267276

268-
int ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts);
277+
int ret{SOCKET_ERROR};
278+
wrap_func([&](){
279+
ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts);
280+
});
269281
if (ret == SOCKET_ERROR) {
270282
return false;
271283
}
@@ -299,7 +311,9 @@ bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& even
299311
#endif /* USE_KQUEUE */
300312

301313
#ifdef USE_POLL
302-
bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
314+
bool Sock::WaitManyPoll(std::chrono::milliseconds timeout,
315+
EventsPerSock& events_per_sock,
316+
SocketEventsParams::wrap_fn wrap_func)
303317
{
304318
if (events_per_sock.empty()) return true;
305319

@@ -316,7 +330,11 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events
316330
}
317331
}
318332

319-
if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) {
333+
int ret{SOCKET_ERROR};
334+
wrap_func([&](){
335+
ret = poll(pfds.data(), pfds.size(), count_milliseconds(timeout));
336+
});
337+
if (ret == SOCKET_ERROR) {
320338
return false;
321339
}
322340

@@ -341,7 +359,9 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events
341359
}
342360
#endif /* USE_POLL */
343361

344-
bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock)
362+
bool Sock::WaitManySelect(std::chrono::milliseconds timeout,
363+
EventsPerSock& events_per_sock,
364+
SocketEventsParams::wrap_fn wrap_func)
345365
{
346366
if (events_per_sock.empty()) return true;
347367

@@ -370,7 +390,11 @@ bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& even
370390

371391
timeval tv = MillisToTimeval(timeout);
372392

373-
if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) {
393+
int ret{SOCKET_ERROR};
394+
wrap_func([&](){
395+
ret = select(socket_max + 1, &recv, &send, &err, &tv);
396+
});
397+
if (ret == SOCKET_ERROR) {
374398
return false;
375399
}
376400

src/util/sock.h

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <util/time.h>
1111

1212
#include <chrono>
13+
#include <functional>
1314
#include <memory>
1415
#include <string>
1516
#include <unordered_map>
@@ -33,10 +34,13 @@ enum class SocketEventsMode : int8_t {
3334

3435
struct SocketEventsParams
3536
{
37+
using wrap_fn = std::function<void(std::function<void()>&&)>;
38+
3639
SocketEventsParams() = default;
37-
SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd) :
40+
SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd, wrap_fn wrap_func) :
3841
m_event_mode{event_mode},
39-
m_event_fd{event_fd}
42+
m_event_fd{event_fd},
43+
m_wrap_func{wrap_func}
4044
{}
4145
~SocketEventsParams() = default;
4246

@@ -51,6 +55,8 @@ struct SocketEventsParams
5155
};
5256
/* File descriptor for event triggered SEMs (and INVALID_SOCKET for the rest) */
5357
SOCKET m_event_fd{INVALID_SOCKET};
58+
/* Function that wraps itself around WakeMany()'s API call */
59+
wrap_fn m_wrap_func{[](std::function<void()>&& func){func();}};
5460
};
5561

5662
/* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */
@@ -301,15 +307,25 @@ class Sock
301307
EventsPerSock& events_per_sock,
302308
SocketEventsParams event_params = SocketEventsParams());
303309
#ifdef USE_EPOLL
304-
static bool WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd);
310+
static bool WaitManyEPoll(std::chrono::milliseconds timeout,
311+
EventsPerSock& events_per_sock,
312+
SOCKET epoll_fd,
313+
SocketEventsParams::wrap_fn wrap_func);
305314
#endif /* USE_EPOLL */
306315
#ifdef USE_KQUEUE
307-
static bool WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd);
316+
static bool WaitManyKQueue(std::chrono::milliseconds timeout,
317+
EventsPerSock& events_per_sock,
318+
SOCKET kqueue_fd,
319+
SocketEventsParams::wrap_fn wrap_func);
308320
#endif /* USE_KQUEUE */
309321
#ifdef USE_POLL
310-
static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
322+
static bool WaitManyPoll(std::chrono::milliseconds timeout,
323+
EventsPerSock& events_per_sock,
324+
SocketEventsParams::wrap_fn wrap_func);
311325
#endif /* USE_POLL */
312-
static bool WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock);
326+
static bool WaitManySelect(std::chrono::milliseconds timeout,
327+
EventsPerSock& events_per_sock,
328+
SocketEventsParams::wrap_fn wrap_func);
313329

314330
/* Higher level, convenience, methods. These may throw. */
315331

0 commit comments

Comments
 (0)