Skip to content

Commit

Permalink
fix(NODE-1797): error when ChangeStream used as iterator and emitter …
Browse files Browse the repository at this point in the history
…concurrently (#2871)
  • Loading branch information
W-A-James authored Jul 9, 2021
1 parent 70810d1 commit e0b3afe
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 1 deletion.
26 changes: 26 additions & 0 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const kResumeQueue = Symbol('resumeQueue');
const kCursorStream = Symbol('cursorStream');
/** @internal */
const kClosed = Symbol('closed');
/** @internal */
const kMode = Symbol('mode');

const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument'];
const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat(
Expand Down Expand Up @@ -206,6 +208,8 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
[kCursorStream]?: Readable;
/** @internal */
[kClosed]: boolean;
/** @internal */
[kMode]: false | 'iterator' | 'emitter';

/** @event */
static readonly RESPONSE = 'response' as const;
Expand Down Expand Up @@ -272,6 +276,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
this.cursor = createChangeStreamCursor(this, options);

this[kClosed] = false;
this[kMode] = false;

// Listen for any `change` listeners being added to ChangeStream
this.on('newListener', eventName => {
Expand Down Expand Up @@ -299,6 +304,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven

/** Check if there is any document still available in the Change Stream */
hasNext(callback?: Callback): Promise<void> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand All @@ -313,6 +319,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
next(
callback?: Callback<ChangeStreamDocument<TSchema>>
): Promise<ChangeStreamDocument<TSchema>> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand Down Expand Up @@ -367,6 +374,7 @@ export class ChangeStream<TSchema extends Document = Document> extends TypedEven
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
setIsIterator(this);
return maybePromise(callback, cb => {
getCursor(this, (err, cursor) => {
if (err || !cursor) return cb(err); // failed to resume, raise an error
Expand Down Expand Up @@ -535,6 +543,23 @@ const CHANGE_STREAM_EVENTS = [
ChangeStream.CLOSE
];

function setIsEmitter<TSchema>(changeStream: ChangeStream<TSchema>): void {
if (changeStream[kMode] === 'iterator') {
throw new MongoDriverError(
'Cannot use ChangeStream as an EventEmitter after using as an iterator'
);
}
changeStream[kMode] = 'emitter';
}

function setIsIterator<TSchema>(changeStream: ChangeStream<TSchema>): void {
if (changeStream[kMode] === 'emitter') {
throw new MongoDriverError(
'Cannot use ChangeStream as iterator after using as an EventEmitter'
);
}
changeStream[kMode] = 'iterator';
}
/**
* Create a new change stream cursor based on self's configuration
* @internal
Expand Down Expand Up @@ -630,6 +655,7 @@ function streamEvents<TSchema>(
changeStream: ChangeStream<TSchema>,
cursor: ChangeStreamCursor<TSchema>
): void {
setIsEmitter(changeStream);
const stream = changeStream[kCursorStream] || cursor.stream();
changeStream[kCursorStream] = stream;
stream.on('data', change => processNewChange(changeStream, change));
Expand Down
107 changes: 106 additions & 1 deletion test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';
const assert = require('assert');
const { Transform, PassThrough } = require('stream');
const { MongoNetworkError } = require('../../src/error');
const { MongoNetworkError, MongoDriverError } = require('../../src/error');
const { delay, setupDatabase, withClient, withCursor } = require('./shared');
const co = require('co');
const mock = require('../tools/mock');
Expand Down Expand Up @@ -1792,6 +1792,111 @@ describe('Change Streams', function () {
}
});

// FIXME: NODE-1797
describe('should error when used as iterator and emitter concurrently', function () {
let client, coll, changeStream, repeatInsert, val;
val = 0;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect().catch(() => expect.fail('Failed to connect to client'));

coll = client.db(this.configuration.db).collection('tester');
changeStream = coll.watch();

repeatInsert = setInterval(async function () {
await coll.insertOne({ c: val }).catch('Failed to insert document');
val++;
}, 75);
});

afterEach(async function () {
if (repeatInsert) {
clearInterval(repeatInsert);
}
if (changeStream) {
await changeStream.close();
}

await mock.cleanup();
if (client) {
await client.close();
}
});

it(
'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.on('change', resolve));
try {
await changeStream.hasNext().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.hasNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.once('change', resolve));
try {
await changeStream.next().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.tryNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);
});

describe('should properly handle a changeStream event being processed mid-close', function () {
let client, coll, changeStream;

Expand Down

0 comments on commit e0b3afe

Please sign in to comment.