Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proper way to clone a stream? #202

Closed
phated opened this issue Apr 15, 2016 · 19 comments
Closed

proper way to clone a stream? #202

phated opened this issue Apr 15, 2016 · 19 comments

Comments

@phated
Copy link

phated commented Apr 15, 2016

I'm sorry if this is a support question but I've dug through everything I could find + reached out to people on twitter.

We've had an issue reported on the vinyl repo for quite some time that I am unable to solve: gulpjs/vinyl#55

The basics of the issue is that we are currently "cloning" a stream by T-ing it to another through stream but that seems to put it into paused mode.

Is there a proper way to clone a stream pipeline (and keep all the streams in the pipeline)?

@mafintosh
Copy link
Member

Could you provide an example that reproduces this error? My guess is that if your streams ends up in paused mode you probably aren't draining your cloning stream.

@mcollina
Copy link
Member

Let's just understand what clone means, because it is definitely not in the stream terminology (and in OO-world it means cloning the object, which I don't think it's the case).

Reading the original, it seems you want to pipe the same source stream to two destinations, and you want backpressure handled. Is it correct?

Would you prefer the conversation to continue here or there?

Anyway, this is the example ported from coffee-script:

var fs = require('fs')
var stream = require('stream')
var contents = fs.createReadStream('./bigfile') // greater than 65KB
var stream1 = contents.pipe(new stream.PassThrough())
var stream2 = contents.pipe(new stream.PassThrough())

stream1.on('data', function (data) { console.log('s1', data.length) })
stream1.on('end', function () {
  stream2.on('data', function (data) { console.log('s2', data.length) })
})

which is broken on node v0.10 and v0.12, but it is working as expected on node v4 and v5 (streams 3).

@phated
Copy link
Author

phated commented Apr 18, 2016

@mcollina oh, that's interesting. I didn't realize streams 3 changed (fixed?) the way this was handled.

Specifically, we are cloning a Vinyl object (which represents an in memory file) using file.clone(). The contents property of the vinyl object can be a buffer (easy to clone), null (no need to clone) or a stream.

Someone could do something like:

var file = new Vinyl({
  contents: fs.createReadStream('./somefile').pipe(gzip())
});
var file2 = file.clone();

Currently we are piping that same source to 2 destinations but the original reporter of the issue says the contents stream is not drained when we write to the filesystem (in the gulp.dest() stream of the original issue).

If this is fixed in streams 3, is there a way for us to wrap or convert a stream created by no 0.10 or 0.12 into streams 3? We use through2 and/or readable-stream (although they need to be update) almost everywhere except the fs calls.

@mcollina
Copy link
Member

So, this works in node v4 and v5 (not in 0.10 or 0.12):

var Vinyl = require('vinyl')
var fs = require('fs')
var gzip = require('zlib').createGzip
var file = new Vinyl({
    contents: fs.createReadStream('../node.tar')
});
var file2 = file.clone();
file2.pipe(gzip()).pipe(fs.createWriteStream('./cloned'))

But this don't:

var Vinyl = require('vinyl')
var fs = require('fs')
var gzip = require('zlib').createGzip
var file = new Vinyl({
    contents: fs.createReadStream('../node.tar').pipe(gzip())
});
var file2 = file.clone();
file2.pipe(fs.createWriteStream('./cloned'))
// uncommenting the following line makes everything work
// file.pipe(fs.createWriteStream('./other'))

I would also note that by not piping file somewhere, you are creating a memory leak because of https://github.com/gulpjs/vinyl/blob/master/index.js#L70. Why? Also note that removing that line makes everything work again in the given example.

The following works everywhere:

var through2 = require('through2')
var Vinyl = require('vinyl')
var fs = require('fs')
var gzip = require('zlib').createGzip
var count = 0
var file = new Vinyl({
    contents: fs.createReadStream('../node.tar').pipe(through2(function (buf, enc, cb) {
      count += buf.length
      this.push(buf)
      cb()
    }))
});
var file2 = file.clone();
file2.pipe(fs.createWriteStream('./cloned2'))

It seems this is a bug in zlib.createGzip(), which I guess it was not ported to stream 3 (or something more nasty). Can anyone else have a look and confirm my suspicion? @phated do you know of other cases like this, or gzip is a special one?

BTW, this issue is extremely interesting.

(I think Vinyl should depend on readable-stream to normalize all the behavior regarding streams anyway).

@tunnckoCore
Copy link

I would also note that by not piping file somewhere, you are creating a memory leak because of https://github.com/gulpjs/vinyl/blob/master/index.js#L70.

Yea and that's the whole thing leading to this bugs. I can't get why not just remove it?

I think Vinyl should depend on readable-stream to normalize all the behavior regarding streams anyway

Absolutely.

It seems this is a bug in zlib.createGzip()

In your last example you are not using gzip anywhere. What you mean with "this is a bug in"?

@phated
Copy link
Author

phated commented Apr 26, 2016

@mcollina thanks for all your advice on this. I think I am on the verge of having something working. actually, scratch that, by removing https://github.com/gulpjs/vinyl/blob/master/index.js#L70, everything breaks on newer node versions.

@phated
Copy link
Author

phated commented Apr 26, 2016

