Skip to content

Commit

Permalink
fix: correctly re-establishes pipe destinations
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Reggi authored Oct 20, 2020
1 parent f8fd310 commit a6e7caf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ function createChangeStreamCursor(self, options) {

if (self.pipeDestinations) {
const cursorStream = changeStreamCursor.stream(self.streamOptions);
for (let pipeDestination in self.pipeDestinations) {
for (let pipeDestination of self.pipeDestinations) {
cursorStream.pipe(pipeDestination);
}
}
Expand Down
32 changes: 13 additions & 19 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
const path = require('path');
const assert = require('assert');
const Transform = require('stream').Transform;
const MongoNetworkError = require('../../lib/core').MongoNetworkError;
Expand Down Expand Up @@ -1474,7 +1475,7 @@ describe('Change Streams', function() {
}
});

it.skip('should resume piping of Change Streams when a resumable error is encountered', {
it('should resume piping of Change Streams when a resumable error is encountered', {
metadata: {
requires: {
generators: true,
Expand All @@ -1483,14 +1484,13 @@ describe('Change Streams', function() {
}
},
test: function(done) {
const filename = path.join(__dirname, '_nodemongodbnative_resumepipe.txt');
this.defer(() => fs.unlinkSync(filename));
const configuration = this.configuration;
const ObjectId = configuration.require.ObjectId;
const Timestamp = configuration.require.Timestamp;
const Long = configuration.require.Long;

// Contain mock server
let primaryServer = null;

// Default message fields
const defaultFields = {
setName: 'rs',
Expand All @@ -1506,9 +1506,8 @@ describe('Change Streams', function() {
hosts: ['localhost:32000', 'localhost:32001', 'localhost:32002']
};

co(function*() {
primaryServer = yield mock.createServer();

mock.createServer(32000, 'localhost').then(primaryServer => {
this.defer(() => mock.cleanup());
let counter = 0;
primaryServer.setMessageHandler(request => {
const doc = request.document;
Expand Down Expand Up @@ -1594,31 +1593,26 @@ describe('Change Streams', function() {

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());

const database = client.db('integration_tests5');
const collection = database.collection('MongoNetworkErrorTestPromises');
const changeStream = collection.watch(pipeline);

const filename = '/tmp/_nodemongodbnative_resumepipe.txt';
const outStream = fs.createWriteStream(filename);

changeStream.stream({ transform: JSON.stringify }).pipe(outStream);

this.defer(() => changeStream.close());
// Listen for changes to the file
const watcher = fs.watch(filename, function(eventType) {
assert.equal(eventType, 'change');
const watcher = fs.watch(filename, eventType => {
this.defer(() => watcher.close());
expect(eventType).to.equal('change');

const fileContents = fs.readFileSync(filename, 'utf8');
const parsedFileContents = JSON.parse(fileContents);
assert.equal(parsedFileContents.fullDocument.a, 1);

watcher.close();
expect(parsedFileContents).to.have.nested.property('fullDocument.a', 1);

changeStream.close(err => {
expect(err).to.not.exist;

mock.cleanup(() => done());
});
done();
});
});
});
Expand Down

0 comments on commit a6e7caf

Please sign in to comment.