Skip to content
This repository was archived by the owner on May 4, 2018. It is now read-only.

Commit 4189122

Browse files
committed
unix: try to write immediately in uv_udp_send
1 parent bf6e90f commit 4189122

File tree

1 file changed

+74
-95
lines changed

1 file changed

+74
-95
lines changed

src/unix/udp.c

Lines changed: 74 additions & 95 deletions
Original file line number | Diff line number | Diff line change
@@ -38,10 +38,9 @@
3838

3939

4040
static void uv__udp_run_completed(uv_udp_t* handle);
41-
static void uv__udp_run_pending(uv_udp_t* handle);
4241
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
43-
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
44-
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
42+
static void uv__udp_recvmsg(uv_udp_t* handle);
43+
static void uv__udp_sendmsg(uv_udp_t* handle);
4544
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
4645
int domain,
4746
unsigned int flags);
@@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
6564
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
6665
assert(handle->io_watcher.fd == -1);
6766

68-
uv__udp_run_completed(handle);
69-
7067
while (!QUEUE_EMPTY(&handle->write_queue)) {
7168
q = QUEUE_HEAD(&handle->write_queue);
7269
QUEUE_REMOVE(q);
7370

7471
req = QUEUE_DATA(q, uv_udp_send_t, queue);
75-
uv__req_unregister(handle->loop, req);
76-
77-
if (req->bufs != req->bufsml)
78-
free(req->bufs);
79-
req->bufs = NULL;
80-
81-
if (req->send_cb != NULL)
82-
req->send_cb(req, -ECANCELED);
72+
req->status = -ECANCELED;
73+
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
8374
}
8475

85-
handle->send_queue_size = 0;
86-
handle->send_queue_count = 0;
76+
uv__udp_run_completed(handle);
77+
78+
assert(handle->send_queue_size == 0);
79+
assert(handle->send_queue_count == 0);
8780

8881
/* Now tear down the handle. */
8982
handle->recv_cb = NULL;
@@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
9285
}
9386

9487

95-
static void uv__udp_run_pending(uv_udp_t* handle) {
96-
uv_udp_send_t* req;
97-
QUEUE* q;
98-
struct msghdr h;
99-
ssize_t size;
100-
101-
while (!QUEUE_EMPTY(&handle->write_queue)) {
102-
q = QUEUE_HEAD(&handle->write_queue);
103-
assert(q != NULL);
104-
105-
req = QUEUE_DATA(q, uv_udp_send_t, queue);
106-
assert(req != NULL);
107-
108-
memset(&h, 0, sizeof h);
109-
h.msg_name = &req->addr;
110-
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
111-
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
112-
h.msg_iov = (struct iovec*) req->bufs;
113-
h.msg_iovlen = req->nbufs;
114-
115-
do {
116-
size = sendmsg(handle->io_watcher.fd, &h, 0);
117-
}
118-
while (size == -1 && errno == EINTR);
119-
120-
/* TODO try to write once or twice more in the
121-
* hope that the socket becomes readable again?
122-
*/
123-
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
124-
break;
125-
126-
req->status = (size == -1 ? -errno : size);
127-
128-
/* Sending a datagram is an atomic operation: either all data
129-
* is written or nothing is (and EMSGSIZE is raised). That is
130-
* why we don't handle partial writes. Just pop the request
131-
* off the write queue and onto the completed queue, done.
132-
*/
133-
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
134-
handle->send_queue_count--;
135-
QUEUE_REMOVE(&req->queue);
136-
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
137-
}
138-
}
139-
140-
14188
static void uv__udp_run_completed(uv_udp_t* handle) {
14289
uv_udp_send_t* req;
14390
QUEUE* q;
@@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
14996
req = QUEUE_DATA(q, uv_udp_send_t, queue);
15097
uv__req_unregister(handle->loop, req);
15198

99+
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
100+
handle->send_queue_count--;
101+
152102
if (req->bufs != req->bufsml)
153103
free(req->bufs);
154104
req->bufs = NULL;
@@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
164114
else
165115
req->send_cb(req, req->status);
166116
}
117+
118+
if (QUEUE_EMPTY(&handle->write_queue)) {
119+
/* Pending queue and completion queue empty, stop watcher. */
120+
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
121+
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
122+
uv__handle_stop(handle);
123+
}
167124
}
168125

169126

170127
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
128+
uv_udp_t* handle;
129+
130+
handle = container_of(w, uv_udp_t, io_watcher);
131+
assert(handle->type == UV_UDP);
132+
171133
if (revents & UV__POLLIN)
172-
uv__udp_recvmsg(loop, w, revents);
134+
uv__udp_recvmsg(handle);
173135

