-
Notifications
You must be signed in to change notification settings - Fork 34
Bug 1687491 - Dispatch all external API calls #40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
51d7652
2a58cb9
b38029e
16507cb
f131e10
8989ca6
023f770
03140c6
fedf266
f51d872
38233ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,8 @@ | |
| * License, v. 2.0. If a copy of the MPL was not distributed with this | ||
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | ||
|
|
||
| import { generateUUIDv4 } from "utils"; | ||
|
|
||
| // The possible states a dispatcher instance can be in. | ||
| export const enum DispatcherState { | ||
| // The dispatcher has not been initialized yet. | ||
|
|
@@ -34,12 +36,29 @@ type Task = () => Promise<void>; | |
|
|
||
| // An executable command. | ||
| type Command = { | ||
| testId?: string, | ||
| task: Task, | ||
| command: Commands.Task, | ||
| } | { | ||
| command: Exclude<Commands, Commands.Task>, | ||
| }; | ||
|
|
||
| /** | ||
| * An observer of the commands being executed by the dispatcher. | ||
| */ | ||
| class DispatcherObserver { | ||
| constructor(private cb: (command: Command) => void) {} | ||
|
|
||
| /** | ||
| * Updates an observer when the dispatcher finishes executing a command. | ||
| * | ||
| * @param command The command that was just executed by the dispatcher. | ||
| */ | ||
| update(command: Command): void { | ||
| this.cb(command); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * A task dispatcher for async tasks. | ||
| * | ||
|
|
@@ -55,11 +74,56 @@ class Dispatcher { | |
| // This is `undefined` in case there is no ongoing execution of tasks. | ||
| private currentJob?: Promise<void>; | ||
|
|
||
| // Observers that are notified about every executed command in this dispacther. | ||
| // | ||
| // This is private, because we only expect `testLaunch` to attach observers as of yet. | ||
| private observers: DispatcherObserver[]; | ||
|
|
||
| constructor(readonly maxPreInitQueueSize = 100) { | ||
| this.observers = []; | ||
| this.queue = []; | ||
| this.state = DispatcherState.Uninitialized; | ||
| } | ||
|
|
||
| /** | ||
| * Attaches an observer that will be notified about state changes on the Dispatcher. | ||
| * | ||
| * # Note | ||
| * | ||
| * This is a private function because only the `testLaunch` function | ||
| * is expected to watch the state of the Dispatcher as of yet. | ||
| * | ||
| * @param observer The observer to attach. | ||
| */ | ||
| private attachObserver(observer: DispatcherObserver): void { | ||
| this.observers.push(observer); | ||
| } | ||
|
|
||
| /** | ||
| * Un-attaches an observer that will be notified about state changes on the Dispatcher. | ||
| * | ||
| * # Note | ||
| * | ||
| * This is a private function because only the `testLaunch` function | ||
| * is expected to watch the state of the Dispatcher as of yet. | ||
| * | ||
| * @param observer The observer to attach. | ||
| */ | ||
| private unattachObserver(observer: DispatcherObserver): void { | ||
| this.observers = this.observers.filter(curr => observer !== curr); | ||
| } | ||
|
|
||
| /** | ||
| * Notify any currently attached observer that a new command was executed. | ||
| * | ||
| * @param command The command to notify about | ||
| */ | ||
| private notifyObservers(command: Command): void { | ||
| for (const observer of this.observers) { | ||
| observer.update(command); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Gets the oldest command added to the queue. | ||
| * | ||
|
|
@@ -91,13 +155,16 @@ class Dispatcher { | |
| switch(nextCommand.command) { | ||
| case(Commands.Stop): | ||
| this.state = DispatcherState.Stopped; | ||
| this.notifyObservers(nextCommand); | ||
| return; | ||
| case(Commands.Clear): | ||
| this.queue = []; | ||
| this.state = DispatcherState.Stopped; | ||
| this.notifyObservers(nextCommand); | ||
| return; | ||
| case(Commands.Task): | ||
| await this.executeTask(nextCommand.task); | ||
| this.notifyObservers(nextCommand); | ||
| nextCommand = this.getNextCommand(); | ||
| } | ||
| } | ||
|
|
@@ -118,12 +185,19 @@ class Dispatcher { | |
| // That should be avoided as much as possible, | ||
| // because it will cause a deadlock in case you wait inside a task | ||
| // that was launched inside another task. | ||
| this.currentJob.then(() => { | ||
| this.currentJob = undefined; | ||
| if (this.state === DispatcherState.Processing) { | ||
| this.state = DispatcherState.Idle; | ||
| } | ||
| }); | ||
| this.currentJob | ||
| .then(() => { | ||
| this.currentJob = undefined; | ||
| if (this.state === DispatcherState.Processing) { | ||
| this.state = DispatcherState.Idle; | ||
| } | ||
| }) | ||
| .catch(error => { | ||
| console.error( | ||
| "IMPOSSIBLE: Something went wrong while the dispatcher was executing the tasks queue.", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bets on when we'll see this message?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no bets, unless we change the |
||
| error | ||
| ); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -139,12 +213,14 @@ class Dispatcher { | |
| * @param command The command to enqueue. | ||
| * @param priorityTask Whether or not this task is a priority task | ||
| * and should be enqueued at the front of the queue. | ||
| * | ||
| * @returns Wheter or not the task was queued. | ||
| */ | ||
| private launchInternal(command: Command, priorityTask = false): void { | ||
| private launchInternal(command: Command, priorityTask = false): boolean { | ||
| if (!priorityTask && this.state === DispatcherState.Uninitialized) { | ||
| if (this.queue.length >= this.maxPreInitQueueSize) { | ||
| console.warn("Unable to enqueue task, pre init queue is full."); | ||
| return; | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -155,6 +231,8 @@ class Dispatcher { | |
| } | ||
|
|
||
| this.triggerExecution(); | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -180,13 +258,22 @@ class Dispatcher { | |
| * Flushes the tasks enqueued while the dispatcher was uninitialized. | ||
| * | ||
| * This is a no-op in case the dispatcher is not in an uninitialized state. | ||
| * | ||
| * @param task Optional task to execute before any of the tasks enqueued prior to init. | ||
| */ | ||
| flushInit(): void { | ||
| flushInit(task?: Task): void { | ||
| if (this.state !== DispatcherState.Uninitialized) { | ||
| console.warn("Attempted to initialize the Dispatcher, but it is already initialized. Ignoring."); | ||
| return; | ||
| } | ||
|
|
||
| if (task) { | ||
| this.launchInternal({ | ||
| task, | ||
| command: Commands.Task | ||
| }, true); | ||
| } | ||
|
|
||
| this.state = DispatcherState.Idle; | ||
| this.triggerExecution(); | ||
| } | ||
|
|
@@ -238,7 +325,7 @@ class Dispatcher { | |
| * | ||
| * Returns a promise that resolves once the current task execution in finished. | ||
| * | ||
| * Use this with caution. | ||
| * Use this with caution. | ||
| * If called inside a task launched inside another task, it will cause a deadlock. | ||
| * | ||
| * @returns The promise. | ||
|
|
@@ -260,6 +347,72 @@ class Dispatcher { | |
| await this.testBlockOnQueue(); | ||
| this.state = DispatcherState.Uninitialized; | ||
| } | ||
|
|
||
| /** | ||
| * Launches a task in test mode. | ||
| * | ||
| * # Note | ||
| * | ||
| * This function will resume the execution of tasks if the dispatcher was stopped | ||
| * and return the dispatcher back to an idle state. | ||
| * | ||
| * This is important in order not to hang forever in case the dispatcher is stopped. | ||
| * | ||
| * @param task The task to launch. | ||
| * | ||
| * @returns A promise which only resolves once the task is done being executed | ||
| * or is guaranteed to not be executed ever i.e. if the queue gets cleared. | ||
| * This promise is rejected if the dispatcher takes more than 1s | ||
| * to execute the current task i.e. if the dispatcher is uninitialized. | ||
| */ | ||
| testLaunch(task: Task): Promise<void> { | ||
brizental marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| const testId = generateUUIDv4(); | ||
| console.info("Launching a test task.", testId); | ||
|
|
||
| this.resume(); | ||
| const wasLaunched = this.launchInternal({ | ||
| testId, | ||
| task, | ||
| command: Commands.Task | ||
| }); | ||
|
|
||
| if (!wasLaunched) { | ||
| return Promise.reject(); | ||
| } | ||
|
|
||
| // This promise will resolve: | ||
| // | ||
| // - If the dispatcher gets a Clear command; | ||
| // - If a task with `testId` is executed; | ||
| // | ||
| // This promise will reject if: | ||
| // | ||
| // - If we wait for this task to be execute for more than 1s. | ||
| // This is to attend to the case where the dispatcher is Stopped | ||
| // and the task takes to long to be executed. | ||
| return new Promise((resolve, reject) => { | ||
| const observer = new DispatcherObserver((command: Command) => { | ||
| const isCurrentTask = (command.command === Commands.Task && command.testId === testId); | ||
| if (isCurrentTask || command.command === Commands.Clear) { | ||
| this.unattachObserver(observer); | ||
| clearTimeout(timeout); | ||
| resolve(); | ||
| } | ||
| }); | ||
|
|
||
| const timeout = setTimeout(() => { | ||
| console.error( | ||
| `Test task ${testId} took to long to execute.`, | ||
| "Please check if the dispatcher was initialized and is not stopped.", | ||
| "Bailing out." | ||
| ); | ||
| this.unattachObserver(observer); | ||
| reject(); | ||
| }, 1000); | ||
|
|
||
| this.attachObserver(observer); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| export default Dispatcher; | ||
Uh oh!
There was an error while loading. Please reload this page.