Transform and writable streams capable of processing chunks concurrently.
A concurrent transform stream
Parameters
work
function a function to process a single chunk. Function signature should beprocess(chunk, enc, callback)
. When finished processing, fire the providedcallback
.options
object options to pass to the transform stream. (optional, defaultundefined
)options.concurrency
number number of chunks to process concurrently. (optional, default1
)
Examples
var parallel = require('parallel-stream');
var transform = parallel.transform(function(chunk, enc, callback) {
processAsync(chunk)
.on('done', function(processedData) {
callback(null, processedData);
});
}, { objectMode: true, concurrency: 15 });
readable.pipe(transform)
.on('data', function(data) {
console.log('got processed data: %j', data);
})
.on('end', function() {
console.log('complete!');
});
Returns object a transform stream. Do not override the ._transform
function.
A concurrent writable stream
Parameters
work
function a function to process a single chunk. Function signature should beprocess(chunk, enc, callback)
. When finished processing, fire the providedcallback
.flush
function a function to run once all chunks have been processed, but before the stream emits afinished
event. Function signature should beflush(callback)
, fire the providedcallback
when complete. (optional, defaultundefined
)options
object options to pass to the writable stream. (optional, defaultundefined
)options.concurrency
number number of chunks to process concurrently. (optional, default1
)
Examples
var parallel = require('parallel-stream');
var writable = parallel.writable(function(chunk, enc, callback) {
processAsync(chunk)
.on('done', callback);
}, { objectMode: true, concurrency: 15 });
readable.pipe(writable)
.on('finish', function() {
console.log('complete!');
});
Returns object a writable stream. Do not override the ._write
function.