diff --git a/packages/mongo-livedata/polling_observe_driver.js b/packages/mongo-livedata/polling_observe_driver.js index c1c700d0c19..9bfca7ec44a 100644 --- a/packages/mongo-livedata/polling_observe_driver.js +++ b/packages/mongo-livedata/polling_observe_driver.js @@ -126,10 +126,11 @@ _.extend(PollingObserveDriver.prototype, { --self._pollsScheduledButNotStarted; var first = false; - if (!self._results) { + var oldResults = self._results; + if (!oldResults) { first = true; // XXX maybe use OrderedDict instead? - self._results = self._ordered ? [] : new LocalCollection._IdMap; + oldResults = self._ordered ? [] : new LocalCollection._IdMap; } self._testOnlyPollCallback && self._testOnlyPollCallback(); @@ -138,25 +139,39 @@ _.extend(PollingObserveDriver.prototype, { var writesForCycle = self._pendingWrites; self._pendingWrites = []; - // Get the new query results. (These calls can yield.) - if (!first) - self._synchronousCursor.rewind(); - var newResults = self._synchronousCursor.getRawObjects(self._ordered); - var oldResults = self._results; + // Always rewind the cursor; it's a no-op the first time, but better safe + // than sorry (eg, if the first call to getRawObjects throws, the cursor + // needs rewinding even though 'first' is true). + self._synchronousCursor.rewind(); + + // Get the new query results. (This yields.) + try { + var newResults = self._synchronousCursor.getRawObjects(self._ordered); + } catch (e) { + // getRawObjects can throw if we're having trouble talking to the + // database. That's fine --- we will repoll later anyway. But we should + // make sure not to lose track of this cycle's writes. + Array.prototype.push.apply(self._pendingWrites, writesForCycle); + throw e; + } - // Run diffs. (This can yield too.) + // Run diffs. if (!self._stopped) { LocalCollection._diffQueryChanges( self._ordered, oldResults, newResults, self._multiplexer); } - // Replace self._results atomically. - self._results = newResults; - - // Signals the multiplexer to call all initial adds. + // Signals the multiplexer to allow all observeChanges calls that share this + // multiplexer to return. (This happens asynchronously, via the + // multiplexer's queue.) if (first) self._multiplexer.ready(); + // Replace self._results atomically. (This assignment is what makes `first` + // stay through on the next cycle, so we've waited until after we've + // committed to ready-ing the multiplexer.) + self._results = newResults; + // Once the ObserveMultiplexer has processed everything we've done in this // round, mark all the writes which existed before this call as // commmitted. (If new writes have shown up in the meantime, there'll