Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions lib/cursor/changeStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ class ChangeStream extends EventEmitter {
);
}

// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return;
}
this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return reject(err);
}

this.driverChangeStream = driverChangeStream;
this.emit('ready');
this.driverChangeStream = driverChangeStream;
this.emit('ready');
resolve();
});
});
}

Expand All @@ -53,20 +56,23 @@ class ChangeStream extends EventEmitter {
this.bindedEvents = true;

if (this.driverChangeStream == null) {
this.once('ready', () => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});
this.$driverChangeStreamPromise.then(
() => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});

driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
});
});
});
});
},
() => {} // No need to register events if opening change stream failed
);

return;
}
Expand Down Expand Up @@ -142,8 +148,12 @@ class ChangeStream extends EventEmitter {
this.closed = true;
if (this.driverChangeStream) {
return this.driverChangeStream.close();
} else {
return this.$driverChangeStreamPromise.then(
() => this.driverChangeStream.close(),
() => {} // No need to close if opening the change stream failed
);
}
return Promise.resolve();
}
}

Expand Down
5 changes: 5 additions & 0 deletions lib/cursor/queryCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ function QueryCursor(query) {
this.cursor = null;
this.skipped = false;
this.query = query;
this._closed = false;
const model = query.model;
this._mongooseOptions = {};
this._transforms = [];
Expand Down Expand Up @@ -229,6 +230,7 @@ QueryCursor.prototype.close = async function close() {
}
try {
await this.cursor.close();
this._closed = true;
this.emit('close');
} catch (error) {
this.listeners('error').length > 0 && this.emit('error', error);
Expand Down Expand Up @@ -266,6 +268,9 @@ QueryCursor.prototype.next = async function next() {
if (typeof arguments[0] === 'function') {
throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback');
}
if (this._closed) {
throw new MongooseError('Cannot call `next()` on a closed cursor');
}
return new Promise((resolve, reject) => {
_next(this, function(error, doc) {
if (error) {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"bson": "^6.7.0",
"kareem": "2.6.3",
"mongodb": "6.7.0",
"mongodb": "6.8.0",
"mpath": "0.9.0",
"mquery": "5.0.0",
"ms": "2.1.3",
Expand Down
2 changes: 1 addition & 1 deletion scripts/tsc-diagnostics-check.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const fs = require('fs');

const stdin = fs.readFileSync(0).toString('utf8');
const maxInstantiations = isNaN(process.argv[2]) ? 127500 : parseInt(process.argv[2], 10);
const maxInstantiations = isNaN(process.argv[2]) ? 135000 : parseInt(process.argv[2], 10);

console.log(stdin);

Expand Down
2 changes: 2 additions & 0 deletions test/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,8 @@ describe('connections:', function() {
await nextChange;
assert.equal(changes.length, 1);
assert.equal(changes[0].operationType, 'insert');

await changeStream.close();
await conn.close();
});

Expand Down
48 changes: 41 additions & 7 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const sinon = require('sinon');
const start = require('./common');

const assert = require('assert');
const { once } = require('events');
const random = require('./util').random;
const util = require('./util');

Expand Down Expand Up @@ -3508,6 +3509,9 @@ describe('Model', function() {
}
changeStream.removeListener('change', listener);
listener = null;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
changeStream.close();
changeStream = null;
});
Expand Down Expand Up @@ -3560,14 +3564,21 @@ describe('Model', function() {
it('fullDocument (gh-11936)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });
const changeStream = await MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
});
await changeStream.$driverChangeStreamPromise;

const doc = await MyModel.create({ name: 'Ned Stark' });

const p = changeStream.next();
const p = new Promise((resolve) => {
changeStream.once('change', change => {
resolve(change);
});
});
// Need to wait for resume token to be set after the event listener,
// otherwise change stream might not pick up the update.
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3576,22 +3587,31 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('fullDocument with immediate watcher and hydrate (gh-14049)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });

let changeStream = null;
const p = new Promise((resolve) => {
MyModel.watch([], {
changeStream = MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
}).on('change', change => {
});

changeStream.on('change', change => {
resolve(change);
});
});

// Need to wait for cursor to be initialized and for resume token to
// be set, otherwise change stream might not pick up the update.
await changeStream.$driverChangeStreamPromise;
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3600,6 +3620,8 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('respects discriminators (gh-11007)', async function() {
Expand Down Expand Up @@ -3639,6 +3661,9 @@ describe('Model', function() {
assert.equal(changeData.operationType, 'insert');
assert.equal(changeData.fullDocument.name, 'Ned Stark');

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
await changeStream.close();
await db.close();
});
Expand All @@ -3654,11 +3679,16 @@ describe('Model', function() {
setTimeout(resolve, 500, false);
});

changeStream.close();
await db;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

const close = changeStream.close();
await db.asPromise();
const readyCalled = await ready;
assert.strictEqual(readyCalled, false);

await close;
await db.close();
});

Expand All @@ -3675,6 +3705,10 @@ describe('Model', function() {

await MyModel.create({ name: 'Hodor' });

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

changeStream.close();
const closedData = await closed;
assert.strictEqual(closedData, true);
Expand Down
12 changes: 10 additions & 2 deletions test/model.watch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,22 @@ describe('model: watch: ', function() {
const changeData = await changed;
assert.equal(changeData.operationType, 'insert');
assert.equal(changeData.fullDocument.name, 'Ned Stark');
await changeStream.close();
});

it('watch() close() prevents buffered watch op from running (gh-7022)', async function() {
const MyModel = db.model('Test', new Schema({}));
const changeStream = MyModel.watch();
const ready = new global.Promise(resolve => {
const ready = new Promise(resolve => {
changeStream.once('data', () => {
resolve(true);
});
setTimeout(resolve, 500, false);
});

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
const close = changeStream.close();
await db.asPromise();
const readyCalled = await ready;
Expand All @@ -64,12 +68,16 @@ describe('model: watch: ', function() {
await MyModel.init();

const changeStream = MyModel.watch();
const closed = new global.Promise(resolve => {
const closed = new Promise(resolve => {
changeStream.once('close', () => resolve(true));
});

await MyModel.create({ name: 'Hodor' });

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

await changeStream.close();

const closedData = await closed;
Expand Down
3 changes: 2 additions & 1 deletion test/query.cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,8 @@ describe('QueryCursor', function() {
await cursor.next();
assert.ok(false);
} catch (error) {
assert.equal(error.name, 'MongoCursorExhaustedError');
assert.equal(error.name, 'MongooseError');
assert.ok(error.message.includes('closed cursor'), error.message);
}
});
});
Expand Down