Skip to content

Commit

Permalink
feat(cursor): new cursor.transformStream method
Browse files Browse the repository at this point in the history
Created a new method, cursor.transformStream, which
returns a readable stream with the transformation
applied. Does not mutate the cursor.

Fixes NODE-1208
  • Loading branch information
rweinberger authored Jun 14, 2018
1 parent fbfcb8d commit 397fcd2
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 2 deletions.
31 changes: 29 additions & 2 deletions lib/cursor.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';

const Transform = require('stream').Transform;
const PassThrough = require('stream').PassThrough;
const inherits = require('util').inherits;
const f = require('util').format;
const deprecate = require('util').deprecate;
Expand Down Expand Up @@ -1124,15 +1126,40 @@ Cursor.prototype.destroy = function(err) {
/**
* Return a modified Readable stream including a possible transform method.
* @method
* @param {object} [options=null] Optional settings.
* @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
* @param {object} [options] Optional settings.
* @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
* @return {Cursor}
* TODO: replace this method with transformStream in next major release
*/
Cursor.prototype.stream = function(options) {
this.s.streamOptions = options || {};
return this;
};

/**
* Return a modified Readable stream that applies a given transform function, if supplied. If none supplied,
* returns a stream of unmodified docs.
* @method
* @param {object} [options] Optional settings.
* @param {function} [options.transform] A transformation method applied to each document emitted by the stream.
* @return {stream}
*/
Cursor.prototype.transformStream = function(options) {
const streamOptions = options || {};
if (typeof streamOptions.transform === 'function') {
const stream = new Transform({
objectMode: true,
transform: function(chunk, encoding, callback) {
this.push(streamOptions.transform(chunk));
callback();
}
});

return this.pipe(stream);
}
return this.pipe(new PassThrough({ objectMode: true }));
};

/**
* Execute the explain for the cursor
* @method
Expand Down
78 changes: 78 additions & 0 deletions test/functional/cursor_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -4423,4 +4423,82 @@ describe('Cursor', function() {
});
});
});

function testTransformStream(config, done) {
const client = config.client;
const configuration = config.configuration;
const collectionName = config.collectionName;
const transformFunc = config.transformFunc;
const expectedSet = config.expectedSet;

client.connect(function(err, client) {
const db = client.db(configuration.db);
let collection, cursor;
const docs = [
{ _id: 0, a: { b: 1, c: 0 } },
{ _id: 1, a: { b: 1, c: 0 } },
{ _id: 2, a: { b: 1, c: 0 } }
];
const resultSet = new Set();
const transformParam = transformFunc != null ? { transform: transformFunc } : null;
const close = e => cursor.close(() => client.close(() => done(e)));

Promise.resolve()
.then(() => db.createCollection(collectionName))
.then(() => (collection = db.collection(collectionName)))
.then(() => collection.insertMany(docs))
.then(() => collection.find())
.then(_cursor => (cursor = _cursor))
.then(() => cursor.transformStream(transformParam))
.then(stream => {
stream.on('data', function(doc) {
resultSet.add(doc);
});

stream.once('end', function() {
expect(resultSet).to.deep.equal(expectedSet);
close();
});

stream.once('error', function(e) {
close(e);
});
})
.catch(e => close(e));
});
}

it('transformStream should apply the supplied transformation function to each document in the stream', function(done) {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { poolSize: 1, auto_reconnect: false });
const expectedDocs = [{ _id: 0, b: 1, c: 0 }, { _id: 1, b: 1, c: 0 }, { _id: 2, b: 1, c: 0 }];
const config = {
client: client,
configuration: configuration,
collectionName: 'transformStream-test-transform',
transformFunc: doc => ({ _id: doc._id, b: doc.a.b, c: doc.a.c }),
expectedSet: new Set(expectedDocs)
};

testTransformStream(config, done);
});

it('transformStream should return a stream of unmodified docs if no transform function applied', function(done) {
const configuration = this.configuration;
const client = configuration.newClient({ w: 1 }, { poolSize: 1, auto_reconnect: false });
const expectedDocs = [
{ _id: 0, a: { b: 1, c: 0 } },
{ _id: 1, a: { b: 1, c: 0 } },
{ _id: 2, a: { b: 1, c: 0 } }
];
const config = {
client: client,
configuration: configuration,
collectionName: 'transformStream-test-notransform',
transformFunc: null,
expectedSet: new Set(expectedDocs)
};

testTransformStream(config, done);
});
});

0 comments on commit 397fcd2

Please sign in to comment.