Skip to content

Commit

Permalink
fix(ChangeStream): handle null changes
Browse files Browse the repository at this point in the history
  • Loading branch information
emadum authored Jul 13, 2020
1 parent f262c59 commit 9db8369
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
9 changes: 7 additions & 2 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ class ChangeStreamCursor extends Cursor {

_initializeCursor(callback: Function) {
super._initializeCursor((err?: any, result?: any) => {
if (err) {
callback(err);
if (err || result == null) {
callback(err, result);
return;
}

Expand Down Expand Up @@ -504,6 +504,11 @@ function waitForTopologyConnected(topology: any, options: any, callback: Functio
function processNewChange(changeStream: any, change: any, callback?: Function) {
const cursor = changeStream.cursor;

// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
changeStream.closed = true;
}

if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
Expand Down
69 changes: 30 additions & 39 deletions src/cursor/core_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,50 +473,41 @@ class CoreCursor extends Readable {
}

const result = r.message;
if (result.queryFailure) {
return done(new MongoError(result.documents[0]), null);
}

// Check if we have a command cursor
if (
Array.isArray(result.documents) &&
result.documents.length === 1 &&
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
(typeof result.documents[0].cursor !== 'string' ||
result.documents[0]['$err'] ||
result.documents[0]['errmsg'] ||
Array.isArray(result.documents[0].result))
) {
// We have an error document, return the error
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
return done(new MongoError(result.documents[0]), null);
if (Array.isArray(result.documents) && result.documents.length === 1) {
const document = result.documents[0];

if (result.queryFailure) {
return done(new MongoError(document), null);
}

// We have a cursor document
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
const id = result.documents[0].cursor.id;
// If we have a namespace change set the new namespace for getmores
if (result.documents[0].cursor.ns) {
cursor.ns = result.documents[0].cursor.ns;
// Check if we have a command cursor
if (!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) {
// We have an error document, return the error
if (document.$err || document.errmsg) {
return done(new MongoError(document), null);
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = result.documents[0].operationTime;

// If we have a firstBatch set it
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
}

// Return after processing command cursor
return done(null, result);
}

if (Array.isArray(result.documents[0].result)) {
cursor.cursorState.documents = result.documents[0].result;
cursor.cursorState.cursorId = Long.ZERO;
return done(null, result);
// We have a cursor document
if (document.cursor != null && typeof document.cursor !== 'string') {
const id = document.cursor.id;
// If we have a namespace change set the new namespace for getmores
if (document.cursor.ns) {
cursor.ns = document.cursor.ns;
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = document.operationTime;

// If we have a firstBatch set it
if (Array.isArray(document.cursor.firstBatch)) {
cursor.cursorState.documents = document.cursor.firstBatch;
}

// Return after processing command cursor
return done(null, result);
}
}
}

Expand Down
46 changes: 46 additions & 0 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2799,3 +2799,49 @@ describe('Change Stream Resume Error Tests', function() {
})
});
});
context('NODE-2626 - handle null changes without error', function() {
let mockServer;
afterEach(() => mock.cleanup());
beforeEach(() => mock.createServer().then(server => (mockServer = server)));
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function(done) {
mockServer.setMessageHandler(req => {
const doc = req.document;
if (doc.ismaster) {
return req.reply(mock.DEFAULT_ISMASTER_36);
}
if (doc.aggregate) {
return req.reply({
ok: 1,
cursor: {
id: Long.ZERO,
firstBatch: []
}
});
}
if (doc.getMore) {
return req.reply({
ok: 1,
cursor: {
id: new Long(1407, 1407),
nextBatch: []
}
});
}
req.reply({ ok: 1 });
});
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
useUnifiedTopology: true
});
client.connect(err => {
expect(err).to.not.exist;
const collection = client.db('cs').collection('test');
const changeStream = collection.watch();
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
});
});

0 comments on commit 9db8369

Please sign in to comment.