Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update change streams #12

Merged
merged 2 commits into from
Feb 18, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Update for 3.6 driver
  • Loading branch information
ljhaywar committed Feb 12, 2021
commit a82e19490e92039d7d42829fb2b35df7e55080cc
23 changes: 15 additions & 8 deletions changeStreams.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function closeChangeStream(timeInMs = 60000, changeStream) {
async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeline = []) {
const collection = client.db("sample_airbnb").collection("listingsAndReviews");

// See http://bit.ly/Node_watch for the watch() docs
// See https://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);

// ChangeStream inherits from the Node Built-in Class EventEmitter (https://nodejs.org/dist/latest-v12.x/docs/api/events.html#events_class_eventemitter).
Expand All @@ -102,18 +102,25 @@ async function monitorListingsUsingEventEmitter(client, timeInMs = 60000, pipeli
async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline = []) {
const collection = client.db("sample_airbnb").collection("listingsAndReviews");

// See http://bit.ly/Node_watch for the watch() docs
// See https://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);

// Set a timer that will close the change stream after the given amount of time
// Function execution will continue because we are not using "await" here
closeChangeStream(timeInMs, changeStream);

// We can use ChangeStream's hasNext() function to wait for a new change in the change stream.
// If the change stream is closed, hasNext() will return false so the while loop will exit.
// See http://bit.ly/Node_ChangeStream for the ChangeStream docs.
while (await changeStream.hasNext()) {
console.log(await changeStream.next());
// See https://mongodb.github.io/node-mongodb-native/3.6/api/ChangeStream.html for the ChangeStream docs.
try {
while (await changeStream.hasNext()) {
console.log(await changeStream.next());
}
} catch (error) {
if (changeStream.isClosed()) {
console.log("The change stream is closed. Will not wait on any more changes.")
} else {
throw error;
}
}
}

Expand All @@ -127,10 +134,10 @@ async function monitorListingsUsingHasNext(client, timeInMs = 60000, pipeline =
async function monitorListingsUsingStreamAPI(client, timeInMs = 60000, pipeline = []) {
const collection = client.db('sample_airbnb').collection('listingsAndReviews');

// See http://bit.ly/Node_watch for the watch() docs
// See https://mongodb.github.io/node-mongodb-native/3.6/api/Collection.html#watch for the watch() docs
const changeStream = collection.watch(pipeline);

// See http://bit.ly/Node_pipe for the pipe() docs
// See https://mongodb.github.io/node-mongodb-native/3.6/api/ChangeStream.html#pipe for the pipe() docs
changeStream.pipe(
new stream.Writable({
objectMode: true,
Expand Down