Skip to content

Commit

Permalink
Fix PollingObserveDriver error handling
Browse files Browse the repository at this point in the history
The getRawObjects call can throw (eg, if you can't connect to the mongo
server for too long).  A few pieces of state were being corrupted in
that case:

- self._results was being set too early, leading to 'first' not being
  set on future _pollMongo calls, and_multiplexer.ready() never being
  called. This had two effects:

    - The observe (and thus any subscription) would never become
      ready().  Due to deduping, *no observe on this query* would
      ever become ready either.  This also implies that the
      observeChanges that are part of _publishCursor would never return,
      so the sub.onStop would never get called, so the observeHandle
      would never stop, leading not only to leaks, but for an inability
      for that query to ever stop being deduped with the corrupted
      PollingObserveDriver!

    - The onFlush calls would throw a "not ready" error instead of
      calling the callback, so (a) errors would be logged and (b) write
      fences would never be closed

  Fixed this by not writing to self._results at the top of the function.

- writesForCycle was being lost, so those write fences would never
  close. Fixed this by pushing writesForCycle back onto _pendingWrites
  if getRawObjects throws.
  • Loading branch information
glasser committed May 17, 2014
1 parent a3d71ca commit 78b280e
Showing 1 changed file with 27 additions and 12 deletions.
39 changes: 27 additions & 12 deletions packages/mongo-livedata/polling_observe_driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,11 @@ _.extend(PollingObserveDriver.prototype, {
--self._pollsScheduledButNotStarted;

var first = false;
if (!self._results) {
var oldResults = self._results;
if (!oldResults) {
first = true;
// XXX maybe use OrderedDict instead?
self._results = self._ordered ? [] : new LocalCollection._IdMap;
oldResults = self._ordered ? [] : new LocalCollection._IdMap;
}

self._testOnlyPollCallback && self._testOnlyPollCallback();
Expand All @@ -138,25 +139,39 @@ _.extend(PollingObserveDriver.prototype, {
var writesForCycle = self._pendingWrites;
self._pendingWrites = [];

// Get the new query results. (These calls can yield.)
if (!first)
self._synchronousCursor.rewind();
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
var oldResults = self._results;
// Always rewind the cursor; it's a no-op the first time, but better safe
// than sorry (eg, if the first call to getRawObjects throws, the cursor
// needs rewinding even though 'first' is true).
self._synchronousCursor.rewind();

// Get the new query results. (This yields.)
try {
var newResults = self._synchronousCursor.getRawObjects(self._ordered);
} catch (e) {
// getRawObjects can throw if we're having trouble talking to the
// database. That's fine --- we will repoll later anyway. But we should
// make sure not to lose track of this cycle's writes.
Array.prototype.push.apply(self._pendingWrites, writesForCycle);
throw e;
}

// Run diffs. (This can yield too.)
// Run diffs.
if (!self._stopped) {
LocalCollection._diffQueryChanges(
self._ordered, oldResults, newResults, self._multiplexer);
}

// Replace self._results atomically.
self._results = newResults;

// Signals the multiplexer to call all initial adds.
// Signals the multiplexer to allow all observeChanges calls that share this
// multiplexer to return. (This happens asynchronously, via the
// multiplexer's queue.)
if (first)
self._multiplexer.ready();

// Replace self._results atomically. (This assignment is what makes `first`
// stay through on the next cycle, so we've waited until after we've
// committed to ready-ing the multiplexer.)
self._results = newResults;

// Once the ObserveMultiplexer has processed everything we've done in this
// round, mark all the writes which existed before this call as
// commmitted. (If new writes have shown up in the meantime, there'll
Expand Down

0 comments on commit 78b280e

Please sign in to comment.