-
Notifications
You must be signed in to change notification settings - Fork 34
Bug 1687491 - Implement a dispatcher and add an instance to the Glean singleton #31
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| /* This Source Code Form is subject to the terms of the Mozilla Public | ||
| * License, v. 2.0. If a copy of the MPL was not distributed with this | ||
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | ||
|
|
||
| // The possible states a dispatcher instance can be in. | ||
| const enum DispatcherState { | ||
| // The dispatcher has not been initialized yet. | ||
| // | ||
| // When the dispatcher is in this state it will not enqueue | ||
| // more than `maxPreInitQueueSize` tasks. | ||
| Uninitialized, | ||
| // There are no commands queued and the dispatcher is idle. | ||
| Idle, | ||
| // The dispatcher is currently processing queued tasks. | ||
| Processing, | ||
| // The dispatcher is stopped, tasks queued will not be immediatelly processed. | ||
| Stopped, | ||
| } | ||
|
|
||
| // The possible commands to be processed by the dispatcher. | ||
| const enum Commands { | ||
| // The dispatcher must enqueue a new task. | ||
| // | ||
| // This command is always followed by a concrete task for the dispatcher to execute. | ||
| Task, | ||
| // The dispatcher should stop executing the queued tasks. | ||
| Stop, | ||
| } | ||
|
|
||
| // A task the dispatcher knows how to execute. | ||
| type Task = () => Promise<void>; | ||
|
|
||
| // An executable command. | ||
| type Command = { | ||
| task: Task, | ||
| command: Commands.Task | ||
| } | { | ||
| command: Commands.Stop | ||
| }; | ||
|
|
||
| /** | ||
| * A task dispatcher for async tasks. | ||
| * | ||
| * Will make sure tasks are execute in order. | ||
| */ | ||
| class Dispatcher { | ||
| // A FIFO queue of tasks to execute. | ||
| private queue: Command[]; | ||
| // The current state of this dispatcher. | ||
| private state: DispatcherState; | ||
| // A promise contianing the current execution promise. | ||
| // | ||
| // This is `undefined` in case there is no ongoing execution of tasks. | ||
| private currentJob?: Promise<void>; | ||
|
|
||
| constructor(readonly maxPreInitQueueSize = 100) { | ||
| this.queue = []; | ||
| this.state = DispatcherState.Uninitialized; | ||
| } | ||
|
|
||
| /** | ||
| * Gets the oldest command added to the queue. | ||
| * | ||
| * @returns The oldest command or `undefined` if the queue is empty. | ||
| */ | ||
| private getNextCommand(): Command | undefined { | ||
| return this.queue.shift(); | ||
| } | ||
|
|
||
| /** | ||
| * Executes all the commands in the queue, from oldest to newest. | ||
| * | ||
| * Stops on case a `Stop` command is encountered. | ||
| */ | ||
| private async execute(): Promise<void> { | ||
| let nextCommand = this.getNextCommand(); | ||
| while(nextCommand) { | ||
| if (nextCommand.command === Commands.Stop) { | ||
| break; | ||
| } | ||
|
|
||
| try { | ||
| await nextCommand.task(); | ||
| } catch(e) { | ||
| console.error("Error executing task:", e); | ||
| } | ||
|
|
||
| nextCommand = this.getNextCommand(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Triggers the execution of enqueued command | ||
| * in case the dispatcher is currently Idle. | ||
| */ | ||
| private async triggerExecution(): Promise<void> { | ||
| if (this.state === DispatcherState.Idle && this.queue.length > 0) { | ||
| this.state = DispatcherState.Processing; | ||
| this.currentJob = this.execute(); | ||
| await this.currentJob; | ||
| this.currentJob = undefined; | ||
| this.state = DispatcherState.Idle; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Launches a task on this dispatchers queue. | ||
| * Kickstarts the execution of the queue in case it is currently Idle. | ||
| * | ||
| * # Note | ||
| * | ||
| * Will not enqueue in case the dispatcher has not been initialized yet and the queues length exceeds `maxPreInitQueueSize`. | ||
| * | ||
| * @param task The task to enqueue. | ||
| */ | ||
| launch(task: Task): void { | ||
| if (this.state === DispatcherState.Uninitialized) { | ||
| if (this.queue.length >= this.maxPreInitQueueSize) { | ||
| console.warn("Unable to enqueue task, pre init queue is full."); | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| this.queue.push({ | ||
| task, | ||
| command: Commands.Task | ||
| }); | ||
|
|
||
| // Even though triggerExecution is async we don't want to block on it. | ||
| // The point of the dispatcher is to execute the async functions | ||
| // in a deterministic order without having to wait for them. | ||
| this.triggerExecution(); | ||
| } | ||
|
|
||
| /** | ||
| * Flushes the tasks enqueued while the dispatcher was uninitialized. | ||
| * | ||
| * This is a no-op in case the dispatcher is not in an uninitialized state. | ||
| */ | ||
| flushInit(): void { | ||
| if (this.state !== DispatcherState.Uninitialized) { | ||
| console.warn("Attempted to initialize the Dispatcher, but it is already initialized. Ignoring."); | ||
| return; | ||
| } | ||
|
|
||
| this.state = DispatcherState.Idle; | ||
|
|
||
| // Even though triggerExecution is async we don't want to block on it. | ||
| // The point of the dispatcher is to execute the async functions | ||
| // in a deterministic order without having to wait for them. | ||
| this.triggerExecution(); | ||
| } | ||
|
|
||
| /** | ||
| * Stops the current job and clears the tasks queue. | ||
| * | ||
| * @returns A promise which resolves once the current job | ||
| * has been succesfully stopped and the queue was emptied. | ||
| */ | ||
| async clear(): Promise<void> { | ||
| await this.stop(); | ||
| this.queue = []; | ||
| this.state = DispatcherState.Idle; | ||
| } | ||
|
|
||
| /** | ||
| * Sets the state of this dispatcher to "Stopped" and stops any ongoing task processing. | ||
| * | ||
| * @returns A promise which resolves once the current job | ||
| * has been succesfully stopped. | ||
| */ | ||
| async stop(): Promise<void> { | ||
| if (this.state !== DispatcherState.Stopped) { | ||
| this.state = DispatcherState.Stopped; | ||
| this.queue.unshift({ command: Commands.Stop }); | ||
| await this.testBlockOnQueue(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * **Test-Only API** | ||
| * | ||
| * Returns a promise that resolves once the current task execution in finished. | ||
| * | ||
| * @returns The promise. | ||
| */ | ||
| async testBlockOnQueue(): Promise<void> { | ||
| return this.currentJob && await this.currentJob; | ||
| } | ||
| } | ||
|
|
||
| export default Dispatcher; | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ import { isUndefined, sanitizeApplicationId, validateURL } from "utils"; | |
| import { CoreMetrics } from "internal_metrics"; | ||
| import { Lifetime } from "metrics"; | ||
| import { DatetimeMetric } from "metrics/types/datetime"; | ||
| import Dispatcher from "dispatcher"; | ||
|
|
||
| class Glean { | ||
| // The Glean singleton. | ||
|
|
@@ -27,6 +28,8 @@ class Glean { | |
| private _coreMetrics: CoreMetrics; | ||
| // The ping uploader. | ||
| private _pingUploader: PingUploader | ||
| // A task dispatcher to help execute in order asynchronous external API calls. | ||
| private _dispatcher: Dispatcher; | ||
|
|
||
| // Properties that will only be set on `initialize`. | ||
|
|
||
|
|
@@ -44,6 +47,7 @@ class Glean { | |
| Use Glean.instance instead to access the Glean singleton.`); | ||
| } | ||
|
|
||
| this._dispatcher = new Dispatcher(); | ||
| this._pingUploader = new PingUploader(); | ||
| this._coreMetrics = new CoreMetrics(); | ||
| this._initialized = false; | ||
|
|
@@ -109,7 +113,12 @@ class Glean { | |
| * This function is only supposed to be called when telemetry is disabled. | ||
| */ | ||
| private static async clearMetrics(): Promise<void> { | ||
| // Stop ongoing uploading jobs and clear pending pings queue. | ||
| // Stops any task execution on the dispatcher. | ||
| // | ||
| // While stopped, the dispatcher will enqueue but won't execute any tasks it receives. | ||
| await Glean.dispatcher.stop(); | ||
|
|
||
| // Stop ongoing upload jobs and clear pending pings queue. | ||
| await Glean.pingUploader.clearPendingPingsQueue(); | ||
|
|
||
| // There is only one metric that we want to survive after clearing all | ||
|
|
@@ -131,11 +140,8 @@ class Glean { | |
| // We need to briefly set upload_enabled to true here so that `set` | ||
| // is not a no-op. | ||
| // | ||
| // Note that we can't provide the same guarantees as glean-core here. | ||
| // If by any change another actor attempts to record a metric while | ||
| // we are setting the known client id and first run date, they will be allowed to. | ||
| // | ||
| // TODO: Bug 1687491 might resolve this issue. | ||
| // This is safe. | ||
| // Since the dispatcher is stopped, no external API calls will be executed. | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this nice? The dispatcher actually fixed the issue. Now, it will only really be fixed after my next PR when the recording calls are indeed dispatched. |
||
| Glean.uploadEnabled = true; | ||
|
|
||
| // Store a "dummy" KNOWN_CLIENT_ID in the client_id metric. This will | ||
|
|
@@ -147,6 +153,9 @@ class Glean { | |
| await Glean.coreMetrics.firstRunDate.set(existingFirstRunDate.date); | ||
|
|
||
| Glean.uploadEnabled = false; | ||
|
|
||
| // Clear the dispatcher queue. | ||
| await Glean.dispatcher.clear(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -198,6 +207,9 @@ class Glean { | |
| await Glean.pingUploader.scanPendingPings(); | ||
| // Even though this returns a promise, there is no need to block on it returning. | ||
| Glean.pingUploader.triggerUpload(); | ||
|
|
||
| // Signal to the dispatcher that init is complete. | ||
| Glean.dispatcher.flushInit(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -227,14 +239,33 @@ class Glean { | |
| return Glean.instance._initialized; | ||
| } | ||
|
|
||
| /** | ||
| * Gets this Glean's instance application id. | ||
| * | ||
| * @returns The application id or `undefined` in case Glean has not been initialized yet. | ||
| */ | ||
| static get applicationId(): string | undefined { | ||
| return Glean.instance._applicationId; | ||
| } | ||
|
|
||
| /** | ||
| * Gets this Glean's instance server endpoint. | ||
| * | ||
| * @returns The server endpoint or `undefined` in case Glean has not been initialized yet. | ||
| */ | ||
| static get serverEndpoint(): string | undefined { | ||
| return Glean.instance._serverEndpoint; | ||
| } | ||
|
|
||
| /** | ||
| * Gets this Gleans's instance dispatcher. | ||
| * | ||
| * @returns The dispatcher instance. | ||
| */ | ||
| static get dispatcher(): Dispatcher { | ||
| return Glean.instance._dispatcher; | ||
| } | ||
|
|
||
| /** | ||
| * Determines whether upload is enabled. | ||
| * | ||
|
|
@@ -304,6 +335,9 @@ class Glean { | |
| // Get back to an uninitialized state. | ||
| Glean.instance._initialized = false; | ||
|
|
||
| // Clear the dispatcher queue. | ||
| await Glean.dispatcher.clear(); | ||
brizental marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| // Stop ongoing jobs and clear pending pings queue. | ||
| await Glean.pingUploader.clearPendingPingsQueue(); | ||
|
|
||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.