Description
The background to this is that I found a bug in deno's TextDecoderStream
implementation that results in it failing to clean up a resource it holds if its stream pipeline aborts with an error. The implementation holds a TextDecoder
which it uses in streaming mode. The TextDecoder
holds a resource handle to a native object handling text decoding for it, which it closes when decode()
is called without {stream: true}
. When the deno TextDecoderStream
implementation's transformer gets a flush()
call, it calls decode()
to close the TextDecoder
. However, if the stream aborts, flush()
is not called, so the native resource handle is not closed, and gets leaked.
I've looked through the Streams spec, and as I understand it there's no built-in way for a transformer to be notified of a stream error.
It is possible to work around this as an API user, as I mention in the deno issue:
I played around with the Streams API a bit and came up with a fairly straightforward way to implement a TransformStream whose Transformer gets notified of stream aborts. Basically two parts:
- A WritableStream can be monitored for errors by wrapping it with another WritableStream that opens a reader on the monitored stream, exposes the reader's closed promise (which rejects if the monitored stream is aborted), and forwards start/write/close/abort calls to the monitored stream.
That looks like this: https://deno.land/x/shutdown_aware_transform_stream@1.0.0/shutdown_monitor_writable_stream.ts- Then a TransformStream can react to stream aborts by monitoring its writable side with the monitor stream, and using the closed promise to be notified when the stream aborts.
That looks like this: https://deno.land/x/shutdown_aware_transform_stream@1.0.0/shutdown_aware_transform_stream.ts#L98
Although I say "fairly straightforward", it's not exactly trivial. And another alternative of not using TransformStream
and instead tying together a readable and writable stream manually to create a (readable, writable) pair is even more fiddly to do correctly.
As an API user, it seems like there should be an idiomatic way to handle stream errors in a transformer. The underlying sink of a WritableStream
can do so either with its abort()
method, or via the AbortSignal
on WritableStreamDefaultController
's signal
property.
What do you think about giving transformers similar capabilities to handle aborts as underlying sinks?
Even just giving TransformStreamDefaultController
an AbortSignal
would be helpful (I presume that's simpler to spec than a method on transformer, as it can't affect the error propagation behaviour). Although I suppose a method would allow for asynchronous cleanup...