A small utility to create a ReadableStream with a resolver-style interface,
similar to Promise.withResolvers().
This library provides both a low-level resolver and a safe, defensive variant; the safe variant never throws due to invalid stream state.
npm install readable-stream-with-safe-resolvers
# or
yarn add readable-stream-with-safe-resolvers
This library provides two resolver variants:
withResolvers: a low-level resolver that directly controls a ReadableStream.
- Operations do not perform internal safety checks
- Calling methods after the stream is finalized may throw
- Intended for strict, controller-like use cases
Returned methods:
- enqueue(chunk: T): void
- close(): void
- error(reason: unknown): void
- completed: boolean - whether the stream has been finalized
This flag can be used to avoid invalid operations, but it is not enforced internally.
withSafeResolvers: a defensive resolver that safely ignores invalid operations.
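To illustrate why calls after finalization may throw, here is a minimal sketch written against the standard ReadableStream controller rather than this library. It assumes the low-level resolver forwards calls to the controller unchanged, which the description above suggests but does not confirm; demoRawControllerThrows is a hypothetical name.

```typescript
// Sketch using only the standard Streams API; not this library's source.
function demoRawControllerThrows(): boolean {
  let controller!: ReadableStreamDefaultController<number>;

  // start() runs synchronously inside the constructor, so the controller
  // is captured before the next statement executes.
  new ReadableStream<number>({
    start(c) {
      controller = c;
    },
  });

  controller.enqueue(1);
  controller.close();

  try {
    // Enqueueing after close() is an invalid state for the controller.
    controller.enqueue(2);
    return false;
  } catch {
    return true;
  }
}

const threwAfterClose = demoRawControllerThrows();
```

Checking a finalized flag before each call, as completed allows, is one way to avoid reaching the throwing branch.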
- All operations are safe and never throw due to stream state
- Each method returns a boolean indicating whether it was applied
Returned methods:
- enqueue(chunk: T): boolean
- close(): boolean
- error(reason: unknown): boolean
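The defensive behavior described above could be built roughly as follows. This is a sketch under stated assumptions, not the library's actual implementation: sketchSafeResolvers, SafeResolversSketch, and the internal finalized flag are hypothetical names, and only standard ReadableStream calls are used.

```typescript
// Hypothetical sketch of a boolean-returning resolver; not the library's source.
type SafeResolversSketch<T> = {
  stream: ReadableStream<T>;
  enqueue: (chunk: T) => boolean;
  close: () => boolean;
  error: (reason: unknown) => boolean;
};

function sketchSafeResolvers<T>(): SafeResolversSketch<T> {
  let controller!: ReadableStreamDefaultController<T>;
  let finalized = false;

  const stream = new ReadableStream<T>({
    start(c) {
      controller = c;
    },
    // A consumer cancel also finalizes the stream.
    cancel() {
      finalized = true;
    },
  });

  // Run op only while the stream is still open; report whether it ran.
  const guard = (op: () => void): boolean => {
    if (finalized) return false;
    op();
    return true;
  };

  return {
    stream,
    enqueue: (chunk) => guard(() => controller.enqueue(chunk)),
    close: () =>
      guard(() => {
        finalized = true;
        controller.close();
      }),
    error: (reason) =>
      guard(() => {
        finalized = true;
        controller.error(reason);
      }),
  };
}

const sketch = sketchSafeResolvers<number>();
const applied = [sketch.enqueue(1), sketch.close()];
const ignored = [sketch.enqueue(2), sketch.close(), sketch.error(new Error("late"))];
```

Here applied holds two true values and ignored holds three false values, because every call after close() is short-circuited by the guard instead of reaching the controller.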
import { withSafeResolvers } from "readable-stream-with-safe-resolvers";
async function example() {
  const { stream, enqueue, close, error } = withSafeResolvers<number>();
  // Push values
  enqueue(1);
  // -> true
  enqueue(2);
  // -> true
  // Close the stream
  close();
  // -> true
  // Reading from the stream
  const result = [];
  for await (const value of stream) {
    result.push(value);
  }
  console.log(result); // → [1, 2]
}
example();
In typical usage, the return value can be ignored unless you need to detect whether the stream has already been finalized.
import { withResolvers } from "readable-stream-with-safe-resolvers";
const { stream, enqueue, close, completed } = withResolvers<number>();
if (!completed) {
  enqueue(1);
  close();
}
Calling methods after the stream is finalized may throw.
Use withSafeResolvers when:
- You want safety over strict control
- Multiple async contexts may call enqueue/close
- You want to avoid runtime errors from invalid stream state
Use withResolvers when:
- You want strict, controller-like behavior
- Stream lifecycle is fully controlled in one place
- You prefer errors over silent ignoring
With withResolvers, the caller is responsible for avoiding invalid operations.
withSafeResolvers<T>() returns an object containing:
- stream: ReadableStream<T> - the underlying stream.
- enqueue(chunk: T): boolean - pushes a new chunk into the stream. Returns false if the stream has already been finalized.
- close(): boolean - gracefully closes the stream. Returns false if already finalized.
- error(reason: unknown): boolean - terminates the stream with an error. Returns false if already finalized.
Once the stream is finalized—via close(), error(), or consumer cancel()—
all subsequent calls to enqueue, close, or error are silently ignored
and return false.
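Of the three finalization paths, consumer cancel() is the one the producer does not trigger itself. The sketch below shows how a producer-side guard can observe it, using only the standard ReadableStream API; the finalized flag, tryEnqueue, and demoCancelFinalizes are hypothetical and are not claimed to match the library's internals.

```typescript
// Sketch: a consumer-side cancel() flips a producer-side flag.
function demoCancelFinalizes(): { before: boolean; after: boolean } {
  let finalized = false;
  let controller!: ReadableStreamDefaultController<string>;

  const stream = new ReadableStream<string>({
    start(c) {
      controller = c;
    },
    // Invoked when the consumer cancels the stream.
    cancel() {
      finalized = true;
    },
  });

  const tryEnqueue = (chunk: string): boolean => {
    if (finalized) return false;
    controller.enqueue(chunk);
    return true;
  };

  const before = tryEnqueue("a"); // stream still open, so this is applied
  void stream.cancel("consumer no longer interested");
  const after = tryEnqueue("b"); // ignored: the cancel callback has already run

  return { before, after };
}

const cancelDemo = demoCancelFinalizes();
```

Without the flag, the second enqueue would reach a controller whose stream is already closed and would throw.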
withResolvers<T>() returns an object containing:
- stream: ReadableStream<T> - the underlying stream.
- enqueue(chunk: T): void - pushes a new chunk into the stream.
- close(): void - closes the stream.
- error(reason: unknown): void - terminates the stream with an error.
- completed: boolean - whether the stream has been finalized.
No internal safety checks are performed.
The caller is responsible for avoiding invalid operations.
import { withSafeResolvers } from "readable-stream-with-safe-resolvers";
async function exampleError() {
  const { stream, enqueue, error } = withSafeResolvers<number>();
  enqueue(10);
  // -> true
  enqueue(20);
  // -> true
  error(new Error("Something went wrong"));
  // -> true
  try {
    for await (const value of stream) {
      console.log(value);
    }
  } catch (err) {
    console.error(err); // → Error: Something went wrong
  }
}
exampleError();
const { stream, enqueue, close, error } = withSafeResolvers<number>();
enqueue(1);
// -> true
close();
// -> true
enqueue(2); // ignored
// -> false
close(); // ignored
// -> false
error(new Error("oops")); // ignored
// -> false
This demonstrates that operations after the stream is finalized are safe, idempotent, and never throw due to invalid stream state.
- Creating pushable streams controlled outside the consumer loop
- Wrapping async generators or event emitters as streams
- Building streaming APIs that need explicit lifecycle control
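As an illustration of the event-emitter use case, the sketch below wraps a Node.js EventEmitter as a stream. It is written against the standard ReadableStream API so that it stands alone; streamFromEmitter and the event names "data", "end", and "error" are assumptions made for this example. With this library, the hand-written finalized checks would presumably be replaced by the boolean-returning methods of withSafeResolvers.

```typescript
import { EventEmitter } from "node:events";

// Hypothetical helper: event names are assumptions, not part of the library.
function streamFromEmitter<T>(emitter: EventEmitter): ReadableStream<T> {
  let finalized = false;
  let controller!: ReadableStreamDefaultController<T>;

  const onData = (chunk: T) => {
    if (!finalized) controller.enqueue(chunk);
  };
  const onEnd = () => {
    if (finalized) return;
    detach();
    controller.close();
  };
  const onError = (err: unknown) => {
    if (finalized) return;
    detach();
    controller.error(err);
  };

  // Mark the stream finalized and stop listening to the emitter.
  function detach(): void {
    finalized = true;
    emitter.off("data", onData);
    emitter.off("end", onEnd);
    emitter.off("error", onError);
  }

  return new ReadableStream<T>({
    start(c) {
      controller = c;
      emitter.on("data", onData);
      emitter.on("end", onEnd);
      emitter.on("error", onError);
    },
    cancel() {
      detach();
    },
  });
}

const emitter = new EventEmitter();
const numbers = streamFromEmitter<number>(emitter);
emitter.emit("data", 1);
emitter.emit("data", 2);
emitter.emit("end");
// No listener remains after "end", so emit() reports that nothing handled it.
const lateDelivered = emitter.emit("data", 3);
```

Removing the listeners on finalization keeps late events from reaching a closed controller and avoids leaking handlers on a long-lived emitter.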
MIT