Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: unref #48007

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
25 changes: 25 additions & 0 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,29 @@ async function* createAsyncIterator(stream, options) {
}
}

function staticUnref(stream) {
const unrefStream = new Readable({
objectMode: stream.readableObjectMode ?? stream.objectMode ?? true,
// highWaterMark 0 as unref is basically a proxy, so don't consume more data
// as we would lose it when continue consuming from the original stream
highWaterMark: 0,
Comment on lines +1171 to +1173
Copy link
Member Author

@rluvaton rluvaton May 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following test show why 0 (it's part of the tests suite also)

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);

  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

// TODO - what about other options?
destroy(err, callback) {
// Not destroying the stream here as we unref it.
callback(err);
},
}).wrap(stream);

unrefStream.once('error', (e) => {
if (e.name !== 'AbortError') {
destroyImpl.destroyer(stream, e);
}
});

return unrefStream;
}


// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
Expand Down Expand Up @@ -1409,6 +1432,8 @@ Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};

Readable.unref = staticUnref;

let webStreamsAdapters;

// Lazy to avoid circular references
Expand Down
142 changes: 142 additions & 0 deletions test/parallel/test-stream-unref.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
'use strict';
require('../common');

const {
Readable,
pipeline,
PassThrough
} = require('stream');
const { it } = require('node:test');
const { strictEqual, deepStrictEqual } = require('assert');

const { from, unref } = Readable;

const nextTick = () => new Promise((resolve) => process.nextTick(resolve));

it('unref stream error should propagate to original one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

let emittedError;
originalStream.on('error', (e) => {
emittedError = e;
});
const unrefStream = unref(originalStream);

const thrownError = new Error('test');

unrefStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

// Need another tick to propagate the error
await nextTick();
strictEqual(emittedError, thrownError);
});

it('Original stream error should propagate to unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

let emittedError;
unrefStream.on('error', (e) => {
emittedError = e;
});

const thrownError = new Error('test');

originalStream.destroy(thrownError);

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);

await nextTick();
strictEqual(emittedError, thrownError);
});

it('Should not close original stream when unref one finished but not consumed all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

// eslint-disable-next-line no-unused-vars
for await (const _ of unrefStream) {
break;
}

await nextTick();
strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, false);
});

it('Should continue consuming the original stream data from where the unref stopped', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const firstItem = await unref(originalStream).take(1).toArray();
deepStrictEqual(firstItem, [1]);

const restOfData = await originalStream.toArray();
deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

it('Should close original stream when unref one consume all data', async () => {
const originalStream = from([1, 2, 3, 4, 5]);

const unrefStream = unref(originalStream);

const data = await unrefStream.toArray();
deepStrictEqual(data, [1, 2, 3, 4, 5]);

await nextTick();

strictEqual(unrefStream.destroyed, true);
strictEqual(originalStream.destroyed, true);
});

it('original stream close should close unref one', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const unrefStream = unref(originalStream);

await originalStream.toArray();

strictEqual(originalStream.destroyed, true);
strictEqual(unrefStream.destroyed, true);
});

it('make sure not leaking memory', async () => {
function getMemoryAllocatedInMB() {
return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100;
}

const bigData = () => from(async function* () {
const obj = Array.from({ length: 100000 }, () => (Array.from({ length: 15 }, (_, i) => i)));
while (true) {
yield obj.map((item) => item.slice(0));
await new Promise((resolve) => setTimeout(resolve, 1));
}
}());

const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => {
});
unref(originalStream);
originalStream.iterator({ destroyOnReturn: true });

// Making sure some data passed so we won't catch something that is related to the infra
const iterator = originalStream.iterator({ destroyOnReturn: true });
for (let j = 0; j < 10; j++) {
await iterator.next();
}

const currentMemory = getMemoryAllocatedInMB();

for (let j = 0; j < 10; j++) {
await iterator.next();
}

const newMemory = getMemoryAllocatedInMB();

originalStream.destroy(null);
strictEqual(newMemory - currentMemory < 100, true, `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`);
});