
stream.Transform limitations on backpressure handling after stream is ended #49302

Open
@dmurvihill

Description


Version

v20.2.0

Platform

Darwin MacBook-Air.local 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64

Subsystem

stream

What steps will reproduce the bug?

Run this with ts-jest:

```typescript
import { Readable, Transform, TransformCallback } from 'stream';
import { pipeline } from 'stream/promises';
import NullWritable from 'null-writable';

describe('stream.Transform', () => {
  it('should emit drain events while flushing', async () => {
    const elements = range(3);
    const transform = new DelayTransform();
    const sink = new NullWritable({ objectMode: true, highWaterMark: 1 });
    sink.cork();
    const testPipeline = pipeline(
      Readable.from(elements, { highWaterMark: 1 }),
      transform,
      sink,
    );
    await new Promise(resolve => setTimeout(resolve, 10));
    console.log('sink.uncork()');
    sink.uncork();
    await testPipeline;
  });
});

class DelayTransform extends Transform {
  private readonly storedElements: any = [];

  constructor() {
    super({ objectMode: true, highWaterMark: 1 });
    this.on('data', (d) => { console.log(`emit '${d}'`); });
    this.on('drain', () => { console.log('drain'); });
  }

  _transform(chunk: any, encoding: string | null, callback: TransformCallback) {
    this.storedElements.push(chunk);
    callback();
  }

  _flush(callback: TransformCallback) {
    this.flushStoredElements()
      .then(() => callback())
      .catch((e) => callback(e));
  }

  private async flushStoredElements(): Promise<void> {
    for (const item of this.storedElements) {
      await this.safePush(item);
    }
  }

  /** Push and resolve when it is safe to push again */
  private safePush(item: string): Promise<void> {
    console.log(`push '${item}'`);
    if (this.push(item)) {
      console.log('continue');
      return Promise.resolve();
    } else {
      console.log('wait');
      return new Promise(resolve => {
        console.log('setting drain listener');
        this.once('drain', () => {
          console.log('resuming');
          resolve();
        });
      });
    }
  }
}

function range(n: number): number[] {
  const elements = [];
  for (let i = 0; i < n; i++) {
    elements.push(i);
  }
  return elements;
}
```

How often does it reproduce? Is there a required condition?

100% reproducible with Jest.

What is the expected behavior? Why is that the expected behavior?

We have two expectations in conflict here:

  1. When a stream emits a drain event, it should be safe for the upstream source to continue writing data.
  2. When a call to push returns false, the stream implementer should be able to wait for a drain event and then resume pushing data.

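These two requirements are incompatible in the _flush() method. By the time _flush() runs, the writable side has ended and upstream can no longer write, so emitting a drain event breaks expectation 1, while failing to emit one leaves the pending push waiting forever and breaks expectation 2.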
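For comparison, Node's documented contract for the readable side (the side that push() feeds) does not involve 'drain' at all: when push() returns false, the implementation stops pushing, and Node calls _read() again once the consumer is ready for more data. A minimal sketch of that contract, using a hypothetical ArrayReadable class that serves numbers from an in-memory array:

```typescript
import { Readable } from 'stream';

// Hypothetical example class: serves items from an array while honoring backpressure.
class ArrayReadable extends Readable {
  private readonly items: number[];
  private index = 0;

  constructor(items: number[]) {
    super({ objectMode: true, highWaterMark: 1 });
    this.items = items;
  }

  // Node calls _read() whenever the consumer is ready for more data.
  _read(): void {
    while (this.index < this.items.length) {
      // push() returning false means "stop for now"; the next _read() call
      // is the signal to resume, not a 'drain' event.
      if (!this.push(this.items[this.index++])) {
        return;
      }
    }
    this.push(null); // no more data
  }
}

// Drain the stream with async iteration and collect what it produced.
async function collect(stream: Readable): Promise<number[]> {
  const out: number[] = [];
  for await (const item of stream) {
    out.push(item);
  }
  return out;
}
```

If the final push() returns false, the loop exits early and the end-of-stream push(null) simply happens on the following _read() call.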

What do you see instead?

Currently, the stream does not emit a drain event after push() returns false in _flush(), so the flush never resumes and the test times out. See the output from the test above:

  console.log
    drain

      at DelayTransform.<anonymous> (test/node.spec.ts:29:38)

  console.log
    drain

      at DelayTransform.<anonymous> (test/node.spec.ts:29:38)

  console.log
    push '0'

      at DelayTransform.safePush (test/node.spec.ts:51:12)

  console.log
    emit '0'

      at DelayTransform.<anonymous> (test/node.spec.ts:28:38)

  console.log
    continue

      at DelayTransform.safePush (test/node.spec.ts:53:14)

  console.log
    push '1'

      at DelayTransform.safePush (test/node.spec.ts:51:12)

  console.log
    wait

      at DelayTransform.safePush (test/node.spec.ts:56:14)

  console.log
    setting drain listener

      at test/node.spec.ts:58:16

  console.log
    sink.uncork()

      at Object.<anonymous> (test/node.spec.ts:17:13)

  console.log
    emit '1'

      at DelayTransform.<anonymous> (test/node.spec.ts:28:38)

 FAIL  test/node.spec.ts (7.834 s)
  stream.Transform
    ✕ should emit drain events while flushing (5004 ms)

  ● stream.Transform › should emit drain events while flushing

    thrown: "Exceeded timeout of 5000 ms for a test.
    Add a timeout value to this test to increase the timeout, if this is a long-running test. See https://jestjs.io/docs/api#testname-fn-timeout."

      4 |
      5 | describe('stream.Transform', () => {
    > 6 |   it('should emit drain events while flushing', async () => {
        |   ^
      7 |     const elements = range(3);
      8 |     const transform = new DelayTransform()
      9 |     const sink = new NullWritable({ objectMode: true, highWaterMark: 1 })

      at test/node.spec.ts:6:3
      at Object.<anonymous> (test/node.spec.ts:5:1)

Test Suites: 1 failed, 1 total
Tests:       1 failed, 1 total
Snapshots:   0 total
Time:        7.917 s, estimated 8 s
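The hang is consistent with 'drain' being a Writable-side event: Node's documentation describes _flush() as running once there is no more written data to be consumed, so no further writes, and hence no further 'drain', can follow. A minimal sketch of one class-based alternative, assuming a Duplex is acceptable in place of the Transform: collect input in _write(), record the end of input in _final(), and do all pushing from _read(). BufferThenEmit and runBufferThenEmit are hypothetical names, and the sketch emits items in arrival order rather than shuffling them.

```typescript
import { Duplex, Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Hypothetical replacement for DelayTransform: stores every input chunk,
// then pushes them from _read() once the writable side has finished.
class BufferThenEmit extends Duplex {
  private readonly stored: unknown[] = [];
  private inputEnded = false;
  private readRequested = false;
  private index = 0;

  constructor() {
    super({ objectMode: true, highWaterMark: 1 });
  }

  _write(chunk: unknown, _encoding: BufferEncoding, callback: (error?: Error | null) => void): void {
    this.stored.push(chunk);
    callback();
  }

  // Called after upstream calls end() and all pending writes have completed.
  _final(callback: (error?: Error | null) => void): void {
    this.inputEnded = true;
    // _read() is not called again until something is pushed, so if the
    // consumer already asked for data, start emitting here.
    if (this.readRequested) {
      this.readRequested = false;
      this.emitStored();
    }
    callback();
  }

  // Called whenever the consumer is ready for more data.
  _read(): void {
    if (!this.inputEnded) {
      this.readRequested = true; // remember the request until _final()
      return;
    }
    this.emitStored();
  }

  private emitStored(): void {
    while (this.index < this.stored.length) {
      // On false, stop; the next _read() call resumes the loop.
      if (!this.push(this.stored[this.index++])) return;
    }
    this.push(null);
  }
}

// Usage: a deliberately slow sink with highWaterMark 1, similar to the repro.
async function runBufferThenEmit(): Promise<number[]> {
  const received: number[] = [];
  const sink = new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(chunk, _encoding, callback) {
      received.push(chunk);
      setTimeout(callback, 1); // simulate a slow consumer
    },
  });
  await pipeline(Readable.from([0, 1, 2, 3, 4]), new BufferThenEmit(), sink);
  return received;
}
```

The readRequested flag matters because a _read() call that pushes nothing is not repeated until the next push(), so without it a request made while input was still arriving would be lost.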

Additional information

I'm implementing a utility for shuffling large data sets. My stream has to be sure the source has ended before it can emit any values; any data that comes in after the first emit will bias the shuffle. Therefore, all of my calls to push occur during _flush. Would appreciate a workaround if you can think of one.
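A minimal sketch of one possible workaround, assuming that buffering the whole data set in memory is acceptable (the _flush() approach already does this): replace the Transform subclass with an async generator function passed to pipeline() from stream/promises. pipeline() stops pulling from the generator while the destination signals backpressure, so the generator needs no 'drain' handling of its own. shuffleAll, shuffle, and runShuffle are hypothetical names, and the Fisher-Yates shuffle stands in for the utility's real shuffling logic.

```typescript
import { Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Fisher-Yates shuffle, in place (stand-in for the real shuffling logic).
function shuffle<T>(items: T[]): T[] {
  for (let i = items.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [items[i], items[j]] = [items[j], items[i]];
  }
  return items;
}

// Hypothetical transform step: read the entire input first, then yield it shuffled.
// Nothing is emitted until the source has ended, so late data cannot bias the order.
async function* shuffleAll(source: AsyncIterable<unknown>): AsyncGenerator<unknown> {
  const buffer: unknown[] = [];
  for await (const item of source) {
    buffer.push(item);
  }
  for (const item of shuffle(buffer)) {
    yield item; // pipeline() resumes this only when the destination can take more
  }
}

// Usage: a deliberately slow sink with highWaterMark 1, similar to the repro.
async function runShuffle(): Promise<number[]> {
  const received: number[] = [];
  const sink = new Writable({
    objectMode: true,
    highWaterMark: 1,
    write(chunk, _encoding, callback) {
      received.push(chunk);
      setTimeout(callback, 1); // simulate a slow consumer
    },
  });
  await pipeline(Readable.from([0, 1, 2, 3, 4]), shuffleAll, sink);
  return received;
}
```

The output order is random, so only the set of received values is predictable.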

Metadata


Assignees

No one assigned

Labels

stream: Issues and PRs related to the stream subsystem.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
