Closed
Labels: invalid, stream
Description
- Version: v11.10.0
- Platform: Darwin Ivans-MacBook-Pro.local 18.2.0 Darwin Kernel Version 18.2.0: Thu Dec 20 20:46:53 PST 2018; root:xnu-4903.241.1~1/RELEASE_X86_64 x86_64
Hello, I have written a simple Transform stream to get some extended user info:
import { Transform } from 'stream';
import { getUsersPublicInfo } from '../../user/services';

const USERS_BATCH_SIZE = 100;

/**
 * Class (extended from Transform stream)
 * for getting minimum user info for diff
 * types of reports. Returns `id`, `email`
 * and `name` in a stream as an object.
 */
export class FillUserTransform extends Transform {
  constructor(options) {
    super({
      readableObjectMode: true,
      writableObjectMode: true,
      ...(options || {}),
    });
    this._userIds = [];
  }

  /**
   * @param {{user_id: string}} chunk
   * @param {string} encoding
   * @param {Function} cb
   * @return {*}
   * @private
   */
  _transform(chunk, encoding, cb) {
    this._userIds.push(chunk.user_id);
    if (this._userIds.length >= USERS_BATCH_SIZE) {
      this._writeUserInfo(cb);
    } else {
      return void cb();
    }
  }

  _flush(cb) {
    if (this._userIds.length > 0) {
      this._writeUserInfo(cb);
    } else {
      return void cb();
    }
  }

  /**
   * @param {Function} cb
   * @return {*}
   * @private
   */
  _writeUserInfo(cb) {
    this.pause();
    getUsersPublicInfo(this._userIds)
      .then(users => {
        if (users.length > 0) {
          this.push(...users);
        }
        this._userIds.length = 0;
        this.resume();
        cb();
      })
      .catch(err => {
        this._userIds.length = 0;
        this.resume();
        cb(err);
      });
  }
}

But `this.push(...users);` does not work as expected. Replacing the spread operator with `forEach` works:

users.forEach(user => {
  this.push(user);
});
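
A likely explanation for the difference, and probably the reason for the `invalid` label: `readable.push(chunk[, encoding])` accepts a single chunk, so `this.push(...users)` passes the second user as the `encoding` argument and silently drops the rest. The sketch below reproduces this with a plain object-mode `Readable`. The `drain` helper and the sample `users` array are made up for illustration and are not part of the original code.

```javascript
const { Readable } = require('stream');

// Sample data standing in for the result of getUsersPublicInfo (hypothetical).
const users = [{ id: 1 }, { id: 2 }, { id: 3 }];

// Hypothetical helper: pushes `users` into an object-mode Readable using the
// given strategy, ends the stream, and synchronously reads back what was buffered.
function drain(pushAll) {
  const stream = new Readable({ objectMode: true, read() {} });
  pushAll(stream, users);
  stream.push(null); // signal end of data

  const out = [];
  let obj;
  while ((obj = stream.read()) !== null) {
    out.push(obj);
  }
  return out;
}

// push(chunk, encoding): only the first spread argument is treated as a chunk.
const withSpread = drain((stream, items) => stream.push(...items));

// One push() call per object, as in the forEach workaround.
const withForEach = drain((stream, items) => {
  items.forEach(item => stream.push(item));
});

console.log(withSpread.length);  // 1
console.log(withForEach.length); // 3
```

So the `forEach` version is not a workaround for a bug in the stream subsystem. It is simply the call shape that `push()` expects: one chunk per call.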