In the context of these Vinyl objects being passed through the vinyl-fs stream, and the vfs.dest method piping the content to an fs.createWriteStream(), if both streams aren't piped to passthrough streams in the clone method, only 1 stream successfully writes.

@phated
Copy link
Author

phated commented Apr 26, 2016

The example being:

'use strict';

var vfs = require('vinyl-fs');
var through = require('through2');

vfs.src('../node.tar', { buffer: false })
  .pipe(through.obj(function(file, enc, next) {
    var clone = file.clone();
    clone.stem = 'cloned';
    this.push(clone);

    next(null, file);
  }))
  .pipe(vfs.dest('./dest'))
  .on('end', function() {
      console.log('done!');
  });

If you use the latest vinyl-fs in the above example on node >= 4, it works fine but if you remove Line 70 from the vinyl dependency, it no longer works.

@tunnckoCore
Copy link

tunnckoCore commented Apr 26, 2016

on node >= 4, it works fine but if you remove Line 70 from the vinyl dependency, it no longer works.

I don't know what means "it works fine" here, because on node 5 and if L70 is removed, it only populates one file (because there are created two files in ./dest - cloned, and original) and the original is empty.

And end is not called if we remove L70, even on node 5, I don't know if that's correct.

@phated
Copy link
Author

phated commented Apr 26, 2016

@tunnckoCore yep, you are seeing the same thing as me. Line 70 has to stay.

@tunnckoCore
Copy link

It seems using through2 it works? Huh. Tested in 0.12 and 5.8

var contents;
if (this.isStream()) {
  contents = this.contents.pipe(through2())
  this.contents = this.contents.pipe(through2())
}

@mcollina
Copy link
Member

Folks, it's really hard to solve this problem from vinyl point of view. If it's a problem in one (or more) versions of streams, or in another submodules, we need to tackle those separately. Even in they are specific to a node version lines.

Using vinyl as an example makes things way more complicated for us to help you (we need to dig into Vinyl code) and there is the chance that the actual way you are using streams might not be the correct way to use them. All of this looks like heisenbugs.

@tunnckoCore this is the example about gzip that I was talking about:

var Vinyl = require('vinyl')
var fs = require('fs')
var gzip = require('zlib').createGzip
var file = new Vinyl({
    contents: fs.createReadStream('../node.tar').pipe(gzip())
});
var file2 = file.clone();
file2.pipe(fs.createWriteStream('./cloned'))

@phated is the orignal problem only related to gzip in node v4+/streams 3? Or is there something else this issue manifest itself using node v4+ (streams 3). If that is the case, the bug its on the gzip side rather than the vinyl or stream.
I don't think the problem is easily solvable in node < 4, because the issue you are describing is the very reason streams 3 were written.

@phated regarding the "double through", I still think it's a really bad idea, because you are possibly slowing everything down (a chain of pipes has some overhead) or creating a memory leak (if one of those cloned Vinyl object is not collected properly). I think the best way to handle this is to delay creating the new streams up to the moment it's needed, rather than inside clone. However all of this is not directly related to this bug, and more an optimization/safer way of doing the same thing that might solve your problem as well.

@phated
Copy link
Author

phated commented Apr 26, 2016

@mcollina nothing is related to gzip. This problem occurs when contents is simply becoming fs.createReadStream().pipe(new Stream.PassThrough()).pipe(fs.createWriteStream()). However, this is done as stream of objects that contain streams. The problem with removing the "double through" is that the original stream is flushed (might be the wrong terminology here) before it is actually piped to the write stream, since it is done asynchronously. So maybe the simple example would be to use some setTimeouts instead of streams of streams, what do you think?

@phated
Copy link
Author

phated commented Apr 26, 2016

@mcollina I'm also very interest in the pattern of delaying the creation until it is needed. Do you have some examples of that pattern?

@mcollina
Copy link
Member

@phated the first example in #202 (comment) without gzip in contents works ok in node v4 and v5. Please confirm, if the problem is there you should be able to replicate this just by using stream classes.

@phated more a general idea, and possibly a major bump in vinyl. Basically what you need is for the original entry to keep a reference on all the clones, and vice versa. Then you start flowing when all the clones have been piped (possibly in the nextTick, just in case there is one more clone coming). You will need to error cloning a vinyl file entry that has a flowing stream as well. What I mean is that file.pipe does not pipe straight away, but accumulates the destinations, and then pipe all of them in the same tick when all the clones are there. Let's move this discussion in vinyl repo. I'll be happy to review or contribute.

@phated
Copy link
Author

phated commented Apr 26, 2016

@mcollina with no changes to vinyl or vinyl-fs, all of my tests & samples seem to work on node v4 and v5. So I think Streams3 outright fixed this problem. I'll open a new issue on the vinyl repo about a potential memory leak and cc you. Thanks again for all the help.

@joshxyzhimself
Copy link

https://www.npmjs.com/package/readable-stream-clone looks good

@mcollina
Copy link
Member

I would recommend against using that, as it leaks memory.

http://npm.im/cloneable-readable is what I recommend.

@mfrye
Copy link

mfrye commented Oct 16, 2020

I second this. readable-stream-clone has a bad memory leak.

I would recommend against using that, as it leaks memory.

http://npm.im/cloneable-readable is what I recommend.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants