No means for secure stopping async generators #126
Description
Let's assume a very typical reactive programming use case: I need to combine a few streams into a single one that yields a value each time any of the original streams yields a new value.
For example, I use it for UI and convert DOM events into an async iterable with a function subject (I don't include its source here since it is not significant for the problem; it can be checked in this gist). The function returns an object with the AsyncIterable interface and an additional method send to pass an event into the stream.
And there is a function combine, which simply merges all the async iterables passed as arguments into a single one.
async function* combine(...args) {
  // One iterator per source, plus a "spark" tracking each pending next() call.
  const threads = args.map(i => i[Symbol.asyncIterator]())
  const sparks = new Set(threads.map(i => ({thread: i, step: i.next()})))
  try {
    while (sparks.size) {
      // Wait for whichever source produces a result first.
      const v = await Promise.race([...sparks]
        .map(i => i.step.then(({done, value}) => ({done, value, spark: i}))))
      sparks.delete(v.spark)
      if (!v.done) {
        sparks.add({...v.spark, step: v.spark.thread.next()})
        yield v.value
      }
    }
  } finally {
    // Close every source when the consumer stops iterating.
    await Promise.all([...threads].map((i) => i.return()))
  }
}
And now I iterate two such sources combined and exit the loop on some condition.
const subj1 = subject()
const subj2 = subject()
async function test() {
  let cnt = 0
  for await (const i of combine(subj1, subj2)) {
    console.log("value:", i)
    if (cnt++)
      break
  }
}
//.....
subj1.send(1)
subj2.send(2)
Everything works fine here: I added a console.log to the exit function in subject, and all the loops handling the subjects exit. But now suppose I wrap one of the sources with any simple async generator, such as the following (or just yield*):
async function* copy(input) {
  for await (const i of input)
    yield i
}
async function test() {
  let cnt = 0
  for await (const i of combine(subj1, copy(subj2))) {
    console.log("value:", i)
    if (cnt++)
      break
  }
}
Now test never exits (the next then is never called): it waits on the await in combine's finally block. The reason is simple: the copy generator is blocked on next, and generators use the same queue for return, so the return request cannot run until the pending next settles.
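The blocking can be reproduced without subject or combine. The following is a minimal sketch, not code from the issue: never, stateAfter, and demo are hypothetical helpers introduced only for illustration, and the 50 ms timeout is an arbitrary choice. It compares calling return() on a wrapper that is suspended inside a pending next() with calling it on a wrapper that has not started yet.

```javascript
// Hypothetical source whose first next() stays pending forever.
async function* never() {
  await new Promise(() => {}) // this promise never settles
  yield 1
}

// Same wrapper as copy in the issue.
async function* copy(input) {
  for await (const i of input)
    yield i
}

// Resolves to "settled" if the promise settles within ms, otherwise "pending".
function stateAfter(promise, ms) {
  let timer
  const timeout = new Promise(resolve => {
    timer = setTimeout(() => resolve("pending"), ms)
  })
  const settled = promise.then(() => "settled", () => "settled")
  return Promise.race([settled, timeout]).finally(() => clearTimeout(timer))
}

async function demo() {
  // Case 1: copy is suspended inside its for await, waiting on never().
  const blockedIt = copy(never())
  blockedIt.next()
  // return() is queued behind the pending next() and cannot run.
  const blocked = await stateAfter(blockedIt.return(), 50)

  // Case 2: a generator that was never started completes right away.
  const freshIt = copy(never())
  const fresh = await stateAfter(freshIt.return(), 50)

  return { blocked, fresh }
}
```

Under these assumptions, demo() should report the first return() as still pending after the timeout and the second as settled, which matches the hang in combine's finally block.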
I searched the issues in this repository and found these tickets: #5 and #55. They give no clear reasons for the single-queue decision, except probably this use case:
writer.next(bufferA);
writer.next(bufferB);
await writer.return(); // Only wait for the file to close
This would perhaps be better done this way:
await Promise.all([writer.next(bufferA), writer.next(bufferB)]);
await writer.return(); // Only wait for the file to close
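To make the queueing behind both snippets concrete, here is a small sketch of such a writer. It is an assumption for illustration only: the writer generator, its log array, and the 10 ms delay standing in for a slow write are all hypothetical and do not come from #5 or #55.

```javascript
// Hypothetical writer: receives chunks through next(chunk) and
// records "closed" when it is finished through return().
async function* writer(log) {
  try {
    for (;;) {
      const chunk = yield
      // Simulate a slow asynchronous write.
      await new Promise(resolve => setTimeout(resolve, 10))
      log.push(`wrote ${chunk}`)
    }
  } finally {
    log.push("closed")
  }
}

async function demo() {
  const log = []
  const w = writer(log)
  await w.next()      // run to the first yield so the writer can accept chunks
  w.next("A")         // starts the first write
  w.next("B")         // queued until the first write finishes
  await w.return()    // queued behind both writes
  return log
}
```

Because the requests are processed strictly in order, awaiting only return() is enough to know both writes completed. The same ordering is what keeps return() waiting when an earlier next() never settles.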
And how would we convert an async generator into an observable? For example, the solution from #20 won't work.
function toObservable(asyncIterable) {
  return new Observable(observer => {
    let stop = false;
    async function drain() {
      for await (let x of asyncIterable) {
        if (stop) break;
        observer.next(x);
      }
    }
    drain().then(x => observer.complete(x), err => observer.error(err));
    return _ => { stop = true; }
  });
}
The cleanup function won't stop the source generator (even if we add a return call on the object returned by drain) if the original iterable sends no more values: it stays locked in next forever.
Current status
There is nothing to do in this proposal to fix this. It will be fixed automatically once some form of async cancelation operation is added as part of another proposal. This issue is left open as a warning about the problem.
Workarounds
Using Transducers (lifted generators)
Transducers are functions that take a stream as input and produce another stream.
A generator can read its input, which is passed as an argument, and that input is where we can send a specific stop signal if needed.
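A minimal sketch of this idea follows. It is not the implementation from the linked articles: the STOP token, the untilStop and double transducers, and this simplified subject are hypothetical stand-ins chosen for illustration. The stop signal is sent as an ordinary value, so it wakes the pending next() and the generator exits on its own rather than waiting for a queued return().

```javascript
// Hypothetical stop token; any unique value would do.
const STOP = Symbol("stop")

// Simplified push-based source (a stand-in, not the subject from the gist).
function subject() {
  const queue = []   // values sent but not yet consumed
  let wake = null    // resolver for a consumer waiting on an empty queue
  return {
    send(value) {
      queue.push(value)
      if (wake) { wake(); wake = null }
    },
    async *[Symbol.asyncIterator]() {
      for (;;) {
        while (queue.length) yield queue.shift()
        await new Promise(resolve => { wake = resolve })
      }
    }
  }
}

// Transducer: ends the stream when it reads STOP from its input.
async function* untilStop(input) {
  for await (const value of input) {
    if (value === STOP) return
    yield value
  }
}

// Another transducer composed on top of the first one.
async function* double(input) {
  for await (const value of input)
    yield value * 2
}

async function demo() {
  const source = subject()
  const seen = []
  const done = (async () => {
    for await (const v of double(untilStop(source)))
      seen.push(v)
  })()
  source.send(1)
  source.send(2)
  source.send(STOP) // the stop signal travels through the same input as data
  await done        // the whole pipeline terminates
  return seen
}
```

In this sketch the source is suspended at a yield when untilStop returns, so closing it does not have to wait behind a pending next().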
See more detailed description here:
- Decouple Business Logic using Async Generators
- Async Generators as an alternative to State Management
There is a fork function which merges streams and stops cleanly, but it merges lifted streams (transducers).
Transpiler
The @effectful/es transpiler adds lightweight cancelation support to async functions.
The Promises returned by async functions transpiled with it are extended with M.cancelSymbol, a function which tries to cancel the execution of that function. It executes finally blocks and propagates cancelation to the current await expression. It doesn't do anything more (e.g. no propagation to children).