Description
Version
v20.2.0
Platform
Darwin MacBook-Air.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64
Subsystem
stream
What steps will reproduce the bug?
Run this test with ts-jest:
import { Readable, Transform, TransformCallback } from 'stream';
import { pipeline } from 'stream/promises';
import NullWritable from 'null-writable';

describe('stream.Transform', () => {
  it('should emit drain events while flushing', async () => {
    const elements = range(3);
    const transform = new DelayTransform();
    const sink = new NullWritable({ objectMode: true, highWaterMark: 1 });
    // Buffer writes in the sink until uncork() so that backpressure builds upstream.
    sink.cork();
    const testPipeline = pipeline(
      Readable.from(elements, { highWaterMark: 1 }),
      transform,
      sink,
    );
    await new Promise(resolve => setTimeout(resolve, 10));
    console.log('sink.uncork()');
    sink.uncork();
    await testPipeline;
  });
});

class DelayTransform extends Transform {
  // Every chunk is held back until _flush(), i.e. until the source has ended.
  private readonly storedElements: unknown[] = [];

  constructor() {
    super({ objectMode: true, highWaterMark: 1 });
    this.on('data', (d) => { console.log(`emit '${d}'`); });
    this.on('drain', () => { console.log('drain'); });
  }

  _transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback) {
    this.storedElements.push(chunk);
    callback();
  }

  _flush(callback: TransformCallback) {
    this.flushStoredElements()
      .then(() => callback())
      .catch((e) => callback(e));
  }

  private async flushStoredElements(): Promise<void> {
    for (const item of this.storedElements) {
      await this.safePush(item);
    }
  }

  /** Push and resolve when it is safe to push again */
  private safePush(item: unknown): Promise<void> {
    console.log(`push '${item}'`);
    if (this.push(item)) {
      console.log('continue');
      return Promise.resolve();
    } else {
      console.log('wait');
      return new Promise(resolve => {
        console.log('setting drain listener');
        this.once('drain', () => {
          console.log('resuming');
          resolve();
        });
      });
    }
  }
}

function range(n: number): number[] {
  const elements: number[] = [];
  for (let i = 0; i < n; i++) {
    elements.push(i);
  }
  return elements;
}
How often does it reproduce? Is there a required condition?
100% reproducible with Jest.
What is the expected behavior? Why is that the expected behavior?
We have two expectations in conflict here:
- When a stream emits a drain event, it should be safe for the upstream source to continue writing data.
- When a call to push() returns false, the stream implementer should be able to wait for a drain event and then resume pushing data.
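These two requirements are incompatible inside the _flush() method. _flush() is only called once there is no more written data to consume, so emitting a drain event there would tell the upstream source it may write again after it has already ended (breaking expectation 1), while never emitting one leaves a push() that returned false with no way to resume (breaking expectation 2).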
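For comparison, the Node.js documentation for readable._read() describes a readable-side signal: once push() returns false, the implementation should stop pushing and resume only when _read() is called again (drain belongs to the writable side). A Transform implements _read() internally, so this hook is not directly available inside _flush(). The sketch below only illustrates that contract on a plain Readable; ArraySource and the slow sink are hypothetical names chosen for illustration, not part of Node.js.

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical example class (not part of Node.js): emits a pre-collected
// array while honoring readable-side backpressure. Items must not be null,
// because push(null) signals end-of-stream.
class ArraySource<T> extends Readable {
  private readonly items: T[];
  private index = 0;

  constructor(items: T[]) {
    super({ objectMode: true, highWaterMark: 1 });
    this.items = items;
  }

  // Node.js calls _read() whenever the consumer wants more data, including
  // after an earlier push() returned false and the buffer has since drained.
  _read(): void {
    while (this.index < this.items.length) {
      if (!this.push(this.items[this.index++])) {
        return; // buffer is full: stop and wait for the next _read() call
      }
    }
    this.push(null); // every item has been pushed
  }
}

// Usage: a deliberately slow sink with highWaterMark 1 makes push() return
// false repeatedly, yet every item is still delivered, in order.
const received: number[] = [];
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk: number, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    received.push(chunk);
    setTimeout(callback, 5); // simulate a slow consumer
  },
});

const done = pipeline(new ArraySource([0, 1, 2, 3, 4]), slowSink);
done.then(() => console.log('received:', received));
```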
What do you see instead?
Currently the stream never emits a drain event while flushing, so safePush() waits forever on its drain listener and the test times out. Output from the test above:
console.log
drain
at DelayTransform.<anonymous> (test/node.spec.ts:29:38)
console.log
drain
at DelayTransform.<anonymous> (test/node.spec.ts:29:38)
console.log
push '0'
at DelayTransform.safePush (test/node.spec.ts:51:12)
console.log
emit '0'
at DelayTransform.<anonymous> (test/node.spec.ts:28:38)
console.log
continue
at DelayTransform.safePush (test/node.spec.ts:53:14)
console.log
push '1'
at DelayTransform.safePush (test/node.spec.ts:51:12)
console.log
wait
at DelayTransform.safePush (test/node.spec.ts:56:14)
console.log
setting drain listener
at test/node.spec.ts:58:16
console.log
sink.uncork()
at Object.<anonymous> (test/node.spec.ts:17:13)
console.log
emit '1'
at DelayTransform.<anonymous> (test/node.spec.ts:28:38)
FAIL test/node.spec.ts (7.834 s)
stream.Transform
✕ should emit drain events while flushing (5004 ms)
● stream.Transform › should emit drain events while flushing
thrown: "Exceeded timeout of 5000 ms for a test.
Add a timeout value to this test to increase the timeout, if this is a long-running test. See https://jestjs.io/docs/api#testname-fn-timeout."
4 |
5 | describe('stream.Transform', () => {
> 6 | it('should emit drain events while flushing', async () => {
| ^
7 | const elements = range(3);
8 | const transform = new DelayTransform()
9 | const sink = new NullWritable({ objectMode: true, highWaterMark: 1 })
at test/node.spec.ts:6:3
at Object.<anonymous> (test/node.spec.ts:5:1)
Test Suites: 1 failed, 1 total
Tests: 1 failed, 1 total
Snapshots: 0 total
Time: 7.917 s, estimated 8 s
Additional information
I'm implementing a utility for shuffling large data sets. My stream has to be sure the source has ended before it can emit any values; any data that comes in after the first emit would bias the shuffle. Therefore, all of my calls to push() occur during _flush(). I would appreciate a workaround if you can think of one.
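One possible workaround, sketched here under the assumption that replacing the Transform subclass with a stage passed to pipeline() from stream/promises is acceptable: pipeline() accepts an async generator function as a middle stage. The generator below collects every item until the source ends, shuffles with Fisher-Yates, and then yields items one at a time. pipeline() stops pulling from the generator while the destination is applying backpressure, so no drain event is needed. shuffleAll and shuffleInPlace are hypothetical names, and, like the _flush() approach, this keeps the whole data set in memory.

```typescript
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Fisher-Yates shuffle, performed in place (hypothetical helper).
function shuffleInPlace<T>(items: T[]): T[] {
  for (let i = items.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [items[i], items[j]] = [items[j], items[i]];
  }
  return items;
}

// Hypothetical pipeline stage: nothing is yielded until the source has ended,
// so late-arriving data cannot bias the shuffle.
async function* shuffleAll<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
  }
  shuffleInPlace(items);
  for (const item of items) {
    // pipeline() only resumes the generator when the destination can accept more.
    yield item;
  }
}

// Usage: a slow sink with highWaterMark 1 exercises backpressure.
const output: number[] = [];
const sink = new Writable({
  objectMode: true,
  highWaterMark: 1,
  write(chunk: number, _encoding: BufferEncoding, callback: (error?: Error | null) => void) {
    output.push(chunk);
    setImmediate(callback); // simulate an asynchronous consumer
  },
});

const done = pipeline(
  Readable.from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], { highWaterMark: 1 }),
  shuffleAll,
  sink,
);
done.then(() => console.log('shuffled:', output));
```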