Skip to content

Commit 50e259f

Browse files
authored
Merge pull request #7836 from thess/rfc-web-server
RFC: Web Workflow reliability and performance improvements
2 parents 57c8485 + 9825b7f commit 50e259f

File tree

7 files changed

+198
-181
lines changed

7 files changed

+198
-181
lines changed

ports/espressif/common-hal/socketpool/Socket.c

Lines changed: 126 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -45,70 +45,79 @@
4545

4646
StackType_t socket_select_stack[2 * configMINIMAL_STACK_SIZE];
4747

48-
STATIC int open_socket_fds[CONFIG_LWIP_MAX_SOCKETS];
48+
/* Socket state table:
49+
* 0 := Closed (unused)
50+
* 1 := Open
51+
* 2 := Closing (remove from rfds)
52+
* Index into socket_fd_state is calculated from actual lwip fd. idx := fd - LWIP_SOCKET_OFFSET
53+
*/
54+
#define FDSTATE_CLOSED 0
55+
#define FDSTATE_OPEN 1
56+
#define FDSTATE_CLOSING 2
57+
STATIC uint8_t socket_fd_state[CONFIG_LWIP_MAX_SOCKETS];
58+
4959
STATIC socketpool_socket_obj_t *user_socket[CONFIG_LWIP_MAX_SOCKETS];
50-
StaticTask_t socket_select_task_handle;
60+
StaticTask_t socket_select_task_buffer;
61+
TaskHandle_t socket_select_task_handle;
5162
STATIC int socket_change_fd = -1;
5263

5364
STATIC void socket_select_task(void *arg) {
5465
uint64_t signal;
66+
fd_set readfds;
67+
fd_set excptfds;
5568

5669
while (true) {
57-
fd_set readfds;
58-
fd_set errfds;
5970
FD_ZERO(&readfds);
60-
FD_ZERO(&errfds);
71+
FD_ZERO(&excptfds);
6172
FD_SET(socket_change_fd, &readfds);
62-
FD_SET(socket_change_fd, &errfds);
6373
int max_fd = socket_change_fd;
64-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
65-
int sockfd = open_socket_fds[i];
66-
if (sockfd < 0) {
67-
continue;
74+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
75+
if ((socket_fd_state[i] == FDSTATE_OPEN) && (user_socket[i] == NULL)) {
76+
int sockfd = i + LWIP_SOCKET_OFFSET;
77+
max_fd = MAX(max_fd, sockfd);
78+
FD_SET(sockfd, &readfds);
79+
FD_SET(sockfd, &excptfds);
6880
}
69-
max_fd = MAX(max_fd, sockfd);
70-
FD_SET(sockfd, &readfds);
71-
FD_SET(sockfd, &errfds);
7281
}
7382

74-
int num_triggered = select(max_fd + 1, &readfds, NULL, &errfds, NULL);
75-
// Check for bad file descriptor and queue up the background task before
76-
// circling around.
77-
if (num_triggered == -1 && errno == EBADF) {
78-
// One for the change fd and one for the closed socket.
79-
num_triggered = 2;
80-
}
81-
// Try and find the bad file and remove it from monitoring.
82-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
83-
int sockfd = open_socket_fds[i];
84-
if (sockfd < 0) {
85-
continue;
86-
}
87-
int err;
88-
int optlen = sizeof(int);
89-
int ret = getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &err, (socklen_t *)&optlen);
90-
if (ret < 0) {
91-
open_socket_fds[i] = -1;
92-
// Raise num_triggered so that we skip the assert and queue the background task.
93-
num_triggered = 2;
94-
}
83+
int num_triggered = select(max_fd + 1, &readfds, NULL, &excptfds, NULL);
84+
// Hard error (or someone closed a socket on another thread)
85+
if (num_triggered == -1) {
86+
assert(errno == EBADF);
87+
continue;
9588
}
96-
assert(num_triggered >= 0);
9789

90+
assert(num_triggered > 0);
91+
assert(!FD_ISSET(socket_change_fd, &excptfds));
92+
93+
// Notice event trigger
9894
if (FD_ISSET(socket_change_fd, &readfds)) {
9995
read(socket_change_fd, &signal, sizeof(signal));
100-
num_triggered -= 1;
96+
num_triggered--;
10197
}
102-
if (num_triggered > 0) {
103-
supervisor_workflow_request_background();
10498

105-
// Wake up CircuitPython. We know it is asleep because we are lower
106-
// priority.
107-
port_wake_main_task();
99+
// Handle active FDs, close the dead ones
100+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
101+
int sockfd = i + LWIP_SOCKET_OFFSET;
102+
if (socket_fd_state[i] != FDSTATE_CLOSED) {
103+
if (FD_ISSET(sockfd, &readfds) || FD_ISSET(sockfd, &excptfds)) {
104+
if (socket_fd_state[i] == FDSTATE_CLOSING) {
105+
socket_fd_state[i] = FDSTATE_CLOSED;
106+
num_triggered--;
107+
}
108+
}
109+
}
108110
}
109111

112+
if (num_triggered > 0) {
113+
// Wake up CircuitPython by queuing request
114+
supervisor_workflow_request_background();
115+
ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
116+
}
110117
}
118+
111119
close(socket_change_fd);
120+
socket_change_fd = -1;
112121
vTaskDelete(NULL);
113122
}
114123

