Skip to content

Stream this.push and spread operator does not work #26582

Closed
@akaNightmare

Description

@akaNightmare
  • Version: v11.10.0
  • Platform: Darwin Ivans-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64

Hello, I have written a simple Transform stream to getting some extended user info:

import { Transform } from 'stream';
import { getUsersPublicInfo } from '../../user/services';

const USERS_BATCH_SIZE = 100;

/**
 * Class (extended from Transform stream)
 * for getting minimum user info for diff
 * types of reports. Returns `id`, `email`
 * and `name` in a stream as an object.
 */
export class FillUserTransform extends Transform {
    constructor(options) {
        super({
            readableObjectMode: true,
            writableObjectMode: true,
            ...(options || {}),
        });
        this._userIds = [];
    }

    /**
     * @param {{user_id: string}} chunk
     * @param {string} encoding
     * @param {Function} cb
     * @return {*}
     * @private
     */
    _transform(chunk, encoding, cb) {
        this._userIds.push(chunk.user_id);
        if (this._userIds.length >= USERS_BATCH_SIZE) {
            this._writeUserInfo(cb);
        } else {
            return void cb();
        }
    }

    _flush(cb) {
        if (this._userIds.length > 0) {
            this._writeUserInfo(cb);
        } else {
            return void cb();
        }
    }

    /**
     * @param {Function} cb
     * @return {*}
     * @private
     */
    _writeUserInfo(cb) {
        this.pause();
        getUsersPublicInfo(this._userIds)
            .then(users => {
                if (users.length > 0) {
                    this.push(...users);
                }
                this._userIds.length = 0;
                this.resume();
                cb();
            })
            .catch(err => {
                this._userIds.length = 0;
                this.resume();
                cb(err);
            });
    }
}

but this.push(...users); does not work as expected, replaced spread operator with forEach - works

users.forEach(user => {
    this.push(user);
});

Metadata

Metadata

Assignees

No one assigned

    Labels

    invalidIssues and PRs that are invalid.streamIssues and PRs related to the stream subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions