
stream: unref #48007

Open · rluvaton wants to merge 11 commits into main

Conversation

@rluvaton (Member, Author) commented May 14, 2023

Current WIP unref implementation that was talked about in:

Cc @benjamingr, @ronag WDYT?


This would allow the following code:

const fs = require('node:fs');
// `csvParse` stands for a CSV parser transform, e.g. parse() from the csv-parse package;
// `unref` is the API proposed in this PR, and `parseRowByColumns` is user code.

const csvParsedStream = fs
	.createReadStream('file.csv')
	.compose(csvParse({ columns: false }));

const [columns] = await unref(csvParsedStream)
	.take(1)
	.toArray();

const parsed = await csvParsedStream.map((row) => parseRowByColumns(row, columns)).toArray();

@nodejs-github-bot (Collaborator)

Review requested:

  • @nodejs/streams

@nodejs-github-bot added the needs-ci label (PRs that need a full CI run) on May 14, 2023
Comment on lines 1174 to 1176
// stream.once('close', () => {
// destroyImpl.destroyer(newStream, null);
// });
@rluvaton (Member, Author)

Uncommenting this fixes the "original stream close should close unref one" test but fails the "Should close original stream when unref one consume all data" test: newStream is not finished yet for some reason, so it gets an abort error.

@mscdex added the wip label (Issues and PRs that are still a work in progress) on May 14, 2023
@ronag (Member) commented May 14, 2023

I'm lost. What does this do and what problem does it solve? I don't follow the relation to #46980.

@rluvaton (Member, Author)

The purpose of this is to allow using operators without closing the original stream. It started from the need to read only one line from a CSV file using take and then use map to iterate over the records. @benjamingr then pointed out that this is the second time something like this is needed (the destroyOnReturn option of the readable iterator was the first), so the idea is to create something generic.

@ronag (Member) commented May 14, 2023

I'm very split on this. Not sure what to think atm.

Comment on lines +1160 to +1162
// highWaterMark 0 as unref is basically a proxy, so don't consume more data,
// as we would lose it when we continue consuming from the original stream
highWaterMark: 0,
@rluvaton (Member, Author) commented May 14, 2023

The following test shows why the highWaterMark is 0 (it is also part of the test suite):

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);

  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

@rluvaton (Member, Author) commented May 15, 2023

> I'm very split on this. Not sure what to think atm.

With the implementation or the feature?

@mcollina (Member) left a comment

Unfortunately, calling unref() on a stream could result in an out-of-memory error if the following conditions are met:

  1. the source stream reads more data than fits in the current process
  2. the unref'd stream is never consumed
  3. the source stream is fully consumed

The result would be that all data accumulates inside the unref'd stream.

The solution for this is to implement what https://github.com/mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.

If anybody is willing to do the work, I'm happy for cloneable-readable to be included in Node.js core.
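
For readers unfamiliar with that approach, here is a minimal illustrative sketch of the "slowest consumer sets the pace" idea; it is not cloneable-readable's actual code, and the fanOut helper name is made up:

const { PassThrough } = require('node:stream');

// Illustrative only: fan a source out to several PassThrough clones and pause
// the source whenever any clone reports backpressure, so the slowest consumer
// dictates how fast the source is read.
function fanOut(source, count) {
  const clones = Array.from({ length: count }, () => new PassThrough({ objectMode: true }));
  let pending = 0;

  source.on('data', (chunk) => {
    for (const clone of clones) {
      if (!clone.write(chunk)) {
        pending++;
        clone.once('drain', () => {
          if (--pending === 0) source.resume(); // every clone drained: keep reading
        });
      }
    }
    if (pending > 0) source.pause(); // backpressure from the slowest clone
  });
  source.on('end', () => clones.forEach((clone) => clone.end()));
  source.on('error', (err) => clones.forEach((clone) => clone.destroy(err)));

  return clones;
}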

@rluvaton (Member, Author) commented May 15, 2023

I see why it happens here, thanks!

const { Readable, PassThrough, pipeline } = require('node:stream');
const { it } = require('node:test');
const { from } = Readable;
// `unref` is the function added by this PR.

it('test memory', async () => {
  const naturals = () => from(async function*() {
    let i = 1;
    while (true) {
      yield i++;
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());
  const originalStream = pipeline(naturals(), new PassThrough(), () => {});
  const unrefStream = unref(originalStream);

  setInterval(() => {
    const formatMemoryUsage = (data) => `${Math.round(data / 1024 / 1024 * 100) / 100} MB`;

    const memoryData = process.memoryUsage();

    const memoryUsage = {
      rss: `${formatMemoryUsage(memoryData.rss)} -> Resident Set Size - total memory allocated for the process execution`,
      heapTotal: `${formatMemoryUsage(memoryData.heapTotal)} -> total size of the allocated heap`,
      heapUsed: `${formatMemoryUsage(memoryData.heapUsed)} -> actual memory used during the execution`,
      external: `${formatMemoryUsage(memoryData.external)} -> V8 external memory`,
    };

    console.log(memoryUsage);
  }, 200);
  for await(const _ of originalStream) {
  }
});

@rluvaton (Member, Author)

> The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.

Maybe I'm missing something, but I don't think it answers the need of this PR:

I can't find a way to make the "Should continue consuming the original stream data from where the unref stopped" case work using cloneable-readable, because you must pipe all clones.

With unref it looks like this:

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);

  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

What is the equivalent in cloneable-readable?

@mcollina (Member) left a comment

Ah! I understand. That kind of API is hazardous and a source of massive memory leaks. It should be avoided at all costs: if you do not call toArray() or .take(), it will accumulate all the data whenever the stream is piped to another destination, creating a memory leak.

What we want is something like:

async function* take({ stream, num, destroyOnReturn = true }) {
  for await (const chunk of stream.iterator({ destroyOnReturn })) {
    yield chunk
    if (--num <= 0) {
      break
    }
  }
}

Note that all the async iterator helpers would be available at that point because they will be part of the async iterator protocol, so everything should work as expected.
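
For illustration, a possible usage of that helper; this is only a sketch, it assumes the experimental readable.iterator() option, and it passes destroyOnReturn: false so the source survives the early break:

const { Readable } = require('node:stream');

async function main() {
  const source = Readable.from([1, 2, 3, 4, 5]);

  // Pull two chunks through the helper without destroying the source...
  const firstTwo = [];
  for await (const chunk of take({ stream: source, num: 2, destroyOnReturn: false })) {
    firstTwo.push(chunk);
  }

  // ...then keep consuming whatever the source still has.
  const rest = await source.toArray();
  console.log(firstTwo, rest); // expected: the first two chunks, then the remainder
}

main();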

@rluvaton (Member, Author)

That was the original PR.

unref shouldn't be problematic if we make it a proxy: no chunks are saved.

@mcollina (Member)

what PR?

@rluvaton (Member, Author)

> what PR?

@mcollina #47023

@mcollina (Member)

I don't think there is any way to implement this feature other than that.

@rluvaton (Member, Author)

I'm really confused; what should we do with the original PR and this one?

@mcollina, @ronag, @benjamingr

@benjamingr (Member)

@mcollina the ask is for a more generic stream.iterator: "give me a copy of this stream, but don't take ownership of it, so I can consume parts of it in a for await or with iterator helpers without closing it".

Doing .take without closing is a special case of that, and so is stream.iterator.
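
As a concrete, purely hypothetical illustration of that ask, using the unref() proposed in this PR (looksLikeHeader is a placeholder predicate):

// Breaking out of the loop closes only the unref'd handle, not the source.
for await (const chunk of unref(stream)) {
  if (looksLikeHeader(chunk)) break;
}

// `stream` is still open here and can be piped or iterated further.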

@mcollina (Member)

Here are a few additional requirements as laid out in the various issues:

  1. no out-of-the-box memory leaks (no full stream accumulation if not consumed)
  2. take could be called after the stream starts flowing
  3. multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.
  4. no spec changes in .take()

These requirements conflict with each other. Given that the only objection to #47023 is about the in-flux spec, the only solution I see is to add a nextN accessor with the behavior we want.
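
Purely for clarity, a hypothetical sketch of what such an accessor could look like from the caller's side; the nextN name and signature are only the suggestion above, nothing that exists:

// Hypothetical API: pull a bounded number of chunks without destroying the stream.
const firstThree = await readable.nextN(3);
// `readable` stays open; piping or further reads continue from the fourth chunk.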

@benjamingr (Member) commented May 22, 2023

The problem is that you'd then have to add a dropN as well, rather than having a single solution for "I want to work with this stream and explicitly opt out of the destroying behavior of a method".

> no out-of-the-box memory leaks (no full stream accumulation if not consumed)
> take could be called after the stream starts flowing
> multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.

That's doable; you can basically do:

function unref(stream) {
  const unrefed = Object.create(stream); // so we have all the methods
  unrefed.destroy = () => {}; // but calling .destroy does nothing
  // in a future iteration we can also keep track of them and notify of leaks: that is if .destroy is called
  // on the unrefed stream but never on the source stream.
  return unrefed;
}

(edit: the destroy override should probably also call the callback, and intentionally not call _destroy)
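
Reading that edit literally, the override might look roughly like the sketch below; this is an assumption about the intent, not an agreed design:

// Sketch only: honour destroy(err, cb)'s callback so callers are not left
// hanging, but never call _destroy, so the underlying resource stays open.
function unref(stream) {
  const unrefed = Object.create(stream);
  unrefed.destroy = function destroy(err, cb) {
    if (typeof cb === 'function') process.nextTick(cb, err ?? null);
    return this;
  };
  return unrefed;
}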

@mcollina (Member)

I'm a bit scared of the possible side effects of that, but it might work.

@benjamingr (Member)

@mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?

(other than, obviously, explicitly opting out of being able to destroy a stream)

@mcollina (Member)

> @mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?

EventEmitter stores things attached to the object:

const EE = require('events');

const a = new EE()
a.foo = 'bar'

const b = Object.create(a)
b.foo = 'baz'

a.on('event', function () {
  console.log(this.foo)
})

a.emit('event') // bar
b.emit('event') // baz

@benjamingr (Member)

Oh, you meant the fact that it's doing prototypal inheritance? That was just an example; I am content with any solution, for example a Proxy that forwards all calls except destroy to the original stream and overrides just destroy.

I do see the potential footgun of the Object.create implementation with regard to setters, though (they would not affect the original). Would a proxy resolve that, in your opinion?
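
For reference, the Proxy variant described here might look roughly like the following sketch; as noted further down in the thread, it still breaks when the stream calls destroy on itself internally, because that call never goes through the proxy:

// Sketch only: forward everything to the original stream, intercept destroy.
function unref(stream) {
  return new Proxy(stream, {
    get(target, prop) {
      if (prop === 'destroy') {
        return () => target; // calling .destroy on the handle is a no-op
      }
      const value = Reflect.get(target, prop);
      // Bind methods to the real stream so its internal state keeps working.
      return typeof value === 'function' ? value.bind(target) : value;
    },
    set(target, prop, value) {
      // Property writes land on the original stream, avoiding the Object.create setter pitfall.
      return Reflect.set(target, prop, value);
    }
  });
}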

@mcollina (Member)

I think the proxy solution would suffer from the same problem as the prototypal inheritance one.

I don't understand why using the async iterator is problematic (minus the spec issue).

@benjamingr (Member)

> I think the proxy solution would suffer from the same problem as the prototypal inheritance one.

Why? Setting a property would set it on the target (the stream) directly.

> I don't understand why using the async iterator is problematic (minus the spec issue).

You mean readable.iterator({ destroyOnReturn: false }).take(3) (when that eventually lands)? The issue is that it works on an async iterator, so in order to get a stream back you would have to convert it to a Readable again, which is very roundabout and footgunny on its own.
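
To make the roundabout path concrete, a sketch of what it would look like with today's experimental readable.iterator(); iterator helpers such as .take() on async iterators have not landed, so the limit is done by hand:

const { Readable } = require('node:stream');

async function main() {
  const source = Readable.from([1, 2, 3, 4, 5]);

  // Iterate without taking ownership: breaking out does not destroy `source`.
  const firstThree = [];
  for await (const chunk of source.iterator({ destroyOnReturn: false })) {
    firstThree.push(chunk);
    if (firstThree.length === 3) break;
  }

  // To keep using stream operators you have to wrap what is left back into a
  // Readable, which is the roundabout step being criticised here.
  const rest = Readable.from(source.iterator({ destroyOnReturn: false }));
  console.log(firstThree, await rest.toArray());
}

main();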

@mcollina (Member)

> Why? Setting a property would set it on the target (the stream) directly.

I think there would be some slight issue with instanceof inside event handlers.

> You mean readable.iterator({ destroyOnReturn: false }).take(3) (when that eventually lands)? The issue is that it works on an async iterator, so in order to get a stream back you would have to convert it to a Readable again, which is very roundabout and footgunny on its own.

That seems to be the one with the fewest footguns and potential issues down the road. It would also be the slowest one.
Anyhow, any implementation based on for await or an active on('readable')/read() cycle would prevent the memory leak from happening.

@benjamingr (Member)

Actually, a proxy wouldn't work, precisely because we need polymorphism for this to work (internal destroy calls wouldn't go through the proxy). We need to combine both, I guess.

@benjamingr (Member)

I need to think more about this.

@rluvaton (Member, Author) commented Jun 9, 2023

Tried something; ignore the last commit.

@rluvaton (Member, Author) commented Jun 9, 2023

> Unfortunately, calling unref() on a stream could result in an out-of-memory error if the following conditions are met:
>
>   1. the source stream reads more data than fits in the current process
>   2. the unref'd stream is never consumed
>   3. the source stream is fully consumed
>
> The result would be that all data accumulates inside the unref'd stream.
>
> The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.
>
> If anybody is willing to do the work, I'm happy for cloneable-readable to be included in Node.js core.

Actually, I checked and you get the same memory leak if you pipeline to another stream but never consume it and only consume the original stream. Here's an example:

'use strict';
require('../common');

const {
  Readable,
  pipeline,
  PassThrough
} = require('stream');
const { it } = require('node:test');
const { strictEqual } = require('assert');
const { from } = Readable;

it('make sure not leaking memory', async () => {
  function getMemoryAllocatedInMB() {
    return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100;
  }

  const bigData = () => from(async function* () {
    const obj = Array.from({ length: 100000 }, () => (Array.from({ length: 15 }, (_, i) => i)));
    while (true) {
      yield obj.map((item) => item.slice(0));
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());

  const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => {
  });
  pipeline(originalStream, new PassThrough({ objectMode: true }), () => {
  });
  originalStream.iterator({ destroyOnReturn: true });

  // Making sure some data passed so we won't catch something that is related to the infra
  const iterator = originalStream.iterator({ destroyOnReturn: true });
  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }

  const currentMemory = getMemoryAllocatedInMB();

  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }

  const newMemory = getMemoryAllocatedInMB();

  originalStream.destroy(null);
  strictEqual(newMemory - currentMemory < 100, true, `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`);
});

In that case, how is unref any different from pipeline?

@rluvaton (Member, Author)

Making it a proxy (not doing any buffering) is nearly impossible in the current design...

@rluvaton (Member, Author)

Ping @mcollina

@mcollina (Member)

> Actually, I checked and you get the same memory leak if you pipeline to another stream but never consume it and only consume the original stream.

Your example is mixing on('data') (used by .pipe() and pipeline) and on('readable') events. on('readable') takes precedence, meaning that the backpressure generated by the PassThrough (via the false value returned by .write()) will be disregarded.
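
A minimal demonstration of that precedence, assuming nothing beyond documented stream behaviour: once a 'readable' listener is attached, the stream switches to paused mode and 'data' only fires when read() is called, so write()-side backpressure no longer paces the source.

const { PassThrough } = require('node:stream');

const pt = new PassThrough({ objectMode: true });

pt.on('data', (chunk) => console.log('data:', chunk));

// Attaching 'readable' takes precedence over 'data': the stream stops flowing
// and chunks are only handed out when read() is called below.
pt.on('readable', () => {
  let chunk;
  while ((chunk = pt.read()) !== null) {
    console.log('read:', chunk);
  }
});

pt.write('a');
pt.write('b');
pt.end();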

> In that case, how is unref any different from pipeline?

Currently, if you consume your streams using only async iterators and the new APIs, you are shielded from all those problems. The whole goal of supporting unref() (in this form) is to make forking the stream easier. I'm -1 on adding new hazards to streams; all these new APIs should be safe to use.

@benjamingr (Member)

I think this is currently blocked on us not yet figuring out how to actually do it correctly. Neither the proxy solution nor the prototype solution works particularly well.

@rluvaton (Member, Author)

We can create a new Readable that is actually a proxy, with no intermediate state.
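
One way to read that suggestion, as a rough sketch only (this is not the code in this PR): a new Readable with highWaterMark 0 that pulls from the source's non-destroying iterator on demand, so nothing accumulates in the wrapper and destroying it leaves the source alone.

const { Readable } = require('node:stream');

// Rough sketch; assumes the experimental readable.iterator({ destroyOnReturn: false }) API.
function unref(source) {
  const iterator = source.iterator({ destroyOnReturn: false });
  return new Readable({
    objectMode: source.readableObjectMode,
    highWaterMark: 0, // pure proxy: never read ahead of the consumer
    async read() {
      try {
        const { value, done } = await iterator.next();
        this.push(done ? null : value);
      } catch (err) {
        this.destroy(err);
      }
    },
    destroy(err, cb) {
      cb(err); // destroying the proxy does not destroy the source
    }
  });
}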

Labels: needs-ci (PRs that need a full CI run), wip (Issues and PRs that are still a work in progress)

6 participants