Skip to content

Commit 945d60c

Browse files
authored
Merge pull request zeromq#606 from stephanschim/ssc/active-poller-fd
Add file descriptor support for poller
2 parents f9f6b79 + 2533a7e commit 945d60c

File tree

3 files changed

+200
-5
lines changed

3 files changed

+200
-5
lines changed

tests/active_poller.cpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@
66

77
#include <array>
88
#include <memory>
9+
#include <cstring>
10+
11+
#if !defined(_WIN32)
12+
#include <unistd.h>
13+
#endif // !_WIN32
914

1015
TEST_CASE("create destroy", "[active_poller]")
1116
{
@@ -86,6 +91,85 @@ TEST_CASE("add handler", "[active_poller]")
8691
active_poller.add(socket, zmq::event_flags::pollin, no_op_handler));
8792
}
8893

94+
TEST_CASE("add fd handler", "[active_poller]")
95+
{
96+
int fd = 1;
97+
zmq::active_poller_t active_poller;
98+
CHECK_NOTHROW(
99+
active_poller.add(fd, zmq::event_flags::pollin, no_op_handler));
100+
}
101+
102+
TEST_CASE("remove fd handler", "[active_poller]")
103+
{
104+
int fd = 1;
105+
zmq::active_poller_t active_poller;
106+
CHECK_NOTHROW(
107+
active_poller.add(fd, zmq::event_flags::pollin, no_op_handler));
108+
CHECK_NOTHROW(
109+
active_poller.remove(fd));
110+
CHECK_THROWS_ZMQ_ERROR(EINVAL, active_poller.remove(100));
111+
}
112+
113+
#if !defined(_WIN32)
114+
// On Windows, these functions can only be used with WinSock sockets.
115+
116+
TEST_CASE("mixed socket and fd handlers", "[active_poller]")
117+
{
118+
int pipefd[2];
119+
::pipe(pipefd);
120+
121+
zmq::context_t context;
122+
constexpr char inprocSocketAddress[] = "inproc://mixed-handlers";
123+
zmq::socket_t socket_rcv{context, zmq::socket_type::pair};
124+
zmq::socket_t socket_snd{context, zmq::socket_type::pair};
125+
socket_rcv.bind(inprocSocketAddress);
126+
socket_snd.connect(inprocSocketAddress);
127+
128+
unsigned eventsFd = 0;
129+
unsigned eventsSocket = 0;
130+
131+
constexpr char messageText[] = "message";
132+
constexpr size_t messageSize = sizeof(messageText);
133+
134+
zmq::active_poller_t active_poller;
135+
CHECK_NOTHROW(
136+
active_poller.add(pipefd[0], zmq::event_flags::pollin, [&](zmq::event_flags flags) {
137+
if (flags == zmq::event_flags::pollin)
138+
{
139+
char buffer[256];
140+
CHECK(messageSize == ::read(pipefd[0], buffer, messageSize));
141+
CHECK(0 == std::strcmp(buffer, messageText));
142+
++eventsFd;
143+
}
144+
}));
145+
CHECK_NOTHROW(
146+
active_poller.add(socket_rcv, zmq::event_flags::pollin, [&](zmq::event_flags flags) {
147+
if (flags == zmq::event_flags::pollin)
148+
{
149+
zmq::message_t msg;
150+
CHECK(socket_rcv.recv(msg, zmq::recv_flags::dontwait).has_value());
151+
CHECK(messageSize == msg.size());
152+
CHECK(0 == std::strcmp(messageText, msg.data<const char>()));
153+
++eventsSocket;
154+
}
155+
}));
156+
157+
// send/rcv socket pair
158+
zmq::message_t msg{messageText, messageSize};
159+
socket_snd.send(msg, zmq::send_flags::dontwait);
160+
CHECK(1 == active_poller.wait(std::chrono::milliseconds{100}));
161+
CHECK(0 == eventsFd);
162+
CHECK(1 == eventsSocket);
163+
164+
// send/rcv pipe
165+
::write(pipefd[1], messageText, messageSize);
166+
CHECK(1 == active_poller.wait(std::chrono::milliseconds{100}));
167+
CHECK(1 == eventsFd);
168+
CHECK(1 == eventsSocket);
169+
}
170+
171+
#endif // !_WIN32
172+
89173
TEST_CASE("add null handler fails", "[active_poller]")
90174
{
91175
zmq::context_t context;

zmq.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2681,6 +2681,13 @@ template<typename T = no_user_data> class poller_t
26812681
}
26822682
}
26832683

2684+
void remove(fd_t fd)
2685+
{
2686+
if (0 != zmq_poller_remove_fd(poller_ptr.get(), fd)) {
2687+
throw error_t();
2688+
}
2689+
}
2690+
26842691
void modify(zmq::socket_ref socket, event_flags events)
26852692
{
26862693
if (0
@@ -2690,6 +2697,15 @@ template<typename T = no_user_data> class poller_t
26902697
}
26912698
}
26922699

