This repository was archived by the owner on Jan 26, 2022. It is now read-only.

No means for secure stopping async generators #126

Open
@awto

Let's assume a very typical reactive programming use case: I need to combine a few streams into a single one that yields a value each time any of the original streams yields a new value.

For example, I use it for UI and convert DOM events into an async iterable with a function subject (I don't include its source here since it is not significant for the problem; it can be checked in this gist). The function returns an object with the AsyncIterable interface and an additional method send to pass an event into the stream.

And there is a function combine that simply merges all the async iterables passed as arguments into a single one.
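For readers who want to run the examples without the gist, here is a minimal sketch of what such a subject function might look like. It is an assumption for illustration only, not the gist's code: the buffering strategy, the single shared iterator, and the behaviour of return() are all hypothetical.

```javascript
// Hypothetical stand-in for `subject`; the real implementation lives in the gist.
function subject() {
  const values = []   // values sent but not yet consumed
  const waiters = []  // resolvers of next() calls still waiting for a value
  let closed = false

  const iterator = {
    next() {
      if (values.length)
        return Promise.resolve({ done: false, value: values.shift() })
      if (closed)
        return Promise.resolve({ done: true, value: undefined })
      return new Promise(resolve => waiters.push(resolve))
    },
    // Unlike an async generator, this return() does not queue behind pending
    // next() calls: it settles them immediately with done: true.
    return(value) {
      closed = true
      while (waiters.length)
        waiters.shift()({ done: true, value: undefined })
      return Promise.resolve({ done: true, value })
    },
    [Symbol.asyncIterator]() { return this }
  }

  return {
    // Pass a value into the stream; ignored once the stream is closed.
    send(value) {
      if (closed) return
      if (waiters.length) waiters.shift()({ done: false, value })
      else values.push(value)
    },
    [Symbol.asyncIterator]() { return iterator }
  }
}
```

Because this hand-written iterator settles pending next() calls inside return(), it can be closed at any time, which is exactly what a plain async generator cannot do.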

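async function* combine(...args) {
  // One iterator per source, each paired with its pending next() promise.
  const threads = args.map(i => i[Symbol.asyncIterator]())
  const sparks = new Set(threads.map(i => ({thread:i,step:i.next()})))
  try {
    while(sparks.size) {
      // Wait for whichever source settles first, remembering which one it was.
      const v = await Promise.race([...sparks]
                                   .map(i => i.step.then(({done,value}) => ({done,value,spark:i}))))
      sparks.delete(v.spark)
      if (!v.done) {
        // Request the next value from the same source before yielding this one.
        sparks.add({...v.spark,step:v.spark.thread.next()})
        yield v.value
      }
    }
  } finally {
    // Close every source and wait until each return() settles.
    await Promise.all([...threads].map((i) => i.return()))
  }
}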

Now I iterate over two such sources combined and exit the loop on some condition.

const subj1 = subject()
const subj2 = subject()

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,subj2)) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}
//.....
subj1.send(1)
subj2.send(2)

Everything works fine here: I added a console.log to the exit function in subject, and it shows that the subjects are closed when the loop exits. But now suppose I wrap one of the sources with any simple async generator, such as this one (or just yield*):

async function* copy(input) {
  for await(const i of input)
    yield i
}

async function test() {
  let cnt = 0
  for await(const i of combine(subj1,copy(subj2))) {
    console.log("value:",i)
    if (cnt++)
      break
  }
}

Now test never exits (and so never runs whatever comes next): it waits on the await in combine's finally block. The reason is simple: the copy generator is blocked on next, and async generators put return requests into the same queue, so return is not processed until the pending next settles.

I searched the issues in this repository and found these tickets: #5 and #55. They give no clear reasons for the single-queue decision, except probably this example:
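The queueing behaviour can be reproduced in isolation. The sketch below uses a hypothetical source named never whose next() never settles, and races the generator's return() against a timer; the names never and demo are assumptions introduced only for this illustration.

```javascript
// Hypothetical source that never produces a value.
const never = {
  [Symbol.asyncIterator]: () => ({ next: () => new Promise(() => {}) })
}

async function* copy(input) {
  for await (const i of input)
    yield i
}

async function demo() {
  const it = copy(never)[Symbol.asyncIterator]()
  it.next() // copy is now suspended, awaiting the source's next()
  // return() is queued behind the pending next(), so it cannot settle.
  const returned = it.return().then(() => "returned")
  const timeout = new Promise(resolve => setTimeout(resolve, 100, "still pending"))
  return Promise.race([returned, timeout])
}

demo().then(console.log) // prints "still pending"
```

The timer wins the race because the generator cannot handle the return request while it is suspended inside its own await.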

writer.next(bufferA);
writer.next(bufferB);
await writer.return(); // Only wait for the file to close

This would probably be better done this way:

await Promise.all([writer.next(bufferA), writer.next(bufferB)]);
await writer.return(); // Only wait for the file to close

And how would we convert an async generator into an observable? For example, the solution from #20 won't work.

function toObservable(asyncIterable) {
    return new Observable(observer => {
        let stop = false;
        async function drain() {
            for await (let x of asyncIterable) {
                if (stop) break;
                observer.next(x);
            }
        }
        drain().then(x => observer.complete(x), err => observer.error(err));
        return _=> { stop = true; }
    });
}

The cleanup function won't stop the source generator (even if we add a return call on the object returned by drain): if the original iterable sends no more values, the generator stays locked in next forever.

Current status

Nothing can be done within this proposal to fix this. It will be fixed automatically once some form of async cancelation operation is added as part of another proposal. This issue is left open as a warning about the problem.

Workarounds

Using Transducers (lifted generators)

Transducers are functions that take a stream as input and produce another stream.
A generator can read its input, which is passed as an argument, and that is where we can send a specific stop signal if needed.

See more detailed description here:

There is a fork function which merges streams and stops cleanly, but it merges lifted streams (transducers).
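The stop-signal idea might be sketched as follows. This is not the fork function mentioned above: the STOP sentinel, the channel helper, and the copyUntilStop transducer are all hypothetical names made up for this illustration, and the sketch only assumes that the consumer can still push into the root source.

```javascript
// Hypothetical sentinel value used as the in-band stop signal.
const STOP = Symbol("stop")

// Hypothetical minimal push queue that plays the role of the root source.
function channel() {
  const values = [], waiters = []
  return {
    send(value) {
      if (waiters.length) waiters.shift()({ done: false, value })
      else values.push(value)
    },
    [Symbol.asyncIterator]() {
      return {
        next: () => values.length
          ? Promise.resolve({ done: false, value: values.shift() })
          : new Promise(resolve => waiters.push(resolve))
      }
    }
  }
}

// A transducer: takes a stream and returns a stream. It exits as soon as the
// stop signal arrives on its input, so it is never left blocked in next().
async function* copyUntilStop(input) {
  for await (const i of input) {
    if (i === STOP) return
    yield i
  }
}

async function demo() {
  const source = channel()
  const it = copyUntilStop(source)[Symbol.asyncIterator]()
  const first = it.next()     // blocked: the source has no values yet
  const closing = it.return() // queued behind the pending next()
  source.send(STOP)           // unblocks the generator, which then exits
  return [await first, await closing]
}
```

In demo both promises resolve with done: true, because the signal travels through the input that the generator is already waiting on rather than through the generator's own request queue.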

Transpiler

The @effectful/es transpiler adds lightweight cancelation support to async functions.

The resulting Promises of async functions transpiled this way are amended with M.cancelSymbol, a function which tries to cancel the execution of that function. It executes finally blocks and propagates cancelation to the current await expression. It doesn't do anything more (e.g. no propagation to children).
