Closed
Description
Bug Report
Current Behavior
When using from() with an async generator, the generator is never finalized: its finally block does not run when the subscription ends early.
import { from } from "rxjs";
import { take, tap } from "rxjs/operators";

async function* gen() {
  try {
    let i = 0;
    while (true) {
      yield i++;
      // added or Chrome hangs
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  } finally {
    // Never reached when the subscription ends early
    console.log("Finalizing generator");
  }
}

const source = from(gen()).pipe(
  take(3),
  tap({ complete: () => console.log("Done") })
);
source.subscribe(console.log);
yields:
0
1
2
Done
Expected behavior
I would expect the async generator to be finalized, just as non-async generators are.
The above code should yield:
0
1
2
Done
Finalizing generator
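For context, this expectation matches what the language's own loops do. The sketch below uses no RxJS at all: it shows that leaving a for...of or for await...of loop early calls return() on the generator, which runs its finally block. The names log, syncGen, asyncGen and demoLoops are made up for this illustration.

```typescript
// Records the order in which the generators are finalized.
const log: string[] = [];

function* syncGen(): Generator<number> {
  try {
    let i = 0;
    while (true) yield i++;
  } finally {
    log.push("sync finalized");
  }
}

async function* asyncGen(): AsyncGenerator<number> {
  try {
    let i = 0;
    while (true) yield i++;
  } finally {
    log.push("async finalized");
  }
}

async function demoLoops(): Promise<string[]> {
  for (const value of syncGen()) {
    if (value === 2) break; // early exit: the loop calls return() on the generator
  }
  for await (const value of asyncGen()) {
    if (value === 2) break; // early exit: the loop calls and awaits return()
  }
  return log;
}

demoLoops().then((result) => console.log(result));
// prints [ 'sync finalized', 'async finalized' ]
```

A consumer that stops pulling values without calling return(), as the report describes for from(), leaves the generator suspended and its finally block unexecuted.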
Reproduction
https://stackblitz.com/edit/from-async-generator?file=index.ts
Environment
- Runtime: both Node v12 and Chrome v88
- RxJS version: 7.0.0-beta.10
Possible Solution
The fromAsyncIterable() function in rxjs/src/internal/observable/from.ts should handle this case, as fromIterable() does.
Something along these lines:
function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T> | AsyncGenerator<T>) {
  return new Observable((subscriber: Subscriber<T>) => {
    process(asyncIterable, subscriber).catch((err) => subscriber.error(err));
    // Teardown: on unsubscribe, finalize the generator if it exposes return().
    return () => {
      // A plain AsyncIterable has no return(), so treat it as optional.
      const generator = asyncIterable as Partial<AsyncGenerator<T>>;
      if (isFunction(generator.return)) generator.return(null);
    };
  });
}

async function process<T>(
  asyncIterable: AsyncIterable<T> | AsyncGenerator<T>,
  subscriber: Subscriber<T>
) {
  for await (const value of asyncIterable) {
    subscriber.next(value);
  }
  subscriber.complete();
}
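To try the idea outside RxJS, here is a minimal standalone sketch of the same teardown pattern. It is not RxJS code: subscribeToAsyncIterable, Handlers, delay and demoUnsubscribe are hypothetical names, and plain callbacks stand in for Subscriber. It also obtains the iterator explicitly, so return() is called on the object actually being iterated, which may differ from the iterable for non-generator sources.

```typescript
// Hypothetical callback shape standing in for an RxJS Subscriber.
type Handlers<T> = {
  next: (value: T) => void;
  complete?: () => void;
  error?: (err: unknown) => void;
};

// Hypothetical helper: pulls values from an async iterable and returns
// an unsubscribe function that finalizes the underlying iterator.
function subscribeToAsyncIterable<T>(
  iterable: AsyncIterable<T>,
  handlers: Handlers<T>
): () => void {
  const iterator = iterable[Symbol.asyncIterator]();
  let closed = false;

  (async () => {
    while (!closed) {
      const result = await iterator.next();
      if (closed) return; // unsubscribed while waiting for the value
      if (result.done) {
        closed = true;
        handlers.complete?.();
        return;
      }
      handlers.next(result.value);
    }
  })().catch((err) => {
    if (!closed) {
      closed = true;
      handlers.error?.(err);
    }
  });

  return () => {
    if (closed) return;
    closed = true;
    // return() is optional on AsyncIterator; for a generator it runs `finally`.
    // Errors from finalization are swallowed here; real code must decide how to report them.
    iterator.return?.().catch(() => undefined);
  };
}

const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function demoUnsubscribe(): Promise<{ values: number[]; finalized: boolean }> {
  const values: number[] = [];
  let finalized = false;

  async function* gen(): AsyncGenerator<number> {
    try {
      let i = 0;
      while (true) {
        yield i++;
        await delay(10);
      }
    } finally {
      finalized = true;
    }
  }

  await new Promise<void>((resolve) => {
    const unsubscribe = subscribeToAsyncIterable(gen(), {
      next: (value) => {
        values.push(value);
        if (values.length === 3) {
          unsubscribe(); // plays the role of take(3) ending the subscription
          resolve();
        }
      },
    });
  });

  await delay(0); // finalization is asynchronous; let it settle
  return { values, finalized };
}

demoUnsubscribe().then((result) => console.log(result));
// prints { values: [ 0, 1, 2 ], finalized: true }
```

Note that return() on an async generator completes asynchronously, so the finally block runs shortly after unsubscribing rather than synchronously within it.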