-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve (and shorten) query polling implementation. #4243
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
This implementation has the following benefits: - It collapses the QueryScheduler abstraction into the QueryManager (which was always ultimately responsible for managing the lifetime of polling timers), thus simplifying the relationship between the QueryManager and its ObservableQuery objects. - It's about 100 bytes smaller than the previous implementation, after minification and gzip. - It uses setTimeout rather than setInterval, so event loop starvation never leads to a rapid succession of setInterval catch-up calls. - It guarantees at most one timeout will be pending for an arbitrary number of polling queries, rather than a separate timer for every distinct polling interval. - Fewer independent timers means better batching behavior, usually. - Though there may be a delay between the desired polling time for a given query and the actual polling time, the delay is never greater than the minimum polling interval across all queries, which changes dynamically as polling queries are started and stopped.
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,6 @@ import { | |
hasDirectives, | ||
} from 'apollo-utilities'; | ||
|
||
import { QueryScheduler } from '../scheduler/scheduler'; | ||
|
||
import { isApolloError, ApolloError } from '../errors/ApolloError'; | ||
|
||
import { Observer, Subscription, Observable } from '../util/Observable'; | ||
|
@@ -54,7 +52,6 @@ export interface QueryInfo { | |
} | ||
|
||
export class QueryManager<TStore> { | ||
public scheduler: QueryScheduler<TStore>; | ||
public link: ApolloLink; | ||
public mutationStore: MutationStore = new MutationStore(); | ||
public queryStore: QueryStore = new QueryStore(); | ||
|
@@ -66,6 +63,8 @@ export class QueryManager<TStore> { | |
|
||
private onBroadcast: () => void; | ||
|
||
private ssrMode: boolean; | ||
|
||
// let's not start at zero to avoid pain with bad checks | ||
private idCounter = 1; | ||
|
||
|
@@ -104,7 +103,7 @@ export class QueryManager<TStore> { | |
this.dataStore = store; | ||
this.onBroadcast = onBroadcast; | ||
this.clientAwareness = clientAwareness; | ||
this.scheduler = new QueryScheduler({ queryManager: this, ssrMode }); | ||
this.ssrMode = ssrMode; | ||
} | ||
|
||
public mutate<T>({ | ||
|
@@ -662,7 +661,7 @@ export class QueryManager<TStore> { | |
let transformedOptions = { ...options } as WatchQueryOptions; | ||
|
||
return new ObservableQuery<T>({ | ||
scheduler: this.scheduler, | ||
queryManager: this, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I hope it's safe to change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it should be safe. We are exporting |
||
options: transformedOptions, | ||
shouldSubscribe: shouldSubscribe, | ||
}); | ||
|
@@ -1262,4 +1261,133 @@ export class QueryManager<TStore> { | |
}, | ||
}; | ||
} | ||
|
||
public checkInFlight(queryId: string) { | ||
const query = this.queryStore.get(queryId); | ||
|
||
return ( | ||
query && | ||
query.networkStatus !== NetworkStatus.ready && | ||
query.networkStatus !== NetworkStatus.error | ||
); | ||
} | ||
|
||
// Map from client ID to { interval, options }. | ||
public pollingInfoByQueryId = new Map<string, { | ||
interval: number; | ||
lastPollTimeMs: number; | ||
options: WatchQueryOptions; | ||
}>(); | ||
|
||
private nextPoll: { | ||
time: number; | ||
timeout: NodeJS.Timeout; | ||
} | null = null; | ||
|
||
public startPollingQuery( | ||
options: WatchQueryOptions, | ||
queryId: string, | ||
listener?: QueryListener, | ||
): string { | ||
const { pollInterval } = options; | ||
|
||
if (!pollInterval) { | ||
throw new Error( | ||
'Attempted to start a polling query without a polling interval.', | ||
); | ||
} | ||
|
||
// Do not poll in SSR mode | ||
if (!this.ssrMode) { | ||
this.pollingInfoByQueryId.set(queryId, { | ||
interval: pollInterval, | ||
// Avoid polling until at least pollInterval milliseconds from now. | ||
// The -10 is a fudge factor to help with tests that rely on simulated | ||
// timeouts via jest.runTimersToTime. | ||
lastPollTimeMs: Date.now() - 10, | ||
options: { | ||
...options, | ||
fetchPolicy: 'network-only', | ||
}, | ||
}); | ||
|
||
if (listener) { | ||
this.addQueryListener(queryId, listener); | ||
} | ||
|
||
this.schedulePoll(pollInterval); | ||
} | ||
|
||
return queryId; | ||
} | ||
|
||
public stopPollingQuery(queryId: string) { | ||
// Since the master polling interval dynamically adjusts to the contents of | ||
// this.pollingInfoByQueryId, stopping a query from polling is as easy as | ||
// removing it from the map. | ||
this.pollingInfoByQueryId.delete(queryId); | ||
} | ||
|
||
// Calling this method ensures a poll will happen within the specified time | ||
// limit, canceling any pending polls that would not happen in time. | ||
private schedulePoll(timeLimitMs: number) { | ||
const now = Date.now(); | ||
|
||
if (this.nextPoll) { | ||
if (timeLimitMs < this.nextPoll.time - now) { | ||
// The next poll will happen too far in the future, so cancel it, and | ||
// fall through to scheduling a new timeout. | ||
clearTimeout(this.nextPoll.timeout); | ||
} else { | ||
// The next poll will happen within timeLimitMs, so all is well. | ||
return; | ||
} | ||
} | ||
|
||
this.nextPoll = { | ||
// Estimated time when the timeout will fire. | ||
time: now + timeLimitMs, | ||
|
||
timeout: setTimeout(() => { | ||
this.nextPoll = null; | ||
let nextTimeLimitMs = Infinity; | ||
|
||
this.pollingInfoByQueryId.forEach((info, queryId) => { | ||
// Pick next timeout according to current minimum interval. | ||
if (info.interval < nextTimeLimitMs) { | ||
nextTimeLimitMs = info.interval; | ||
} | ||
|
||
if (!this.checkInFlight(queryId)) { | ||
// If this query was last polled more than interval milliseconds | ||
// ago, poll it now. Note that there may be a small delay between | ||
// the desired polling time and the actual polling time (equal to | ||
// at most the minimum polling interval across all queries), but | ||
// that's the tradeoff to batching polling intervals. | ||
if (Date.now() - info.lastPollTimeMs >= info.interval) { | ||
const updateLastPollTime = () => { | ||
info.lastPollTimeMs = Date.now(); | ||
}; | ||
this.fetchQuery(queryId, info.options, FetchType.poll).then( | ||
// Set info.lastPollTimeMs after the fetch completes, whether | ||
// or not it succeeded. Promise.prototype.finally would be nice | ||
// here, but we don't have a polyfill for that at the moment, | ||
// and this code has historically silenced errors, which is not | ||
// the behavior of .finally(updateLastPollTime). | ||
updateLastPollTime, | ||
updateLastPollTime | ||
); | ||
} | ||
} | ||
}); | ||
|
||
// If there were no entries in this.pollingInfoByQueryId, then | ||
// nextTimeLimitMs will still be Infinity, so this.schedulePoll will | ||
// not be called, thus ending the master polling interval. | ||
if (isFinite(nextTimeLimitMs)) { | ||
this.schedulePoll(nextTimeLimitMs); | ||
} | ||
}, timeLimitMs), | ||
}; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to keep track of
isCurrentlyPolling
here inObservableQuery
, because theQueryManager
tracks that information inpollingInfoByQueryId
, and stopping is no longer required before restarting.