forked from isaacs/minipass
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Flush buffer before emitting 'data' during write()
It takes quite a convoluted set of streams to reproduce this error, but it's possible to end up in a state where there is a chunk in the buffer, AND we're in a flowing state, during a write() call. Since the assumption was that a flowing state means that the buffer must be empty, this resulted in sending data out of order, breaking the streaming in make-fetch-happen and npm-registry-fetch, due to how cacache was proxying writes to a slow tmp file writer, and how make-fetch-happen was creating a Pipeline to produce the caching response body. Fix: npm/npm-registry-fetch#23 Re: npm/make-fetch-happen@7f896be
- Loading branch information
Showing
3 changed files
with
96 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
// this is a minimal reproduction of a pretty complex interaction between | ||
// minipass-pipeline and a slow-draining proxy stream, which occurred in | ||
// make-fetch-happen. https://github.com/npm/npm-registry-fetch/issues/23 | ||
// The pipeline in question was a wrapper that tee'd data into the cache, | ||
// which is a slow-draining sink stream. When multiple chunks come through, | ||
// the Pipeline's buffer is holding a chunk, but the Pipeline itself is in | ||
// flowing mode. The solution is to always drain the buffer before emitting | ||
// 'data', if there is other data waiting to be emitted. | ||
const Minipass = require('../') | ||
const t = require('tap') | ||
|
||
const src = new Minipass({ encoding: 'utf8' }) | ||
const mid = new Minipass({ encoding: 'utf8' }) | ||
const proxy = new Minipass({ encoding: 'utf8' }) | ||
mid.write = function (chunk, encoding, cb) { | ||
Minipass.prototype.write.call(this, chunk, encoding, cb) | ||
return proxy.write(chunk, encoding, cb) | ||
} | ||
proxy.on('drain', chunk => mid.emit('drain')) | ||
proxy.on('readable', () => setTimeout(() => proxy.read())) | ||
|
||
const dest = new Minipass({ encoding: 'utf8' }) | ||
src.write('a') | ||
src.write('b') | ||
|
||
const pipeline = new (class Pipeline extends Minipass { | ||
constructor (opt) { | ||
super(opt) | ||
dest.on('data', c => super.write(c)) | ||
dest.on('end', () => super.end()) | ||
} | ||
emit (ev, ...args) { | ||
if (ev === 'resume') | ||
dest.resume() | ||
return super.emit(ev, ...args) | ||
} | ||
})({ encoding: 'utf8'}) | ||
|
||
mid.pipe(dest) | ||
src.pipe(mid) | ||
t.test('get all data', t => pipeline.concat().then(d => t.equal(d, 'abcd'))) | ||
src.write('c') | ||
src.write('d') | ||
src.end() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
// if you do s.on('readable', s => s.pipe(d)), then s.write() should return | ||
// true, because even though s is not flowing at the START of the write(), | ||
// it IS flowing by the END of the write call. | ||
|
||
const Minipass = require('../') | ||
const t = require('tap') | ||
|
||
t.test('empty write', async t => { | ||
const s = new Minipass({ encoding: 'utf8' }) | ||
const dest = new Minipass({ encoding: 'utf8' }) | ||
const p = dest.concat().then(d => t.equal(d, 'a', 'got data')) | ||
t.equal(s.write('a'), false, 'first write returns false') | ||
t.equal(s.write(''), false, 'empty write returns false') | ||
s.on('readable', () => s.pipe(dest)) | ||
t.equal(s.flowing, false, 'src is not flowing yet') | ||
t.equal(s.write(''), true, 'return true, now flowing') | ||
s.end() | ||
await p | ||
}) | ||
|
||
t.test('non-empty write', async t => { | ||
const s = new Minipass({ encoding: 'utf8' }) | ||
const dest = new Minipass({ encoding: 'utf8' }) | ||
const p = dest.concat().then(d => t.equal(d, 'ab', 'got data')) | ||
t.equal(s.write('a'), false, 'first write returns false') | ||
t.equal(s.write(''), false, 'empty write returns false') | ||
s.on('readable', () => s.pipe(dest)) | ||
t.equal(s.flowing, false, 'src is not flowing yet') | ||
t.equal(s.write('b'), true, 'return true, now flowing') | ||
s.end() | ||
await p | ||
}) |