7
7
#include < cassert>
8
8
#include < functional>
9
9
#include < iostream>
10
+ #include < map>
10
11
#include < mutex>
11
12
#include < optional>
12
13
#include < thread>
@@ -43,7 +44,8 @@ namespace netlib {
43
44
callback_error_t _cb_on_error{};
44
45
std::thread _accept_thread;
45
46
std::thread _processor_thread;
46
- netlib::thread_pool _thread_pool;
47
+ netlib::thread_pool _thread_pool = netlib::thread_pool::create<1 ,1 >();
48
+ std::map<socket_t , std::atomic<bool >> _busy_map;
47
49
48
50
inline void processing_func () {
49
51
while (_server_active) {
@@ -54,9 +56,14 @@ namespace netlib {
54
56
{
55
57
std::lock_guard<std::mutex> lock (_mutex);
56
58
local_clients = _clients;
57
- for (auto & client : local_clients) {
59
+ for (auto & client : local_clients) {
58
60
socket_t fd = client.socket .get_raw ().value ();
59
- if (highest_fd < fd){
61
+ assert (_busy_map.contains (fd));
62
+ if (_busy_map[fd]) {
63
+ // this fd is currently being handled by a task in threadpool
64
+ continue ;
65
+ }
66
+ if (highest_fd < fd) {
60
67
highest_fd = fd;
61
68
}
62
69
FD_SET (fd, &fdset);
@@ -69,20 +76,31 @@ namespace netlib {
69
76
int32_t select_res = ::select (highest_fd + 1 , &fdset, nullptr , nullptr , &tv);
70
77
if (select_res > 0 ) {
71
78
std::vector<client_endpoint> client_refs (select_res);
72
- uint32_t index = 0 ;
79
+ int32_t index = 0 ;
73
80
{
81
+ std::lock_guard<std::mutex> lock (_mutex);
74
82
for (auto & client : local_clients) {
75
83
socket_t fd = client.socket .get_raw ().value ();
76
84
if (FD_ISSET (fd, &fdset)){
77
85
client_refs[index++] = client;
86
+ assert (_busy_map.contains (fd));
87
+ _busy_map[fd] = true ;
78
88
}
79
89
}
80
90
}
81
91
assert (index == select_res);
82
92
// add callback tasks to threadpool for processing
83
93
for (auto & client_to_recv : client_refs) {
84
94
_thread_pool.add_task ([&](client_endpoint ce){
85
- this ->handle_client (ce);
95
+ socket_t id = ce.socket .get_raw ().value ();
96
+ std::error_condition error = this ->handle_client (ce);
97
+ if ((error) && (_cb_on_error)) {
98
+ _cb_on_error (ce, error);
99
+ }
100
+ std::lock_guard<std::mutex> lock (_mutex);
101
+ if (_busy_map.contains (id)) {
102
+ _busy_map[id] = false ;
103
+ }
86
104
}, client_to_recv);
87
105
}
88
106
@@ -100,7 +118,7 @@ namespace netlib {
100
118
while (_server_active) {
101
119
std::this_thread::sleep_for (std::chrono::milliseconds (10 ));
102
120
new_endpoint.addr_len = sizeof (addrinfo);
103
- int32_t status = ::accept (_listener_sock->get_raw ().value (), &new_endpoint.addr , &new_endpoint.addr_len );
121
+ socket_t status = ::accept (_listener_sock->get_raw ().value (), &new_endpoint.addr , &new_endpoint.addr_len );
104
122
if (status > 0 ) {
105
123
new_endpoint.socket .set_raw (status);
106
124
new_endpoint.socket .set_nonblocking (true );
@@ -120,86 +138,82 @@ namespace netlib {
120
138
if (_cb_on_error) {
121
139
_cb_on_error (new_endpoint, std::errc::connection_aborted);
122
140
}
123
- new_endpoint. socket . close ( );
141
+ remove_client (new_endpoint );
124
142
std::cout << " server kicked client accept" << std::endl;
125
143
continue ;
126
144
}
127
145
}
128
146
if (new_endpoint.socket .is_valid ()) {
129
147
std::lock_guard<std::mutex> lock (_mutex);
148
+ std::cout << " server added new client, id " << status << std::endl;
130
149
_clients.push_back (new_endpoint);
150
+ _busy_map[status] = false ;
131
151
}
132
152
}
133
153
}
134
154
}
135
155
136
156
inline std::error_condition handle_client (client_endpoint endpoint) {
137
157
std::vector<uint8_t > total_buffer;
138
- std::array<uint8_t , 2048 > buffer{};
158
+ std::array<uint8_t , 1024 > buffer{};
139
159
ssize_t recv_res = 0 ;
140
- ssize_t recv_res_cycle = 0 ;
141
- while ((recv_res_cycle = ::recv (endpoint.socket .get_raw ().value (), buffer.data (), buffer.size (), 0 )) > 0 ) {
142
- total_buffer.insert (total_buffer.end (), buffer.begin (), buffer.begin () + recv_res_cycle);
143
- recv_res += recv_res_cycle;
144
- }
145
- if (recv_res == 0 ){
146
- std::cout << " server recv_res == 0" << std::endl;
147
- if (_cb_on_error) {
148
- _cb_on_error (endpoint, std::errc::connection_aborted);
149
- }
150
- remove_client (endpoint.socket .get_raw ().value ());
151
- endpoint.socket .close ();
152
- std::cout << " server kicked client recv 0" << std::endl;
153
- return std::errc::connection_aborted;
154
- } else if (recv_res < 0 ) {
155
- // error
156
- std::cout << " server recv_res == -1" << std::endl;
157
- std::error_condition recv_error = socket_get_last_error ();
158
- // we do not want to spam callback with wouldblock messages
159
- // for portability we shall check both EAGAIN and EWOULDBLOCK
160
- if ((recv_error != std::errc::resource_unavailable_try_again) &&
161
- (recv_error != std::errc::operation_would_block)) {
162
- if (_cb_on_error) {
163
- _cb_on_error (endpoint, recv_error);
164
- }
165
- }
166
- return recv_error;
167
- } else {
168
- std::cout << " server recv_res > 0: " << recv_res << " " << total_buffer.size () << std::endl;
169
- // we got data
170
- if (_cb_on_recv) {
171
- netlib::server_response response = _cb_on_recv (endpoint, total_buffer);
172
- if (!response.answer .empty ()) {
173
- ssize_t send_result = ::send (endpoint.socket .get_raw ().value (),
174
- response.answer .data (),
175
- response.answer .size (),
176
- 0 );
177
- if ((send_result != response.answer .size ()) && (_cb_on_error)) {
178
- _cb_on_error (endpoint, socket_get_last_error ());
179
- }
180
- }
181
- if (response.terminate ) {
182
- if (_cb_on_error) {
183
- _cb_on_error (endpoint, std::errc::connection_aborted);
160
+ while (true ) {
161
+ recv_res = ::recv (endpoint.socket .get_raw ().value (),
162
+ buffer.data (), buffer.size (), 0 );
163
+ if (recv_res > 0 ) {
164
+ total_buffer.insert (total_buffer.end (), buffer.begin (),
165
+ buffer.begin () + recv_res);
166
+ } else if (recv_res == 0 ) {
167
+ std::cout << " server recv_res == 0" << std::endl;
168
+ remove_client (endpoint);
169
+ std::cout << " server kicked client recv 0" << std::endl;
170
+ return std::errc::connection_aborted;
171
+ } else if (recv_res < 0 ) {
172
+ // error
173
+ std::error_condition recv_error = socket_get_last_error ();
174
+ if ((recv_error == std::errc::resource_unavailable_try_again) ||
175
+ (recv_error == std::errc::operation_would_block)) {
176
+ // no more data, return what we got
177
+ if (_cb_on_recv) {
178
+ netlib::server_response response =
179
+ _cb_on_recv (endpoint, total_buffer);
180
+ if (!response.answer .empty ()) {
181
+ ssize_t send_result = ::send (
182
+ endpoint.socket .get_raw ().value (),
183
+ response.answer .data (), response.answer .size (), 0 );
184
+ if (send_result != response.answer .size ()) {
185
+ return socket_get_last_error ();
186
+ }
187
+ }
188
+ if (response.terminate ) {
189
+ remove_client (endpoint);
190
+ std::cout << " server kicked client because wanted"
191
+ << std::endl;
192
+ return std::errc::connection_aborted;
193
+ }
184
194
}
185
- remove_client (endpoint.socket .get_raw ().value ());
186
- endpoint.socket .close ();
187
- std::cout << " server kicked client because wanted" << std::endl;
195
+ return {};
196
+ } else {
197
+ std::cout << " server recv_res == -1" << std::endl;
198
+ return recv_error;
188
199
}
189
-
190
200
}
191
- return {};
192
201
}
193
-
194
202
}
195
203
196
- bool remove_client (socket_t socket_id ) {
204
+ bool remove_client (client_endpoint& ce ) {
197
205
// the remove_if-> erase idiom is perhaps my most hated part about std containers
198
206
std::lock_guard<std::mutex> lock (_mutex);
199
- return _clients.erase (
200
- std::remove_if (_clients.begin (), _clients.end (), [&](const client_endpoint& ce) {
201
- return ce.socket .get_raw () == socket_id;
202
- }),_clients.end ()) != _clients.end ();
207
+ std::size_t client_count = _clients.size ();
208
+ _busy_map.erase (ce.socket .get_raw ().value ());
209
+ std::cout << " remove_client, client count at start " << _clients.size () << std::endl;
210
+ std::erase_if (_clients, [&](const client_endpoint& single_endpoint){
211
+ return (ce.socket .get_raw ().value () == single_endpoint.socket .get_raw ().value ());
212
+ });
213
+ std::cout << " removed client, id " << ce.socket .get_raw ().value () << std::endl;
214
+ ce.socket .close ();
215
+ assert ((client_count - _clients.size ()) == 1 );
216
+ return true ;
203
217
}
204
218
205
219
public:
@@ -233,9 +247,7 @@ namespace netlib {
233
247
close_and_free ();
234
248
continue ;
235
249
}
236
- _listener_sock->set_nonblocking (true );
237
- // _listener_sock->set_reuseaddr(true);
238
-
250
+ _listener_sock->set_nonblocking (true ); // we want to be able to join
239
251
int32_t res = ::bind (_listener_sock->get_raw ().value (), res_addrinfo->ai_addr , res_addrinfo->ai_addrlen );
240
252
if (res < 0 ) {
241
253
close_and_free ();
0 commit comments