@@ -117,85 +126,74 @@ void socket_user_reset(void) {
117126
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
118127
ESP_ERROR_CHECK(esp_vfs_eventfd_register(&config));
119128

120-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
121-
open_socket_fds[i] = -1;
129+
// Clear initial socket states
130+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
131+
socket_fd_state[i] = FDSTATE_CLOSED;
122132
user_socket[i] = NULL;
123133
}
124134
socket_change_fd = eventfd(0, 0);
125135
// Run this at the same priority as CP so that the web workflow background task can be
126136
// queued while CP is running. Both tasks can still sleep and, therefore, sleep overall.
127-
(void)xTaskCreateStaticPinnedToCore(socket_select_task,
137+
socket_select_task_handle = xTaskCreateStaticPinnedToCore(socket_select_task,
128138
"socket_select",
129139
2 * configMINIMAL_STACK_SIZE,
130140
NULL,
131141
uxTaskPriorityGet(NULL),
132142
socket_select_stack,
133-
&socket_select_task_handle,
143+
&socket_select_task_buffer,
134144
xPortGetCoreID());
145+
} else {
146+
// Not init - close open user sockets
147+
for (size_t i = 0; i < MP_ARRAY_SIZE(socket_fd_state); i++) {
148+
if ((socket_fd_state[i] == FDSTATE_OPEN) && user_socket[i]) {
149+
common_hal_socketpool_socket_close(user_socket[i]);
150+
}
151+
}
135152
}
153+
}
136154

137-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
138-
if (open_socket_fds[i] >= 0 && user_socket[i]) {
139-
common_hal_socketpool_socket_close(user_socket[i]);
140-
int num = open_socket_fds[i];
141-
// Close automatically clears socket handle
142-
lwip_shutdown(num, SHUT_RDWR);
143-
lwip_close(num);
144-
open_socket_fds[i] = -1;
145-
user_socket[i] = NULL;
146-
}
155+
// Unblock select task (ok if not blocked yet)
156+
void socketpool_socket_poll_resume(void) {
157+
if (socket_select_task_handle) {
158+
xTaskNotifyGive(socket_select_task_handle);
147159
}
148160
}
149161

150162
// The writes below send an event to the socket select task so that it redoes the
151163
// select with the new open socket set.
152164