174-
if (revents & UV__POLLOUT)
175-
uv__udp_sendmsg(loop, w, revents);
136+
if (revents & UV__POLLOUT) {
137+
uv__udp_sendmsg(handle);
138+
uv__udp_run_completed(handle);
139+
}
176140
}
177141

178142

179-
static void uv__udp_recvmsg(uv_loop_t* loop,
180-
uv__io_t* w,
181-
unsigned int revents) {
143+
static void uv__udp_recvmsg(uv_udp_t* handle) {
182144
struct sockaddr_storage peer;
183145
struct msghdr h;
184-
uv_udp_t* handle;
185146
ssize_t nread;
186147
uv_buf_t buf;
187148
int flags;
188149
int count;
189150

190-
handle = container_of(w, uv_udp_t, io_watcher);
191-
assert(handle->type == UV_UDP);
192-
assert(revents & UV__POLLIN);
193-
194151
assert(handle->recv_cb != NULL);
195152
assert(handle->alloc_cb != NULL);
196153

@@ -247,34 +204,46 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
247204
}
248205

249206

250-
static void uv__udp_sendmsg(uv_loop_t* loop,
251-
uv__io_t* w,
252-
unsigned int revents) {
253-
uv_udp_t* handle;
254-
255-
handle = container_of(w, uv_udp_t, io_watcher);
256-
assert(handle->type == UV_UDP);
257-
assert(revents & UV__POLLOUT);
207+
static void uv__udp_sendmsg(uv_udp_t* handle) {
208+
uv_udp_send_t* req;
209+
QUEUE* q;
210+
struct msghdr h;
211+
ssize_t size;
258212

259213
assert(!QUEUE_EMPTY(&handle->write_queue)
260214
|| !QUEUE_EMPTY(&handle->write_completed_queue));
261215

262-
/* Write out pending data first. */
263-
uv__udp_run_pending(handle);
216+
while (!QUEUE_EMPTY(&handle->write_queue)) {
217+
q = QUEUE_HEAD(&handle->write_queue);
218+
assert(q != NULL);
264219

265-
/* Drain 'request completed' queue. */
266-
uv__udp_run_completed(handle);
220+
req = QUEUE_DATA(q, uv_udp_send_t, queue);
221+
assert(req != NULL);
267222

268-
if (!QUEUE_EMPTY(&handle->write_completed_queue)) {
269-
/* Schedule completion callbacks. */
270-
uv__io_feed(handle->loop, &handle->io_watcher);
271-
}
272-
else if (QUEUE_EMPTY(&handle->write_queue)) {
273-
/* Pending queue and completion queue empty, stop watcher. */
274-
uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
223+
memset(&h, 0, sizeof h);
224+
h.msg_name = &req->addr;
225+
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
226+
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
227+
h.msg_iov = (struct iovec*) req->bufs;
228+
h.msg_iovlen = req->nbufs;
275229

276-
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
277-
uv__handle_stop(handle);
230+
do {
231+
size = sendmsg(handle->io_watcher.fd, &h, 0);
232+
} while (size == -1 && errno == EINTR);
233+
234+
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
235+
break;
236+
237+
req->status = (size == -1 ? -errno : size);
238+
239+
/* Sending a datagram is an atomic operation: either all data
240+
* is written or nothing is (and EMSGSIZE is raised). That is
241+
* why we don't handle partial writes. Just pop the request
242+
* off the write queue and onto the completed queue, done.
243+
*/
244+
QUEUE_REMOVE(&req->queue);
245+
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
246+
uv__io_feed(handle->loop, &handle->io_watcher);
278247
}
279248
}
280249

@@ -415,15 +384,21 @@ int uv__udp_send(uv_udp_send_t* req,
415384
unsigned int addrlen,
416385
uv_udp_send_cb send_cb) {
417386
int err;
387+
int empty_queue;
418388

419389
assert(nbufs > 0);
420390

421391
err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
422392
if (err)
423393
return err;
424394

425-
uv__req_init(handle->loop, req, UV_UDP_SEND);
395+
/* It's legal for send_queue_count > 0 even when the write_queue is empty;
396+
* it means there are error-state requests in the write_completed_queue that
397+
* will touch up send_queue_size/count later.
398+
*/
399+
empty_queue = (handle->send_queue_count == 0);
426400

401+
uv__req_init(handle->loop, req, UV_UDP_SEND);
427402
assert(addrlen <= sizeof(req->addr));
428403
memcpy(&req->addr, addr, addrlen);
429404
req->send_cb = send_cb;
@@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
441416
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
442417
handle->send_queue_count++;
443418
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
444-
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
445419
uv__handle_start(handle);
446420

421+
if (empty_queue)
422+
uv__udp_sendmsg(handle);
423+
else
424+
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
425+
447426
return 0;
448427
}
449428

0 commit comments

Comments
 (0)