2700+
void modify(fd_t fd, event_flags events)
2701+
{
2702+
if (0
2703+
!= zmq_poller_modify_fd(poller_ptr.get(), fd,
2704+
static_cast<short>(events))) {
2705+
throw error_t();
2706+
}
2707+
}
2708+
26932709
size_t wait_all(std::vector<event_type> &poller_events,
26942710
const std::chrono::milliseconds timeout)
26952711
{

zmq_addon.hpp

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,65 @@
3434
#include <limits>
3535
#include <functional>
3636
#include <unordered_map>
37-
#endif
37+
38+
namespace zmq
39+
{
40+
// socket ref or native file descriptor for poller
41+
class poller_ref_t
42+
{
43+
public:
44+
enum RefType
45+
{
46+
RT_SOCKET,
47+
RT_FD
48+
};
49+
50+
poller_ref_t() : poller_ref_t(socket_ref{})
51+
{}
52+
53+
poller_ref_t(const zmq::socket_ref& socket) : data{RT_SOCKET, socket, {}}
54+
{}
55+
56+
poller_ref_t(zmq::fd_t fd) : data{RT_FD, {}, fd}
57+
{}
58+
59+
size_t hash() const ZMQ_NOTHROW
60+
{
61+
std::size_t h = 0;
62+
hash_combine(h, std::get<0>(data));
63+
hash_combine(h, std::get<1>(data));
64+
hash_combine(h, std::get<2>(data));
65+
return h;
66+
}
67+
68+
bool operator == (const poller_ref_t& o) const ZMQ_NOTHROW
69+
{
70+
return data == o.data;
71+
}
72+
73+
private:
74+
template <class T>
75+
static void hash_combine(std::size_t& seed, const T& v) ZMQ_NOTHROW
76+
{
77+
std::hash<T> hasher;
78+
seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
79+
}
80+
81+
std::tuple<int, zmq::socket_ref, zmq::fd_t> data;
82+
83+
}; // class poller_ref_t
84+
85+
} // namespace zmq
86+
87+
// std::hash<> specialization for std::unordered_map
88+
template <> struct std::hash<zmq::poller_ref_t>
89+
{
90+
size_t operator()(const zmq::poller_ref_t& ref) const ZMQ_NOTHROW
91+
{
92+
return ref.hash();
93+
}
94+
};
95+
#endif // ZMQ_CPP11
3896

3997
namespace zmq
4098
{
@@ -683,10 +741,12 @@ class active_poller_t
683741

684742
void add(zmq::socket_ref socket, event_flags events, handler_type handler)
685743
{
744+
const poller_ref_t ref{socket};
745+
686746
if (!handler)
687-
throw std::invalid_argument("null handler in active_poller_t::add");
747+
throw std::invalid_argument("null handler in active_poller_t::add (socket)");
688748
auto ret = handlers.emplace(
689-
socket, std::make_shared<handler_type>(std::move(handler)));
749+
ref, std::make_shared<handler_type>(std::move(handler)));
690750
if (!ret.second)
691751
throw error_t(EINVAL); // already added
692752
try {
@@ -695,7 +755,28 @@ class active_poller_t
695755
}
696756
catch (...) {
697757
// rollback
698-
handlers.erase(socket);
758+
handlers.erase(ref);
759+
throw;
760+
}
761+
}
762+
763+
void add(fd_t fd, event_flags events, handler_type handler)
764+
{
765+
const poller_ref_t ref{fd};
766+
767+
if (!handler)
768+
throw std::invalid_argument("null handler in active_poller_t::add (fd)");
769+
auto ret = handlers.emplace(
770+
ref, std::make_shared<handler_type>(std::move(handler)));
771+
if (!ret.second)
772+
throw error_t(EINVAL); // already added
773+
try {
774+
base_poller.add(fd, events, ret.first->second.get());
775+
need_rebuild = true;
776+
}
777+
catch (...) {
778+
// rollback
779+
handlers.erase(ref);
699780
throw;
700781
}
701782
}
@@ -707,11 +788,23 @@ class active_poller_t
707788
need_rebuild = true;
708789
}
709790

791+
void remove(fd_t fd)
792+
{
793+
base_poller.remove(fd);
794+
handlers.erase(fd);
795+
need_rebuild = true;
796+
}
797+
710798
void modify(zmq::socket_ref socket, event_flags events)
711799
{
712800
base_poller.modify(socket, events);
713801
}
714802

803+
void modify(fd_t fd, event_flags events)
804+
{
805+
base_poller.modify(fd, events);
806+
}
807+
715808
size_t wait(std::chrono::milliseconds timeout)
716809
{
717810
if (need_rebuild) {
@@ -741,7 +834,9 @@ class active_poller_t
741834
bool need_rebuild{false};
742835

743836
poller_t<handler_type> base_poller{};
744-
std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
837+
838+
std::unordered_map<zmq::poller_ref_t, std::shared_ptr<handler_type>> handlers{};
839+
745840
std::vector<decltype(base_poller)::event_type> poller_events{};
746841
std::vector<std::shared_ptr<handler_type>> poller_handlers{};
747842
}; // class active_poller_t

0 commit comments

Comments
 (0)