from(AsyncIterable) doesn't finalize async generators #5998

Closed
@ptitjes

Description

Bug Report

Current Behavior
When from() is given an async generator and the subscription ends before the generator is exhausted, the generator is never finalized: its finally block does not run.

import { from } from "rxjs";
import { take, tap } from "rxjs/operators";

async function* gen() {
  try {
    let i = 0;
    while (true) {
      yield i++;
      // Delay added because Chrome hangs without it
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  } finally {
    console.log("Finalizing generator");
  }
}

const source = from(gen()).pipe(
  take(3),
  tap({ complete: () => console.log("Done") })
);

source.subscribe(console.log);

This logs:

0
1
2
Done

Expected behavior
I would expect the async generator to be finalized when the subscription ends, just as non-async generators are.

The above code should log:

0
1
2
Done
Finalizing generator
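
For comparison, the finalization of non-async generators rests on a language-level rule: leaving a for...of loop early calls the iterator's return() method, which runs the generator's finally block. A minimal sketch without RxJS, where takeSync and events are hypothetical names used only for illustration:

```typescript
// Records events so their order can be inspected afterwards.
const events: string[] = [];

function* syncGen(): Generator<number> {
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    events.push("Finalizing generator");
  }
}

// Hypothetical helper: collects the first `count` values, then stops early.
function takeSync<T>(iterable: Iterable<T>, count: number): T[] {
  const values: T[] = [];
  if (count <= 0) return values;
  for (const value of iterable) {
    values.push(value);
    // Breaking out of for...of calls iterator.return(),
    // which resumes the generator inside its finally block.
    if (values.length >= count) break;
  }
  return values;
}

const firstThree = takeSync(syncGen(), 3);
events.push("Done");
console.log(firstThree, events);
```

Here the generator is finalized as soon as the loop is left, before "Done" is recorded. The async case in this report differs because nothing tells the for await loop to stop.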

Reproduction

https://stackblitz.com/edit/from-async-generator?file=index.ts

Environment

  • Runtime: both Node v12 and Chrome v88
  • RxJS version: 7.0.0-beta.10

Possible Solution
The fromAsyncIterable() function in rxjs/src/internal/observable/from.ts should handle this case, as does fromIterable().

Something along these lines:

function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T> | AsyncGenerator<T>) {
  return new Observable((subscriber: Subscriber<T>) => {
    process(asyncIterable, subscriber).catch(err => subscriber.error(err));
    return () => {
      // Finalize generators on unsubscribe; a plain AsyncIterable has no return().
      if ("return" in asyncIterable) asyncIterable.return(undefined);
    };
  });
}

async function process<T>(
  asyncIterable: AsyncIterable<T> | AsyncGenerator<T>,
  subscriber: Subscriber<T>
) {
  for await (const value of asyncIterable) {
    subscriber.next(value);
  }
  subscriber.complete();
}
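
An alternative to calling return() from the teardown is to let the for await loop exit on its own once the consumer is closed, since returning from inside for await calls and awaits the iterator's return() method. The sketch below shows the idea without RxJS. Sink is a hypothetical stand-in for the few Subscriber members used here, and drain, trace, and received are illustrative names, not part of any library:

```typescript
// Hypothetical stand-in for a Subscriber: only the members used here.
interface Sink<T> {
  closed: boolean;
  next(value: T): void;
  complete(): void;
}

const trace: string[] = [];

async function* asyncGen(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) {
      yield i++;
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  } finally {
    trace.push("Finalizing generator");
  }
}

// Pushes values into the sink and stops as soon as the sink reports closed.
async function drain<T>(source: AsyncIterable<T>, sink: Sink<T>): Promise<void> {
  for await (const value of source) {
    sink.next(value);
    // Returning here makes for await call and await iterator.return(),
    // so the generator's finally block runs before drain() resolves.
    if (sink.closed) return;
  }
  sink.complete();
}

// A sink that closes itself after three values, loosely mimicking take(3).
const received: number[] = [];
const sink: Sink<number> = {
  closed: false,
  next(value) {
    received.push(value);
    if (received.length === 3) this.closed = true;
  },
  complete() {
    trace.push("Completed");
  },
};

const finished = drain(asyncGen(), sink);
finished.then(() => console.log(received, trace));
```

The trade-off is that the closed flag is only checked after a value arrives, so a generator that is waiting on a slow await is not finalized until it yields again.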
