Skip to content

Commit

Permalink
Ensure that oplog observe driver retries on error
Browse files Browse the repository at this point in the history
  • Loading branch information
glasser committed Apr 8, 2014
1 parent 1d3b7f9 commit be4d306
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 28 deletions.
56 changes: 34 additions & 22 deletions packages/mongo-livedata/oplog_observe_driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ _.extend(OplogObserveDriver.prototype, {
var thisGeneration = ++self._fetchGeneration;
self._needToFetch = new LocalCollection._IdMap;
var waiting = 0;
var anyError = null;
var fut = new Future;
// This loop is safe, because _currentlyFetching will not be updated
// during this loop (in fact, it is never mutated).
Expand All @@ -435,8 +434,11 @@ _.extend(OplogObserveDriver.prototype, {
finishIfNeedToPollQuery(function (err, doc) {
try {
if (err) {
if (!anyError)
anyError = err;
Meteor._debug("Got exception while fetching documents: " +
err);
if (self._phase !== PHASE.QUERYING) {
self._needToPollQuery();
}
} else if (!self._stopped && self._phase === PHASE.FETCHING
&& self._fetchGeneration === thisGeneration) {
// We re-check the generation in case we've had an explicit
Expand All @@ -456,9 +458,6 @@ _.extend(OplogObserveDriver.prototype, {
}));
});
fut.wait();
// XXX do this even if we've switched to PHASE.QUERYING?
if (anyError)
throw anyError;
// Exit now if we've had a _pollQuery call (here or in another fiber).
if (self._phase === PHASE.QUERYING)
return;
Expand Down Expand Up @@ -599,22 +598,35 @@ _.extend(OplogObserveDriver.prototype, {

_runQuery: function () {
var self = this;
var newResults = new LocalCollection._IdMap;
var newBuffer = new LocalCollection._IdMap;

// Query 2x documents as the half excluded from the original query will go
// into unpublished buffer to reduce additional Mongo lookups in cases when
// documents are removed from the published set and need a replacement.
// XXX needs more thought on non-zero skip
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
// buffer if such is needed.
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
cursor.forEach(function (doc, i) {
if (!self._limit || i < self._limit)
newResults.set(doc._id, doc);
else
newBuffer.set(doc._id, doc);
});
var newResults, newBuffer;

while (true) {
newResults = new LocalCollection._IdMap;
newBuffer = new LocalCollection._IdMap;

// Query 2x documents as the half excluded from the original query will go
// into unpublished buffer to reduce additional Mongo lookups in cases
// when documents are removed from the published set and need a
// replacement.
// XXX needs more thought on non-zero skip
// XXX 2 is a "magic number" meaning there is an extra chunk of docs for
// buffer if such is needed.
var cursor = self._cursorForQuery({ limit: self._limit * 2 });
try {
cursor.forEach(function (doc, i) {
if (!self._limit || i < self._limit)
newResults.set(doc._id, doc);
else
newBuffer.set(doc._id, doc);
});
break;
} catch (e) {
// During failover (eg) if we get an exception we should log and retry
// instead of crashing.
Meteor._debug("Got exception while polling query: " + e);
Meteor._sleepForMs(100);
}
}

self._publishNewResults(newResults, newBuffer);
},
Expand Down
22 changes: 16 additions & 6 deletions packages/mongo-livedata/oplog_tailing.js
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,22 @@ _.extend(OplogHandle.prototype, {
// be ready.
self._readyFuture.wait();

// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we might
// find a TS that won't show up in the actual tail stream.
var lastEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, self._baseOplogSelector,
{fields: {ts: 1}, sort: {$natural: -1}});
while (true) {
// We need to make the selector at least as restrictive as the actual
// tailing selector (ie, we need to specify the DB name) or else we might
// find a TS that won't show up in the actual tail stream.
try {
var lastEntry = self._oplogLastEntryConnection.findOne(
OPLOG_COLLECTION, self._baseOplogSelector,
{fields: {ts: 1}, sort: {$natural: -1}});
break;
} catch (e) {
// During failover (eg) if we get an exception we should log and retry
// instead of crashing.
Meteor._debug("Got exception while reading last entry: " + e);
Meteor._sleepForMs(100);
}
}

if (!lastEntry) {
// Really, nothing in the oplog? Well, we've processed everything.
Expand Down

0 comments on commit be4d306

Please sign in to comment.