
pipeline ends a script before its stream finishes (and leaves the stream with pendingcb > 0) #49052

Closed
petr-motejlek opened this issue Aug 7, 2023 · 3 comments


@petr-motejlek

Version

v18.17.0

Platform

Linux WHITEONE 5.15.62.1-microsoft-standard-WSL2 #1 SMP Wed Aug 24 22:24:20 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Subsystem

stream

What steps will reproduce the bug?

If I run this script, it logs just the number "1". Instead, I would expect it to log the number "1" and then the "pipeline finished" text.

const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

;(async () => {
    await pipeline(
        (() => {
            const t = new Transform({
                objectMode: true,
                transform: (chunk, _, cb) => {
                    setTimeout(() => {
                        t.push(chunk)

                        // cb()
                    }, 1000)
                }
            })
            t.write(1)
            return t
        })(),
        async function (stream) {
            for await (const item of stream) {
                console.log(item)
            }
        }
    )
    console.log("pipeline finished")
})()

(My original use case was slightly different; this is a stripped-down version that makes the underlying issue simple to reproduce.)

Note the use of setTimeout to delay the push call, as well as the commented-out cb() call (which would also be delayed by the setTimeout). Interestingly, it doesn't really matter whether cb() is commented out or not; the outcome is the same either way.

This might (or might not) be related to #46170: when I add another setTimeout just above the return t, have it wait for, say, 5 seconds, and then log t, t has pendingcb set to 1.

Additionally, if I increase the number of t.write(...) calls, only the very first one seems to make it through. The only effect of adding more writes is that pendingcb on t rises accordingly.

As an example, here's the output from a script where I duplicated the t.write(1) call 3 times. Notice that pendingcb is 3.
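The variant, reconstructed from the description above (the extra 5-second setTimeout is only there to keep the process alive long enough to dump t):

const { Transform } = require('stream')
const { pipeline } = require('stream/promises')

;(async () => {
    await pipeline(
        (() => {
            const t = new Transform({
                objectMode: true,
                transform: (chunk, _, cb) => {
                    setTimeout(() => {
                        t.push(chunk)
                        // cb() still never called
                    }, 1000)
                }
            })
            t.write(1)
            t.write(1)
            t.write(1)
            setTimeout(() => console.log(t), 5000) // dump t once things settle
            return t
        })(),
        async function (stream) {
            for await (const item of stream) {
                console.log(item)
            }
        }
    )
    console.log("pipeline finished")
})()

And the logged t: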

Transform {
  _readableState: ReadableState {
    objectMode: true,
    highWaterMark: 16,
    buffer: BufferList { head: null, tail: null, length: 0 },
    length: 0,
    pipes: [],
    flowing: false,
    ended: false,
    endEmitted: false,
    reading: true,
    constructed: true,
    sync: false,
    needReadable: true,
    emittedReadable: false,
    readableListening: true,
    resumeScheduled: false,
    errorEmitted: false,
    emitClose: true,
    autoDestroy: true,
    destroyed: false,
    errored: null,
    closed: false,
    closeEmitted: false,
    defaultEncoding: 'utf8',
    awaitDrainWriters: null,
    multiAwaitDrain: false,
    readingMore: false,
    dataEmitted: true,
    decoder: null,
    encoding: null,
    [Symbol(kPaused)]: null
  },
  _events: [Object: null prototype] {
    prefinish: [Function: prefinish],
    close: [
      [Function (anonymous)],
      [Function: onclose],
      [Function: onclose]
    ],
    end: [ [Function: onend], [Function: onend] ],
    finish: [ [Function: onfinish], [Function: onfinish] ],
    error: [ [Function: onerror], [Function: onError], [Function: onerror] ],
    readable: [Function: next]
  },
  _eventsCount: 6,
  _maxListeners: undefined,
  _writableState: WritableState {
    objectMode: true,
    highWaterMark: 16,
    finalCalled: false,
    needDrain: false,
    ending: false,
    ended: false,
    finished: false,
    destroyed: false,
    decodeStrings: true,
    defaultEncoding: 'utf8',
    length: 3,
    writing: true,
    corked: 0,
    sync: false,
    bufferProcessing: false,
    onwrite: [Function: bound onwrite],
    writecb: [Function: nop],
    writelen: 1,
    afterWriteTickInfo: null,
    buffered: [ [Object], [Object] ],
    bufferedIndex: 0,
    allBuffers: false,
    allNoop: true,
    pendingcb: 3,
    constructed: true,
    prefinished: false,
    errorEmitted: false,
    emitClose: true,
    autoDestroy: true,
    errored: null,
    closed: false,
    closeEmitted: false,
    [Symbol(kOnFinished)]: []
  },
  allowHalfOpen: true,
  _transform: [Function: transform],
  [Symbol(kCapture)]: false,
  [Symbol(kCallback)]: null
}

Setting highWaterMark to 1 or even 0 results in exactly the same behavior as the default 16.

The docs from stream.Transform lead me to believe that _transform can be asynchronous:

transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.

Why does the script "just exit" without truly waiting for the pipeline to finish? The "pipeline finished" text is never displayed. Unless this is a real bug, I suspect I am simply not seeing an exception because I am not looking for it properly (but then why does the script still exit with 0? It would be non-zero in case of an exception, wouldn't it?).

Am I doing it/using it wrong?

How often does it reproduce? Is there a required condition?

always

What is the expected behavior? Why is that the expected behavior?

1
pipeline finished

What do you see instead?

1

Additional information

Cheers :)

@climba03003
Contributor

Your problem is a never-ending Promise: the event loop goes idle, Node.js detects that there is nothing left to do, and the process exits.
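A minimal demonstration of the same effect (my own snippet, not your code): the pending await holds no timer or handle, so the event loop drains and the process exits with code 0, without any exception:

;(async () => {
    await new Promise(() => {}) // never settles, registers no timer or handle
    console.log("never reached")
})()

This also answers why the exit code is 0: nothing threw; the process simply ran out of work.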

The Promise never settles because:

  1. The Transform never has .end() called (or null pushed) to signal its end.
  2. The callback inside the transform method is never called, so the stream can never process the next chunk.

Here is a proper implementation of your case, converted to ESM for readability.

import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
    (() => {
        const t = new Transform({
            objectMode: true,
            transform: (chunk, _, cb) => {
                setTimeout(() => {
                    t.push(chunk)
                    cb()
                }, 1000)
            }
        })
        t.write(1)
        t.end()
        return t
    })(),
    async function (stream) {
        for await (const item of stream) {
            console.log(item)
        }
    }
)
console.log("pipeline finished")

@petr-motejlek
Author

Thank you, @climba03003. This indeed makes sense. I somehow expected pipeline to call end on the stream by itself. But now, thanks to you, I realize that pipeline cannot know when to end my stream; deciding that is up to me :).
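For contrast, here's a rough sketch (not my real code) with a source that knows its own end; Readable.from ends itself after the last item, so pipeline can finish without any explicit call:

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

await pipeline(
    Readable.from([1, 2, 3]), // ends automatically after the last item
    async function (stream) {
        for await (const item of stream) {
            console.log(item)
        }
    }
)
console.log("pipeline finished") // reached, unlike with my hand-built Transform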

I shall close the issue, since there doesn't seem to be any bug to report, but I will keep posting my progress below; somebody else might run into the same situation while figuring out a similar algorithm, and it might help them on their voyage :).


I'll now go back to the drawing board and change my original code to take that into account. My original idea was to rewrite my usual non-recursive tree-walking algorithm, which used an array as its in-progress queue, so that it no longer keeps an explicit queue and relies on the stream's buffer instead.

The initial idea for the algo was like this:

    await pipeline(
        (() => {
            const t = new Transform({
                objectMode: true,
                highWaterMark: 5,
                transform: async (node, _, cb) => {
                    console.log("transform:", {node})

                    for await (const childNode of await getChildNodes({parentNode: node})) {
                        if (childNode.hasChildren) {
                            t.write(childNode) // re-queue nodes with children back into this same stream
                        } else {
                            t.push(childNode)  // emit leaves downstream
                        }
                    }

                    cb()
                }
            })
            t.write(rootNode)
            return t
        })(),
        async function (stream) {
            for await (const item of stream) {
                console.log("log:", {item})
            }
        },
    )

In a nutshell, it goes through every node it gets (expecting a node with children), pushes any of the node's leaves out, and writes all of its children that themselves have children back to itself (thus avoiding both recursion and the queue I would normally use for this).
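(If you want to run the sketch above, a made-up getChildNodes over an in-memory tree is enough; my real data source looks nothing like this:)

    // a made-up tree and helper, purely to exercise the sketch above
    const rootNode = {
        name: "root", hasChildren: true,
        children: [
            { name: "a", hasChildren: true, children: [
                { name: "a1", hasChildren: false },
            ] },
            { name: "b", hasChildren: false },
        ],
    }

    async function getChildNodes({ parentNode }) {
        return parentNode.children ?? [] // for await also accepts plain arrays
    }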

Of course, when I add the t.end() call to it, it breaks, because I end up calling write on the stream after it has already ended, and that fails.


I am not sure there's a simple (readable) enough construct that would allow a stream to write to itself like this; I thought I had found one, but obviously hadn't, because it fails :).

I am now thinking of rewriting my algorithm to resemble the more common streaming ones, which yield events like "node with children starts", "child node (with no children)", and "node with children ends" (e.g. I am entering a directory, I am listing a file in that directory, I am leaving the directory; I am not actually working with files and directories, but that's a good analogy). That should, theoretically, let me know exactly when to call t.end(): I guess when "node with children ends" is yielded and t.pendingcb === 0.

I'll post more code as I have it.

@petr-motejlek
Author

petr-motejlek commented Aug 9, 2023

Alrighty then. I think I have a version with a self-writing Transform that might actually work :) I'm not sure it handles all stream errors properly, but it very well might, since it uses both pipeline and pipe:

    pipeline(
        // in my real scenario there's actually no `pipeline` but `compose` here,
        // because anything can stream root nodes and get back their leaves (and
        // they use the `compose`-d Duplex stream in their own `pipeline`s)
        Readable.from([rootNode1, rootNode2]),
        (readable) => {
            const t = new Transform({
                objectMode: true,
                highWaterMark: 5,
                transform: async (node, _, cb) => {
                    for await (const child of await getChildNodes({parentNode: node})) {
                        if (child.hasChildren) {
                            t.write(child)
                        } else {
                            t.push(child)
                        }
                    }

                    // the chunk being transformed is the only write still counted
                    // in writableLength, so if the input has ended and nothing else
                    // is queued, nothing more will ever arrive
                    if (readable.readableEnded && t.writableLength === 1)
                        t.end()

                    cb()
                }
            })

            // the input may also end while t has nothing queued at all
            readable.on("end", () => {
                if (t.writableLength === 0)
                    t.end()
            })

            // end: false, because t must stay open for its writes to itself
            return readable.pipe(t, {
                end: false,
            })
        },
        async function (stream) {
            for await (const item of stream) {
                console.log("log:", {item})
            }
        },
    )

I am pretty sure the two "events" that lead to t.end() can still be simplified (merged); I just need to figure out whether there's a single place where I can describe the situation :).
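Maybe something like this (untested; it assumes the write accounting has settled by the next tick after cb()):

    // one possible merge: a single helper called from both places
    const maybeEnd = () => {
        if (readable.readableEnded && t.writableLength === 0)
            t.end()
    }

    readable.on("end", maybeEnd)

    // ...and at the end of transform(), instead of the readableEnded check:
    //     cb()
    //     process.nextTick(maybeEnd)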