153165
STATIC bool register_open_socket(int fd) {
154-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
155-
if (open_socket_fds[i] == -1) {
156-
open_socket_fds[i] = fd;
157-
user_socket[i] = false;
158-
uint64_t signal = 1;
159-
write(socket_change_fd, &signal, sizeof(signal));
160-
return true;
161-
}
162-
}
163-
return false;
164-
}
166+
if (fd < FD_SETSIZE) {
167+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
168+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
165169

166-
STATIC void unregister_open_socket(int fd) {
167-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
168-
if (open_socket_fds[i] == fd) {
169-
open_socket_fds[i] = -1;
170-
user_socket[i] = false;
171-
// Write must be 8 bytes for an eventfd.
172-
uint64_t signal = 1;
173-
write(socket_change_fd, &signal, sizeof(signal));
174-
return;
175-
}
170+
uint64_t signal = 1;
171+
write(socket_change_fd, &signal, sizeof(signal));
172+
socketpool_socket_poll_resume();
173+
return true;
176174
}
175+
return false;
177176
}
178177

179178
STATIC void mark_user_socket(int fd, socketpool_socket_obj_t *obj) {
180-
for (size_t i = 0; i < MP_ARRAY_SIZE(open_socket_fds); i++) {
181-
if (open_socket_fds[i] == fd) {
182-
user_socket[i] = obj;
183-
return;
184-
}
185-
}
179+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_OPEN;
180+
user_socket[fd - LWIP_SOCKET_OFFSET] = obj;
181+
// No need to wakeup select task
186182
}
187183

