stream: unref #48007
base: main
Conversation
lib/internal/streams/readable.js (outdated diff):
// stream.once('close', () => {
//   destroyImpl.destroyer(newStream, null);
// });
Uncommenting this fixes the "original stream close should close unref one" test, but fails the "Should close original stream when unref one consumes all data" test, because newStream is not finished yet for some reason, so it will get an abort error.
I'm lost. What does this do and what problem does it solve? I don't follow the relation to #46980.
The purpose of this is to allow using operators without closing the original stream; this started from the need to read only 1 line from a CSV file using …
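As a concrete illustration of that CSV use case, here is a hypothetical sketch assuming the unref() proposed in this PR plus the existing take()/toArray() stream helpers (the import location of unref and the readCsvHeader helper are made up):

const fs = require('fs');
const { unref } = require('stream'); // assumed export location for the API proposed in this PR

async function readCsvHeader(path) {
  const file = fs.createReadStream(path, { encoding: 'utf8' });
  // Read through the unref'd proxy so take(1) does not destroy `file`.
  const [firstChunk] = await unref(file).take(1).toArray();
  // Assumes the first chunk contains at least the full header line.
  const header = firstChunk.split('\n')[0];
  // `file` stays open and can keep being consumed from where the proxy stopped.
  return { header, file };
}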
I'm very split on this. Not sure what to think atm.
// highWaterMark 0 as unref is basically a proxy, so don't consume more data,
// as we would lose it when we continue consuming from the original stream
highWaterMark: 0,
The following test shows why 0 (it's also part of the test suite):
it('Should continue consuming the original stream data from where the unref stopped', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const firstItem = await unref(originalStream).take(1).toArray();
deepStrictEqual(firstItem, [1]);
const restOfData = await originalStream.toArray();
deepStrictEqual(restOfData, [2, 3, 4, 5]);
});
With the implementation or the feature?
Unfortunately, calling unref() on a stream could result in an out-of-memory error if the following conditions are met:
- the source stream is reading enough data that it would not fit in the current process's memory
- the unref'd stream is never consumed
- the source stream is fully consumed
The result would be that all data is accumulated inside the unref'd stream.
The solution for this is to implement what https://github.com/mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.
If anybody is willing to do the work, I'm happy for cloneable-readable to be included in Node.js core.
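For reference, a minimal sketch of how cloneable-readable is typically used (based on its README; file names are illustrative). Every clone, and the wrapped stream itself, must be consumed, and data flows at the speed of the slowest consumer:

const cloneable = require('cloneable-readable');
const fs = require('fs');

// Wrap the source before creating any clones.
const stream = cloneable(fs.createReadStream('./big.csv'));

// Each clone and the wrapped stream must be piped somewhere,
// otherwise backpressure stalls all consumers.
stream.clone().pipe(fs.createWriteStream('./copy-1.csv'));
stream.pipe(fs.createWriteStream('./copy-2.csv'));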
I see why it happens here, thanks!
it('test memory', async () => {
const naturals = () => from(async function*() {
let i = 1;
while (true) {
yield i++;
await new Promise((resolve) => setTimeout(resolve, 1));
}
}());
const originalStream = pipeline(naturals(), new PassThrough({ objectMode: true }), () => {});
const unrefStream = unref(originalStream);
setInterval(() => {
const formatMemoryUsage = (data) => `${Math.round(data / 1024 / 1024 * 100) / 100} MB`;
const memoryData = process.memoryUsage();
const memoryUsage = {
rss: `${formatMemoryUsage(memoryData.rss)} -> Resident Set Size - total memory allocated for the process execution`,
heapTotal: `${formatMemoryUsage(memoryData.heapTotal)} -> total size of the allocated heap`,
heapUsed: `${formatMemoryUsage(memoryData.heapUsed)} -> actual memory used during the execution`,
external: `${formatMemoryUsage(memoryData.external)} -> V8 external memory`,
};
console.log(memoryUsage);
}, 200);
for await(const _ of originalStream) {
}
});
Maybe I'm missing something, but I don't think it answers the need of this PR: I can't find a way to make this work using cloneable-readable, because you must pipe all clones. The unref one does this:
it('Should continue consuming the original stream data from where the unref stopped', async () => {
const originalStream = from([1, 2, 3, 4, 5]);
const firstItem = await unref(originalStream).take(1).toArray();
deepStrictEqual(firstItem, [1]);
const restOfData = await originalStream.toArray();
deepStrictEqual(restOfData, [2, 3, 4, 5]);
});
What is the equivalent in cloneable-readable?
Ah! I understand. That kind of API is hazardous and a source of massive memory leaks. It should be avoided at all costs: if you do not call toArray() or .take(), it will accumulate all data if the stream is piped to another destination, creating a memory leak.
What we want is something like:
async function * take ({ stream, num, destroyOnReturn = true }) {
  for await (const chunk of stream.iterator({ destroyOnReturn })) {
    yield chunk
    if (--num <= 0) {
      break
    }
  }
}
Note that all the async iterator helpers would be available at that point because they will be part of the async iterator protocol, so everything should work as expected.
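A hypothetical usage of that sketch, with destroyOnReturn: false to keep the source open after an early return (take here is the helper sketched above, not an existing API; the expected output mirrors the tests in this thread):

const { Readable } = require('stream');

(async () => {
  const source = Readable.from([1, 2, 3, 4, 5]);
  const first = [];
  // Pull a single chunk, asking the iterator not to destroy the source on early return.
  for await (const chunk of take({ stream: source, num: 1, destroyOnReturn: false })) {
    first.push(chunk);
  }
  // The source was not destroyed, so the remaining data can still be consumed.
  const rest = await source.toArray();
  console.log(first, rest); // [ 1 ] [ 2, 3, 4, 5 ]
})();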
That was the original PR. Unref shouldn't be problematic in case we make it a proxy: no chunk is saved.
What PR?
I don't think there is any other way to implement this feature other than that.
I'm really confused, so what should we do with the original PR and this one?
@mcollina the ask is for a more generic … Doing …
Here are a few additional requirements as laid out in the various issues: …
These are a conflicting set of requirements. Given that the only objection to #47023 is about an in-flux spec, the only solution I see is to add a …
The problem is that then you'd have to add a …
That's doable, you basically can do:
function unref(stream) {
  const unrefed = Object.create(stream); // so we have all the methods
  unrefed.destroy = () => {}; // but calling .destroy does nothing
  // In a future iteration we can also keep track of them and notify of leaks: that is, if .destroy is called
  // on the unrefed stream but never on the source stream.
  return unrefed;
}
(edit: and probably call the callback, and intentionally not _destroy)
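A hypothetical usage of that Object.create sketch, just to illustrate the intended behaviour:

const { Readable } = require('stream');

const original = Readable.from(['a', 'b', 'c']);
const unrefed = unref(original); // the sketch above

unrefed.destroy();               // no-op: the overridden destroy does nothing
console.log(original.destroyed); // false, the source is still alive

original.destroy();              // destroying the source still works as usual
console.log(original.destroyed); // true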
I'm a bit scared of the possible side effects of that, but it might work.
@mcollina can you elaborate on the potential side effects so we know we're aware of all the dangers when considering the feature? (other than, obviously, explicitly opting out of being able to destroy a stream)
const EE = require('events');
const a = new EE()
a.foo = 'bar'
const b = Object.create(a)
b.foo = 'baz'
a.on('event', function () {
console.log(this.foo)
})
a.emit('event') // bar
b.emit('event') // baz
Oh, you meant the fact it's doing prototypal inheritance? That was just an example; I'm content with any solution really, for example a proxy that forwards all calls except destroy to the original stream and overrides just destroy. I do see the potential footgun with the Object.create implementation with regards to setters (and them not impacting the original), though. Would a proxy resolve that, in your opinion?
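A rough sketch of what that Proxy variant could look like (illustrative only; as the replies below point out, it has problems of its own):

function unref(stream) {
  return new Proxy(stream, {
    get(target, prop) {
      if (prop === 'destroy') {
        // Swallow destroy on the unref'd handle; only the source stream can really be destroyed.
        return () => target;
      }
      const value = Reflect.get(target, prop);
      // Bind methods to the original stream so internal `this` usage keeps working.
      return typeof value === 'function' ? value.bind(target) : value;
    },
    set(target, prop, value) {
      // Unlike the Object.create variant, property writes land on the original stream.
      return Reflect.set(target, prop, value);
    },
  });
}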
I think the proxy solution would suffer from the same problem as the prototypal inheritance one. I don't understand why using the async iterator is problematic (minus the spec issue).
Why? Setting a property would set it on the target (the stream) directly.
You mean …
I think there would be some slight issue with instanceof inside event handlers.
That seems like the one that has the fewest footguns and potential issues down the road. It would also be the slowest one.
Actually, doing a proxy wouldn't work, precisely because we need polymorphism for this to work (since the destroy call wouldn't go to the proxy). We need to combine both, I guess.
I need to think more about this.
Tried something, ignore the last commit.
Actually, I checked, and you get the same memory leak if you pipeline to another stream but never consume it and only consume the original stream. Here's an example:
'use strict';
require('../common');
const {
Readable,
pipeline,
PassThrough
} = require('stream');
const { it } = require('node:test');
const { strictEqual } = require('assert');
const { from } = Readable;
it('make sure not leaking memory', async () => {
function getMemoryAllocatedInMB() {
return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100;
}
const bigData = () => from(async function* () {
const obj = Array.from({ length: 100000 }, () => (Array.from({ length: 15 }, (_, i) => i)));
while (true) {
yield obj.map((item) => item.slice(0));
await new Promise((resolve) => setTimeout(resolve, 1));
}
}());
const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => {
});
pipeline(originalStream, new PassThrough({ objectMode: true }), () => {
});
originalStream.iterator({ destroyOnReturn: true });
// Making sure some data passed so we won't catch something that is related to the infra
const iterator = originalStream.iterator({ destroyOnReturn: true });
for (let j = 0; j < 10; j++) {
await iterator.next();
}
const currentMemory = getMemoryAllocatedInMB();
for (let j = 0; j < 10; j++) {
await iterator.next();
}
const newMemory = getMemoryAllocatedInMB();
originalStream.destroy(null);
strictEqual(newMemory - currentMemory < 100, true, `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`);
});
In that case, how is unref any different from pipeline?
Making it a proxy (not doing any buffering) is near impossible in the current design...
Ping @mcollina
Your example is mixing …
Currently, if you consume your streams using only async iterators and the new APIs, you are shielded from all those problems. The whole goal of supporting …
I think this is currently blocked on us not yet having figured out how to actually do it correctly. Neither the proxy solution nor the prototype solution works particularly well.
We can recreate a readable that is actually a proxy without intermediate state.
Current WIP unref implementation that was talked about in:
Readable.take operator to not close the stream #46980

Cc @benjamingr, @ronag WDYT?
This would allow the following code:
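Based on the tests shown earlier in this thread, the intended usage is along these lines (a sketch; unref is the function added by this PR, and its import location is assumed):

const { Readable } = require('stream');
const { deepStrictEqual } = require('assert');

(async () => {
  const originalStream = Readable.from([1, 2, 3, 4, 5]);

  // Consume a single item through the unref'd proxy; the original stream is not closed.
  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  // Keep consuming the original stream from where the unref'd one stopped.
  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
})();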