Skip to content

Commit

Permalink
fix(ChangeStream): whitelist resumable errors (#2337)
Browse files Browse the repository at this point in the history
- Changes which errors are considered resumable on change streams,
    adding support for the new ResumableChangeStreamError label.
  - Updates ChangeStream prose tests which described startAfter
    behavior for unsupported server versions.
  - Fixes use of startAfter/resumeAfter when resuming from an
    invalidate event. Implement prose tests #17 and #18.

NODE-2478
  • Loading branch information
emadum authored Apr 30, 2020
1 parent 922c3ab commit a9d3965
Show file tree
Hide file tree
Showing 19 changed files with 6,080 additions and 744 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ language: node_js
branches:
only:
- master
- next
- 3.6

before_install:
# we have to intstall mongo-orchestration ourselves to get around permissions issues in subshells
Expand Down
60 changes: 34 additions & 26 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ class ChangeStreamCursor extends Cursor {
['resumeAfter', 'startAfter', 'startAtOperationTime'].forEach(key => delete result[key]);

if (this.resumeToken) {
result.resumeAfter = this.resumeToken;
const resumeKey =
this.options.startAfter && !this.hasReceived ? 'startAfter' : 'resumeAfter';
result[resumeKey] = this.resumeToken;
} else if (this.startAtOperationTime && maxWireVersion(this.server) >= 7) {
result.startAtOperationTime = this.startAtOperationTime;
}
Expand All @@ -296,6 +298,26 @@ class ChangeStreamCursor extends Cursor {
return result;
}

cacheResumeToken(resumeToken) {
if (this.bufferedCount() === 0 && this.cursorState.postBatchResumeToken) {
this.resumeToken = this.cursorState.postBatchResumeToken;
} else {
this.resumeToken = resumeToken;
}
this.hasReceived = true;
}

_processBatch(batchName, response) {
const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor[batchName].length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
}

_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
Expand All @@ -314,15 +336,9 @@ class ChangeStreamCursor extends Cursor {
this.startAtOperationTime = response.operationTime;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.firstBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('firstBatch', response);

this.emit('init', result);
this.emit('response');
callback(err, result);
});
Expand All @@ -335,15 +351,9 @@ class ChangeStreamCursor extends Cursor {
return;
}

const cursor = response.cursor;
if (cursor.postBatchResumeToken) {
this.cursorState.postBatchResumeToken = cursor.postBatchResumeToken;

if (cursor.nextBatch.length === 0) {
this.resumeToken = cursor.postBatchResumeToken;
}
}
this._processBatch('nextBatch', response);

this.emit('more', response);
this.emit('response');
callback(err, response);
});
Expand All @@ -366,6 +376,7 @@ function createChangeStreamCursor(self, options) {

const pipeline = [{ $changeStream: changeStreamStageOptions }].concat(self.pipeline);
const cursorOptions = applyKnownOptions({}, options, CURSOR_OPTIONS);

const changeStreamCursor = new ChangeStreamCursor(
self.topology,
new AggregateOperation(self.parent, pipeline, options),
Expand Down Expand Up @@ -464,9 +475,10 @@ function processNewChange(args) {
const change = args.change;
const callback = args.callback;
const eventEmitter = args.eventEmitter || false;
const cursor = changeStream.cursor;

// If the changeStream is closed, then it should not process a change.
if (changeStream.isClosed()) {
// If the cursor is null, then it should not process a change.
if (cursor == null) {
// We do not error in the eventEmitter case.
if (eventEmitter) {
return;
Expand All @@ -478,12 +490,12 @@ function processNewChange(args) {
: changeStream.promiseLibrary.reject(error);
}

const cursor = changeStream.cursor;
const topology = changeStream.topology;
const options = changeStream.cursor.options;
const wireVersion = maxWireVersion(cursor.server);

if (error) {
if (isResumableError(error) && !changeStream.attemptingResume) {
if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) {
changeStream.attemptingResume = true;

// stop listening to all events from old cursor
Expand Down Expand Up @@ -549,11 +561,7 @@ function processNewChange(args) {
}

// cache the resume token
if (cursor.bufferedCount() === 0 && cursor.cursorState.postBatchResumeToken) {
cursor.resumeToken = cursor.cursorState.postBatchResumeToken;
} else {
cursor.resumeToken = change._id;
}
cursor.cacheResumeToken(change._id);

// wipe the startAtOperationTime if there was one so that there won't be a conflict
// between resumeToken and startAtOperationTime if we need to reconnect the cursor
Expand Down
5 changes: 0 additions & 5 deletions lib/core/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ const Logger = require('./connection/logger');
const retrieveBSON = require('./connection/utils').retrieveBSON;
const MongoError = require('./error').MongoError;
const MongoNetworkError = require('./error').MongoNetworkError;
const mongoErrorContextSymbol = require('./error').mongoErrorContextSymbol;
const collationNotSupported = require('./utils').collationNotSupported;
const ReadPreference = require('./topologies/read_preference');
const isUnifiedTopology = require('./utils').isUnifiedTopology;
Expand Down Expand Up @@ -774,10 +773,6 @@ function nextFunction(self, callback) {
// Execute the next get more
self._getMore(function(err, doc, connection) {
if (err) {
if (err instanceof MongoError) {
err[mongoErrorContextSymbol].isGetMore = true;
}

return handleCallback(callback, err);
}

Expand Down
4 changes: 0 additions & 4 deletions lib/core/error.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
'use strict';

const mongoErrorContextSymbol = Symbol('mongoErrorContextSymbol');

/**
* Creates a new MongoError
*
Expand Down Expand Up @@ -29,7 +27,6 @@ class MongoError extends Error {
}

this.name = 'MongoError';
this[mongoErrorContextSymbol] = this[mongoErrorContextSymbol] || {};
}

/**
Expand Down Expand Up @@ -262,7 +259,6 @@ module.exports = {
MongoTimeoutError,
MongoServerSelectionError,
MongoWriteConcernError,
mongoErrorContextSymbol,
isRetryableError,
isSDAMUnrecoverableError,
isNodeShuttingDownError,
Expand Down
1 change: 0 additions & 1 deletion lib/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ module.exports = {
MongoTimeoutError: require('./error').MongoTimeoutError,
MongoServerSelectionError: require('./error').MongoServerSelectionError,
MongoWriteConcernError: require('./error').MongoWriteConcernError,
mongoErrorContextSymbol: require('./error').mongoErrorContextSymbol,
// Core
Connection: require('./connection/connection'),
Server: require('./topologies/server'),
Expand Down
24 changes: 13 additions & 11 deletions lib/core/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,24 @@ function retrieveEJSON() {
* @param {(Topology|Server)} topologyOrServer
*/
function maxWireVersion(topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}
if (topologyOrServer) {
if (topologyOrServer.ismaster) {
return topologyOrServer.ismaster.maxWireVersion;
}

if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
if (typeof topologyOrServer.lastIsMaster === 'function') {
const lastIsMaster = topologyOrServer.lastIsMaster();
if (lastIsMaster) {
return lastIsMaster.maxWireVersion;
}
}
}

if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
if (topologyOrServer.description) {
return topologyOrServer.description.maxWireVersion;
}
}

return null;
return 0;
}

/*
Expand Down
59 changes: 26 additions & 33 deletions lib/error.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,38 @@
'use strict';

const MongoNetworkError = require('./core').MongoNetworkError;
const mongoErrorContextSymbol = require('./core').mongoErrorContextSymbol;

const GET_MORE_NON_RESUMABLE_CODES = new Set([
136, // CappedPositionLost
237, // CursorKilled
11601 // Interrupted
// From spec@https://github.com/mongodb/specifications/blob/f93d78191f3db2898a59013a7ed5650352ef6da8/source/change-streams/change-streams.rst#resumable-error
const GET_MORE_RESUMABLE_CODES = new Set([
6, // HostUnreachable
7, // HostNotFound
89, // NetworkTimeout
91, // ShutdownInProgress
189, // PrimarySteppedDown
262, // ExceededTimeLimit
9001, // SocketException
10107, // NotMaster
11600, // InterruptedAtShutdown
11602, // InterruptedDueToReplStateChange
13435, // NotMasterNoSlaveOk
13436, // NotMasterOrSecondary
63, // StaleShardVersion
150, // StaleEpoch
13388, // StaleConfig
234, // RetryChangeStream
133 // FailedToSatisfyReadPreference
]);

// From spec@https://github.com/mongodb/specifications/blob/7a2e93d85935ee4b1046a8d2ad3514c657dc74fa/source/change-streams/change-streams.rst#resumable-error:
//
// An error is considered resumable if it meets any of the following criteria:
// - any error encountered which is not a server error (e.g. a timeout error or network error)
// - any server error response from a getMore command excluding those containing the error label
// NonRetryableChangeStreamError and those containing the following error codes:
// - Interrupted: 11601
// - CappedPositionLost: 136
// - CursorKilled: 237
//
// An error on an aggregate command is not a resumable error. Only errors on a getMore command may be considered resumable errors.

function isGetMoreError(error) {
if (error[mongoErrorContextSymbol]) {
return error[mongoErrorContextSymbol].isGetMore;
}
}

function isResumableError(error) {
if (!isGetMoreError(error)) {
return false;
}

function isResumableError(error, wireVersion) {
if (error instanceof MongoNetworkError) {
return true;
}

return !(
GET_MORE_NON_RESUMABLE_CODES.has(error.code) ||
error.hasErrorLabel('NonRetryableChangeStreamError')
);
if (wireVersion >= 9) {
return error.hasErrorLabel('ResumableChangeStreamError');
}

return GET_MORE_RESUMABLE_CODES.has(error.code);
}

module.exports = { GET_MORE_NON_RESUMABLE_CODES, isResumableError };
module.exports = { GET_MORE_RESUMABLE_CODES, isResumableError };
Loading

0 comments on commit a9d3965

Please sign in to comment.