188-
bool socketpool_socket(socketpool_socketpool_obj_t *self,
184+
STATIC bool _socketpool_socket(socketpool_socketpool_obj_t *self,
189185
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
190186
socketpool_socket_obj_t *sock) {
191187
int addr_family;
192188
int ipproto;
193189
if (family == SOCKETPOOL_AF_INET) {
194190
addr_family = AF_INET;
195191
ipproto = IPPROTO_IP;
192+
#if LWIP_IPV6
196193
} else { // INET6
197194
addr_family = AF_INET6;
198195
ipproto = IPPROTO_IPV6;
196+
#endif
199197
}
200198

201199
int socket_type;
@@ -218,14 +216,28 @@ bool socketpool_socket(socketpool_socketpool_obj_t *self,
218216
if (socknum < 0) {
219217
return false;
220218
}
221-
// This shouldn't happen since we have room for the same number of sockets as LWIP.
222-
if (!register_open_socket(socknum)) {
223-
lwip_close(socknum);
224-
return false;
225-
}
219+
226220
sock->num = socknum;
227221
// Sockets should be nonblocking in most cases
228222
lwip_fcntl(socknum, F_SETFL, O_NONBLOCK);
223+
224+
return true;
225+
}
226+
227+
// special entry for workflow listener (register system socket)
228+
bool socketpool_socket(socketpool_socketpool_obj_t *self,
229+
socketpool_socketpool_addressfamily_t family, socketpool_socketpool_sock_t type,
230+
socketpool_socket_obj_t *sock) {
231+
232+
if (!_socketpool_socket(self, family, type, sock)) {
233+
return false;
234+
}
235+
236+
// This shouldn't happen since we have room for the same number of sockets as LWIP.
237+
if (!register_open_socket(sock->num)) {
238+
lwip_close(sock->num);
239+
return false;
240+
}
229241
return true;
230242
}
231243

@@ -238,7 +250,7 @@ socketpool_socket_obj_t *common_hal_socketpool_socket(socketpool_socketpool_obj_
238250
socketpool_socket_obj_t *sock = m_new_obj_with_finaliser(socketpool_socket_obj_t);
239251
sock->base.type = &socketpool_socket_type;
240252

241-
if (!socketpool_socket(self, family, type, sock)) {
253+
if (!_socketpool_socket(self, family, type, sock)) {
242254
mp_raise_RuntimeError(translate("Out of sockets"));
243255
}
244256
mark_user_socket(sock->num, sock);
@@ -279,17 +291,16 @@ int socketpool_socket_accept(socketpool_socket_obj_t *self, uint8_t *ip, uint32_
279291
// We got a socket. New client socket will not be non-blocking by default, so make it non-blocking.
280292
lwip_fcntl(newsoc, F_SETFL, O_NONBLOCK);
281293

282-
if (!register_open_socket(newsoc)) {
283-
lwip_close(newsoc);
284-
return -MP_EBADF;
285-
}
286-
287-
288294
if (accepted != NULL) {
289-
// Close the active socket because we have another we accepted.
290-
if (!common_hal_socketpool_socket_get_closed(accepted)) {
291-
common_hal_socketpool_socket_close(accepted);
295+
// Error if called with open socket object.
296+
assert(common_hal_socketpool_socket_get_closed(accepted));
297+
298+
// Register if system socket
299+
if (!register_open_socket(newsoc)) {
300+
lwip_close(newsoc);
301+
return -MP_EBADF;
292302
}
303+
293304
// Replace the old accepted socket with the new one.
294305
accepted->num = newsoc;
295306
accepted->pool = self->pool;
@@ -353,12 +364,21 @@ void socketpool_socket_close(socketpool_socket_obj_t *self) {
353364
return;
354365
}
355366
self->connected = false;
356-
if (self->num >= 0) {
357-
lwip_shutdown(self->num, SHUT_RDWR);
358-
lwip_close(self->num);
359-
unregister_open_socket(self->num);
360-
self->num = -1;
367+
int fd = self->num;
368+
// Ignore bogus/closed sockets
369+
if (fd >= LWIP_SOCKET_OFFSET) {
370+
if (user_socket[fd - LWIP_SOCKET_OFFSET] == NULL) {
371+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSING;
372+
lwip_shutdown(fd, SHUT_RDWR);
373+
lwip_close(fd);
374+
} else {
375+
lwip_shutdown(fd, SHUT_RDWR);
376+
lwip_close(fd);
377+
socket_fd_state[fd - LWIP_SOCKET_OFFSET] = FDSTATE_CLOSED;
378+
user_socket[fd - LWIP_SOCKET_OFFSET] = NULL;
379+
}
361380
}
381+
self->num = -1;
362382
}
363383

364384
void common_hal_socketpool_socket_close(socketpool_socket_obj_t *self) {
@@ -420,7 +440,7 @@ bool common_hal_socketpool_socket_get_connected(socketpool_socket_obj_t *self) {
420440
}
421441

422442
bool common_hal_socketpool_socket_listen(socketpool_socket_obj_t *self, int backlog) {
423-
return lwip_listen(self->num, backlog);
443+
return lwip_listen(self->num, backlog) == 0;
424444
}
425445

426446
mp_uint_t common_hal_socketpool_socket_recvfrom_into(socketpool_socket_obj_t *self,
@@ -479,10 +499,9 @@ int socketpool_socket_recv_into(socketpool_socket_obj_t *self,
479499
}
480500
RUN_BACKGROUND_TASKS;
481501
received = lwip_recv(self->num, (void *)buf, len, 0);
482-
483502
// In non-blocking mode, fail instead of looping
484-
if (received == -1 && self->timeout_ms == 0) {
485-
if (errno == ENOTCONN) {
503+
if (received < 1 && self->timeout_ms == 0) {
504+
if ((received == 0) || (errno == ENOTCONN)) {
486505
self->connected = false;
487506
return -MP_ENOTCONN;
488507
}

ports/espressif/common-hal/socketpool/Socket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,5 @@ typedef struct {
4848
} socketpool_socket_obj_t;
4949

5050
void socket_user_reset(void);
51+
// Unblock workflow socket select thread (platform specific)
52+
void socketpool_socket_poll_resume(void);

ports/raspberrypi/common-hal/socketpool/Socket.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,7 @@ typedef struct _lwip_socket_obj_t {
7575
socketpool_socketpool_obj_t *pool;
7676
} socketpool_socket_obj_t;
7777

78+
// Not required for RPi socket positive callbacks
79+
#define socketpool_socket_poll_resume(x)
80+
7881
void socket_user_reset(void);

0 commit comments

Comments
 (0)