Skip to content

Commit 54aa881

Browse files
committed
WIP: stream: Implement cancellation support for uv_write_t
The ability to request cancellation of a pending or in-progress write has been requested several times in libuv (at least joyent/libuv#1393 and libuv#2051), but was not yet implemented. I am currently on a mission to make Ctrl-C work nicely and reliably in Julia (JuliaLang/julia#60281). Of course, this would require the ability to cancel in-progress writes for sane semantics, so I have a renewed interest in this feature, which this PR attempts to implement. One primary problem with the API is that the existing callback does not provide support for passing in the number of written bytes (since it is expected to always complete). I see two options: 1. Create a new uv_write3 that takes a new cb type that does take this argument, or, 2. Create a new public `uv_write_t` field that this information can be read from. This PR takes the first approach, but as it turns out the number of bytes written needs to be stored anyway, so maybe there is not much point to this choice. As mentioned, we do need to store two extra bits of state (the total number of bytes written and which kind of callback we have). To maintain ABI compatibility we steal two pointers from the generic req_t reserve pool. This preserves ABI compatibility, but is a bit awkward, because it puts these new private fields into a different place than they would otherwise be. I don't see that we have much choice though, other than creating a completely new req_t subtype (which does not seem worth it). The `uv_cancel` function returns `0` on success or `UV_EBUSY` if the request was submitted using the `uv_write`/`uv_write2` API that does not take the new callback signature (unless no bytes have been written, in which case cancellation succeeds). It also returns `0` if the request is already done. The thought here was that it would be too racy to return an error code here, but I think it would be fine to return EALREADY (although I don't know what the caller would do with that information. `uv_write3` also takes a flags parameter reserved for future use (just in case). The windows side is relatively straightforward in that the only thing we really need to do is call `CancelIoEx` on the overlapped structure that's already inside of our req. We do of course need the appropriate accounting for the number of bytes written. Discloure: Claude Code was used in the creation of this PR, although dumb design decisions are probably by me. I have done some minimal integrated testing of this on Linux. On windows, I have run the included test, but have not tested in-situ. I consider this WIP until I've had a chance to run our full test suite on all platforms, but I wanted to make sure to open this early for API feedback/concerns.
1 parent d7dda9e commit 54aa881

File tree

17 files changed

+693
-89
lines changed

17 files changed

+693
-89
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,7 @@ if(LIBUV_BUILD_TESTS)
662662
test/test-tcp-unexpected-read.c
663663
test/test-tcp-write-after-connect.c
664664
test/test-tcp-write-fail.c
665+
test/test-tcp-write-cancel.c
665666
test/test-tcp-write-queue-order.c
666667
test/test-tcp-write-to-half-open-connection.c
667668
test/test-tcp-writealot.c

Makefile.am

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
289289
test/test-tcp-try-write.c \
290290
test/test-tcp-write-in-a-row.c \
291291
test/test-tcp-try-write-error.c \
292+
test/test-tcp-write-cancel.c \
292293
test/test-tcp-write-queue-order.c \
293294
test/test-test-macros.c \
294295
test/test-thread-equal.c \

include/uv.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ typedef void (*uv_read_cb)(uv_stream_t* stream,
329329
ssize_t nread,
330330
const uv_buf_t* buf);
331331
typedef void (*uv_write_cb)(uv_write_t* req, int status);
332+
typedef void (*uv_write3_cb)(uv_write_t* req, int status, size_t nwritten);
332333
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
333334
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
334335
typedef void (*uv_connection_cb)(uv_stream_t* server, int status);
@@ -437,7 +438,7 @@ UV_EXTERN char* uv_err_name_r(int err, char* buf, size_t buflen);
437438
/* read-only */ \
438439
uv_req_type type; \
439440
/* private */ \
440-
void* reserved[6]; \
441+
void* reserved[4]; \
441442
UV_REQ_PRIVATE_FIELDS \
442443

443444
/* Abstract base class of all requests. */
@@ -567,6 +568,13 @@ UV_EXTERN int uv_try_write2(uv_stream_t* handle,
567568
const uv_buf_t bufs[],
568569
unsigned int nbufs,
569570
uv_stream_t* send_handle);
571+
UV_EXTERN int uv_write3(uv_write_t* req,
572+
uv_stream_t* handle,
573+
const uv_buf_t bufs[],
574+
unsigned int nbufs,
575+
uv_stream_t* send_handle,
576+
unsigned int flags,
577+
uv_write3_cb cb);
570578

571579
/* uv_write_t is a subclass of uv_req_t. */
572580
struct uv_write_s {

include/uv/unix.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,14 @@ typedef struct {
249249

250250
#define UV_REQ_TYPE_PRIVATE /* empty */
251251

252-
#define UV_REQ_PRIVATE_FIELDS /* empty */
252+
#define UV_REQ_PRIVATE_FIELDS \
253+
union { \
254+
void* reserved2[2]; \
255+
struct { \
256+
size_t bytes_written; \
257+
unsigned int write_flags; \
258+
} write_extra; \
259+
};
253260

254261
#define UV_PRIVATE_REQ_TYPES /* empty */
255262

@@ -260,6 +267,10 @@ typedef struct {
260267
unsigned int nbufs; \
261268
int error; \
262269
uv_buf_t bufsml[4]; \
270+
union { \
271+
uv_write_cb cb; \
272+
uv_write3_cb cb3; \
273+
} write_cb; \
263274

264275
#define UV_CONNECT_PRIVATE_FIELDS \
265276
struct uv__queue queue; \

include/uv/win.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -379,13 +379,25 @@ typedef struct {
379379
WCHAR* name; \
380380
} connect; \
381381
} u; \
382-
struct uv_req_s* next_req;
382+
struct uv_req_s* next_req; \
383+
/* Extra fields for uv_write_t cancellation support */ \
384+
union { \
385+
void* reserved2[2]; \
386+
struct { \
387+
size_t bytes_written; \
388+
unsigned int write_flags; \
389+
} write_extra; \
390+
};
383391

384392
#define UV_WRITE_PRIVATE_FIELDS \
385393
int coalesced; \
386394
uv_buf_t write_buffer; \
387395
HANDLE event_handle; \
388-
HANDLE wait_handle;
396+
HANDLE wait_handle; \
397+
union { \
398+
uv_write_cb cb; \
399+
uv_write3_cb cb3; \
400+
} write_cb;
389401

390402
#define UV_CONNECT_PRIVATE_FIELDS \
391403
/* empty */

src/queue.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,16 @@ static inline void uv__queue_remove(struct uv__queue* q) {
8787
q->next->prev = q->prev;
8888
}
8989

90+
static inline int uv__queue_contains(const struct uv__queue* h,
91+
const struct uv__queue* q) {
92+
const struct uv__queue* e;
93+
94+
uv__queue_foreach(e, h) {
95+
if (e == q)
96+
return 1;
97+
}
98+
99+
return 0;
100+
}
101+
90102
#endif /* QUEUE_H_ */

src/threadpool.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121

2222
#include "uv-common.h"
2323

24-
#if !defined(_WIN32)
24+
#if defined(_WIN32)
25+
# include "win/internal.h"
26+
#else
2527
# include "unix/internal.h"
2628
#endif
2729

@@ -421,6 +423,8 @@ int uv_cancel(uv_req_t* req) {
421423
loop = ((uv_work_t*) req)->loop;
422424
wreq = &((uv_work_t*) req)->work_req;
423425
break;
426+
case UV_WRITE:
427+
return uv__write_cancel((uv_write_t*) req);
424428
default:
425429
return UV_EINVAL;
426430
}

src/unix/internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,7 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
345345
uv_handle_type type);
346346
int uv__stream_open(uv_stream_t*, int fd, int flags);
347347
void uv__stream_destroy(uv_stream_t* stream);
348+
int uv__write_cancel(uv_write_t* req);
348349
#if defined(__APPLE__)
349350
int uv__stream_try_select(uv_stream_t* stream, int* fd);
350351
#endif /* defined(__APPLE__) */

src/unix/stream.c

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ static int uv__write_req_update(uv_stream_t* stream,
692692

693693
assert(n <= stream->write_queue_size);
694694
stream->write_queue_size -= n;
695+
req->write_extra.bytes_written += n;
695696

696697
buf = req->bufs + req->write_index;
697698

@@ -736,6 +737,46 @@ static void uv__write_req_finish(uv_write_t* req) {
736737
}
737738

738739

740+
int uv__write_cancel(uv_write_t* req) {
741+
uv_stream_t* stream;
742+
size_t remaining;
743+
744+
stream = req->handle;
745+
if (stream == NULL)
746+
return UV_EINVAL;
747+
748+
/* If the provided callback does not allow passing nwritten,
749+
* don't allow cancellation of partial writes. Return EBUSY instead.
750+
*/
751+
if (!(req->write_extra.write_flags & UV__WRITE_ALLOW_PARTIAL)) {
752+
if (req->write_extra.bytes_written > 0)
753+
return UV_EBUSY;
754+
}
755+
756+
/* Already completed, treat as success */
757+
if (!uv__queue_contains(&stream->write_queue, &req->queue))
758+
return 0;
759+
760+
uv__queue_remove(&req->queue);
761+
762+
remaining = uv__count_bufs(req->bufs + req->write_index,
763+
req->nbufs - req->write_index);
764+
assert(remaining <= stream->write_queue_size);
765+
stream->write_queue_size -= remaining;
766+
767+
if (req->bufs != req->bufsml)
768+
uv__free(req->bufs);
769+
req->bufs = NULL;
770+
771+
req->error = UV_ECANCELED;
772+
uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
773+
774+
uv__io_feed(stream->loop, &stream->io_watcher);
775+
776+
return 0;
777+
}
778+
779+
739780
static int uv__handle_fd(uv_handle_t* handle) {
740781
switch (handle->type) {
741782
case UV_NAMED_PIPE:
@@ -922,8 +963,13 @@ static void uv__write_callbacks(uv_stream_t* stream) {
922963
}
923964

924965
/* NOTE: call callback AFTER freeing the request data. */
925-
if (req->cb)
926-
req->cb(req, req->error);
966+
if (req->write_extra.write_flags & UV__WRITE_ALLOW_PARTIAL) {
967+
if (req->write_cb.cb3 != NULL)
968+
req->write_cb.cb3(req, req->error, req->write_extra.bytes_written);
969+
} else {
970+
if (req->write_cb.cb != NULL)
971+
req->write_cb.cb(req, req->error);
972+
}
927973
}
928974
}
929975

@@ -1329,12 +1375,12 @@ static int uv__check_before_write(uv_stream_t* stream,
13291375
return 0;
13301376
}
13311377

1332-
int uv_write2(uv_write_t* req,
1333-
uv_stream_t* stream,
1334-
const uv_buf_t bufs[],
1335-
unsigned int nbufs,
1336-
uv_stream_t* send_handle,
1337-
uv_write_cb cb) {
1378+
static int uv_write_internal(uv_write_t* req,
1379+
uv_stream_t* stream,
1380+
const uv_buf_t bufs[],
1381+
unsigned int nbufs,
1382+
uv_stream_t* send_handle,
1383+
unsigned int write_flags) {
13381384
int empty_queue;
13391385
int err;
13401386

@@ -1352,7 +1398,8 @@ int uv_write2(uv_write_t* req,
13521398

13531399
/* Initialize the req */
13541400
uv__req_init(stream->loop, req, UV_WRITE);
1355-
req->cb = cb;
1401+
req->write_extra.bytes_written = 0;
1402+
req->write_extra.write_flags = write_flags;
13561403
req->handle = stream;
13571404
req->error = 0;
13581405
req->send_handle = send_handle;
@@ -1398,6 +1445,39 @@ int uv_write2(uv_write_t* req,
13981445
}
13991446

14001447

1448+
int uv_write2(uv_write_t* req,
1449+
uv_stream_t* stream,
1450+
const uv_buf_t bufs[],
1451+
unsigned int nbufs,
1452+
uv_stream_t* send_handle,
1453+
uv_write_cb cb) {
1454+
int err;
1455+
1456+
req->write_cb.cb = cb;
1457+
err = uv_write_internal(req, stream, bufs, nbufs, send_handle, 0);
1458+
1459+
return err;
1460+
}
1461+
1462+
int uv_write3(uv_write_t* req,
1463+
uv_stream_t* stream,
1464+
const uv_buf_t bufs[],
1465+
unsigned int nbufs,
1466+
uv_stream_t* send_handle,
1467+
unsigned int flags,
1468+
uv_write3_cb cb) {
1469+
int err;
1470+
1471+
if (flags != 0)
1472+
return UV_EINVAL;
1473+
1474+
req->write_cb.cb3 = cb;
1475+
err = uv_write_internal(req, stream, bufs, nbufs, send_handle,
1476+
UV__WRITE_ALLOW_PARTIAL);
1477+
1478+
return err;
1479+
}
1480+
14011481
/* The buffers to be written must remain valid until the callback is called.
14021482
* This is not required for the uv_buf_t array.
14031483
*/
@@ -1409,7 +1489,6 @@ int uv_write(uv_write_t* req,
14091489
return uv_write2(req, handle, bufs, nbufs, NULL, cb);
14101490
}
14111491

1412-
14131492
int uv_try_write(uv_stream_t* stream,
14141493
const uv_buf_t bufs[],
14151494
unsigned int nbufs) {

src/uv-common.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ enum {
141141
UV_HANDLE_REAP = 0x10000000
142142
};
143143

144+
/* Internal flags for uv_write_t requests */
145+
enum {
146+
UV__WRITE_ALLOW_PARTIAL = 0x1 /* Request allows partial write on cancel */
147+
};
148+
144149
static inline int uv__is_raw_tty_mode(uv_tty_mode_t m) {
145150
return m == UV_TTY_MODE_RAW || m == UV_TTY_MODE_RAW_VT;
146151
}

0 commit comments

Comments
 (0)