Skip to content

Commit

Permalink
stream: handle generator destruction from Duplex.from()
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieusieben committed Sep 24, 2024
1 parent 3c5ceff commit 094e571
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 14 deletions.
81 changes: 67 additions & 14 deletions lib/internal/streams/duplexify.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,19 @@ module.exports = function duplexify(body, name) {
}

if (typeof body === 'function') {
const { value, write, final, destroy } = fromAsyncGen(body);
let d;

const { value, write, final, destroy } = fromAsyncGen(body, (err) => {
if (d) destroyer(d, err);
});

// Body might be a constructor function instead of an async generator function.
if (isDuplexNodeStream(value)) {
return value;
return d = value;
}

if (isIterable(value)) {
return from(Duplexify, value, {
return d = from(Duplexify, value, {
// TODO (ronag): highWaterMark?
objectMode: true,
write,
Expand All @@ -102,12 +106,11 @@ module.exports = function duplexify(body, name) {

const then = value?.then;
if (typeof then === 'function') {
let d;

const promise = FunctionPrototypeCall(
then,
value,
(val) => {
destroyer(d, null);
if (val != null) {
throw new ERR_INVALID_RETURN_VALUE('nully', 'body', val);
}
Expand Down Expand Up @@ -208,11 +211,14 @@ module.exports = function duplexify(body, name) {
body);
};

function fromAsyncGen(fn) {
function fromAsyncGen(fn, onAbort) {
let { promise, resolve } = createDeferredPromise();
const ac = new AbortController();
const signal = ac.signal;
const value = fn(async function*() {
let ended = false;
let error = null;

const asyncGenerator = (async function* () {
while (true) {
const _promise = promise;
promise = null;
Expand All @@ -224,21 +230,68 @@ function fromAsyncGen(fn) {
({ promise, resolve } = createDeferredPromise());
yield chunk;
}
}(), { signal });
})();

// Not using try/finally because asyncGenerator.return() should work even
// if the generator was never started.

const originalReturn = asyncGenerator.return;
asyncGenerator.return = async function(value) {
// eslint-disable-next-line node-core/avoid-prototype-pollution
if (ended) return { value, done: true };

const _promise = promise;
promise = null;
const { cb } = await _promise;
process.nextTick(cb);

ended = true;

onAbort(null);
return originalReturn.call(asyncGenerator, value);
};

const originalThrow = asyncGenerator.throw;
asyncGenerator.throw = async function(err) {
if (ended) throw err;

const _promise = promise;
promise = null;
const { cb } = await _promise;
process.nextTick(cb);

ended = true;
error = err;
onAbort(err);

return originalThrow.call(asyncGenerator, err);
};


const value = fn(asyncGenerator, { signal });

return {
value,
write(chunk, encoding, cb) {
const _resolve = resolve;
resolve = null;
_resolve({ chunk, done: false, cb });
if (ended) {
cb(error);
} else {
const _resolve = resolve;
resolve = null;
_resolve({ chunk, done: false, cb });
}
},
final(cb) {
const _resolve = resolve;
resolve = null;
_resolve({ done: true, cb });
if (ended) {
cb(error);
} else {
const _resolve = resolve;
resolve = null;
_resolve({ done: true, cb });
}
},
destroy(err, cb) {
ended = true;
ac.abort();
cb(err);
},
Expand Down
130 changes: 130 additions & 0 deletions test/parallel/test-stream-duplex-from.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const assert = require('assert');
const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
const { ReadableStream, WritableStream } = require('stream/web');
const { Blob } = require('buffer');
const sleep = require('util').promisify(setTimeout);

{
const d = Duplex.from({
Expand Down Expand Up @@ -401,3 +402,132 @@ function makeATestWritableStream(writeFunc) {
assert.strictEqual(d.writable, false);
}));
}

{
const r = Readable.from(['foo', 'bar', 'bar']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// eslint-disable-next-line no-unused-vars
for await (const _ of asyncGenerator);
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'bar']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// eslint-disable-next-line no-unused-vars
for await (const _ of asyncGenerator) break;
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
const a = await asyncGenerator.next();
assert.strictEqual(a.done, false);
assert.strictEqual(a.value.toString(), 'foo');
const b = await asyncGenerator.return();
assert.strictEqual(b.done, true);
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// Note: the generator is not even started at this point
await asyncGenerator.return();
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
// Same as before, with a delay
await sleep(100);
await asyncGenerator.return();
}),
common.mustSucceed(() => {
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {}),
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar', 'baz']);
pipeline(
r,
Duplex.from(function(asyncGenerator) {
return sleep(100);
}),
common.mustCall((err) => {
assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await asyncGenerator.throw(new Error('my error'));
}),
common.mustCall((err) => {
assert.strictEqual(err.message, 'my error');
assert.strictEqual(r.destroyed, true);
})
);
}

{
const r = Readable.from(['foo', 'bar']);
pipeline(
r,
Duplex.from(async function(asyncGenerator) {
await asyncGenerator.next();
await asyncGenerator.throw(new Error('my error'));
}),
common.mustCall((err) => {
assert.strictEqual(err.message, 'my error');
assert.strictEqual(r.destroyed, true);
})
);
}

0 comments on commit 094e571

Please sign in to comment.