Skip to content

Commit

Permalink
refactor(change-stream): rename data event to change
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Aug 30, 2017
1 parent c02d25c commit 6bb7e33
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 107 deletions.
29 changes: 15 additions & 14 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var cursorOptionNames = ['maxAwaitTimeMS', 'collation', 'readPreference'];
* @param {object} [options.collation=null] Specify collation settings for operation. See {@link https://docs.mongodb.com/manual/reference/command/aggregate|aggregation documentation}.
* @param {ReadPreference} [options.readPreference=null] The read preference. Defaults to the read preference of the database or collection. See {@link https://docs.mongodb.com/manual/reference/read-preference|read preference documentation}.
* @fires ChangeStream#close
* @fires ChangeStream#data
* @fires ChangeStream#change
* @fires ChangeStream#end
* @fires ChangeStream#error
* @return {ChangeStream} a ChangeStream instance.
Expand All @@ -45,23 +45,23 @@ var ChangeStream = function(collection, pipeline, options) {
self.serverConfig = collection.s.db.serverConfig;

// Determine correct read preference
self.options.readPreference = self.options.readPreference || collection.s.readPreference;
// self.options.readPreference = self.options.readPreference || collection.s.readPreference;

// Create contained Change Stream cursor
self.cursor = createChangeStreamCursor(self);

// Listen for any data listeners being added to ChangeStream
// Listen for any `change` listeners being added to ChangeStream
self.on('newListener', function(eventName) {
if (eventName === 'data' && self.cursor && self.cursor.listenerCount('data') === 0) {
if (eventName === 'change' && self.cursor && self.cursor.listenerCount('change') === 0) {
self.cursor.on('data', function (change) {
processNewChange(self, null, change);
});
}
});

// Listen for all data listeners being removed from ChangeStream
// Listen for all `change` listeners being removed from ChangeStream
self.on('removeListener', function(eventName){
if (eventName === 'data' && self.listenerCount('data') === 0 && self.cursor) {
if (eventName === 'change' && self.listenerCount('change') === 0 && self.cursor) {
self.cursor.removeAllListeners('data');
}
});
Expand All @@ -78,12 +78,12 @@ var createChangeStreamCursor = function (self) {
buildChangeStreamAggregationCommand(self.serverConfig, self.namespace, self.pipeline, self.resumeToken, self.options);

/**
* Fired for each new matching change in the specified namespace. Attaching a 'data' event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
* Fired for each new matching change in the specified namespace. Attaching a `change` event listener to a Change Stream will switch the stream into flowing mode. Data will then be passed as soon as it is available.
*
* @event ChangeStream#data
* @event ChangeStream#change
* @type {object}
*/
if (self.listenerCount('data') > 0) {
if (self.listenerCount('change') > 0) {
changeStreamCursor.on('data', function (change) {
processNewChange(self, null, change);
});
Expand Down Expand Up @@ -123,13 +123,13 @@ var createChangeStreamCursor = function (self) {
};

var buildChangeStreamAggregationCommand = function (serverConfig, namespace, pipeline, resumeToken, options) {
var changeNotificationStageOptions = {};
var changeStreamStageOptions = {};
if (options.fullDocument) {
changeNotificationStageOptions.fullDocument = options.fullDocument;
changeStreamStageOptions.fullDocument = options.fullDocument;
}

if (resumeToken || options.resumeAfter) {
changeNotificationStageOptions.resumeAfter = resumeToken || options.resumeAfter;
changeStreamStageOptions.resumeAfter = resumeToken || options.resumeAfter;
}

// Map cursor options
Expand All @@ -141,7 +141,7 @@ var buildChangeStreamAggregationCommand = function (serverConfig, namespace, pip
});

var changeStreamPipeline = [
{ $changeNotification: changeNotificationStageOptions }
{ $changeStream: changeStreamStageOptions }
];

changeStreamPipeline = changeStreamPipeline.concat(pipeline);
Expand Down Expand Up @@ -280,6 +280,7 @@ var processNewChange = function (self, err, change, callback) {
if (err) {
// Handle resumable MongoNetworkErrors
if (err instanceof MongoNetworkError && !self.attemptingResume) {

self.attemptingResume = true;
return self.cursor.close(function(closeErr) {
if (closeErr) {
Expand Down Expand Up @@ -321,7 +322,7 @@ var processNewChange = function (self, err, change, callback) {

// Return the change
if (typeof callback === 'function') return callback(err, change);
if (self.listenerCount('data')) return self.emit('data', change);
if (self.listenerCount('change')) return self.emit('change', change);
return self.promiseLibrary.resolve(change);
};

Expand Down
Loading

0 comments on commit 6bb7e33

Please sign in to comment.