Skip to content

stream.compose(...) doesn't destroy all active composed streams when it is destroyed #51987

Closed
@headlessme

Description

@headlessme

Version

20.11.0

Platform

Darwin MacBook-Pro-4.local 23.1.0 Darwin Kernel Version 23.1.0: Mon Oct 9 21:27:27 PDT 2023; root:xnu-10002.41.9~6/RELEASE_X86_64 x86_64

Subsystem

stream

What steps will reproduce the bug?

When calling destroy(err) on a stream created using stream.compose() some streams that may still be actively processing data do not have _destroy called and do not emit an error event. Illustrated by the code below.

If there is a stream that slowly processes data as the last entry in compose(...) and all it's input has been written (finish event has fired) and then destroy() is called on the composed stream, I was expecting the slow processing stream to also be destroyed and emit the error.

Inserting a PassThrough stream as the last entry in the compose chain seems to fix the issue (uncomment lines below in sample code), but it's unclear to me why that is. Either I've misunderstood something about compose()ing streams or there's a bug.

import { PassThrough, Duplex, compose } from 'stream'

class SlowProcessor extends Duplex {
  constructor (options) {
    super({ ...options, objectMode: true })
    this.stuff = []
  }

  _write (message, encoding, callback) {
    this.stuff.push(message)
    callback()
  }

  _destroy(err, cb) {
      console.log('SlowProcessor _destroy called', err)
      cb(err)
  }

  _read () {
      // emulate some slow processing
      setTimeout(() => {
        if (this.stuff.length) {
          this.push(this.stuff.shift())
        } else if (this.writableEnded) {
          this.push(null)
        } else {
          setTimeout(() => this._read(), 100)
        }
      }, 100)
   }
}

const composed = compose(
    new PassThrough({ objectMode:true })
        .on('finish', () => console.log('input passthrough finish'))
        .on('end', () => console.log('input passthrough end'))
        .on('error', err => console.log('input passthrough error', err)),
    new SlowProcessor()
        .on('finish', () => console.log('SlowProcessor finish'))
        .on('end', () => console.log('SlowProcessor end'))
        .on('error', err => console.log('SlowProcessor error', err)),
    //
    // UNCOMMENT THIS AND IT WORKS AS EXPECTED
    //
    // new PassThrough({ objectMode:true })
    //     .on('finish', () => console.log('terminal passthrough finish'))
    //     .on('end', () => console.log('terminal passthrough end'))
    //     .on('error', err => console.log('terminal passthrough error', err))
)
    .on('finish', () => console.log('composed finish'))
    .on('error', err => console.log('composed', err))
    .on('end', () => console.log('composed end'))
    .on('data', d => console.log('data:', d))

composed.write('hello')
composed.write('world')
composed.end()

setTimeout(() => {
    composed.destroy(new Error('Uh-oh'))
}, 100)

How often does it reproduce? Is there a required condition?

Reliably reproduces using sample above.

What is the expected behavior? Why is that the expected behavior?

I'd expect any stream that's part of a compose(...) chain that is still processing data to emit errors when the outer stream is destroyed.

I'd expect output from the above script something like:

input passthrough end
input passthrough finish
SlowProcessor finish
_destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh

What do you see instead?

The SlowProcessor stream does not get destroyed even though its Readable side has not ended.

input passthrough end
input passthrough finish
SlowProcessor finish
// <=== I'd expect an error event from the SlowProcessor here as it hasn't ended
composed Error: Uh-oh

When there's an extra PassThrough inserted as the last in the compose chain, the results look as I'd expect:

input passthrough end
input passthrough finish
SlowProcessor finish
terminal passthrough error Error: Uh-oh
SlowProcessor _destroy called Error: Uh-oh
SlowProcessor error Error: Uh-oh
composed Error: Uh-oh

Additional information

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions