Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 192 additions & 0 deletions src/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

// The possible states a dispatcher instance can be in.
const enum DispatcherState {
// The dispatcher has not been initialized yet.
//
// When the dispatcher is in this state it will not enqueue
// more than `maxPreInitQueueSize` tasks.
Uninitialized,
// There are no commands queued and the dispatcher is idle.
Idle,
// The dispatcher is currently processing queued tasks.
Processing,
// The dispatcher is stopped, tasks queued will not be immediatelly processed.
Stopped,
}

// The possible commands to be processed by the dispatcher.
const enum Commands {
// The dispatcher must enqueue a new task.
//
// This command is always followed by a concrete task for the dispatcher to execute.
Task,
// The dispatcher should stop executing the queued tasks.
Stop,
}

// A task the dispatcher knows how to execute.
type Task = () => Promise<void>;

// An executable command.
type Command = {
task: Task,
command: Commands.Task
} | {
command: Commands.Stop
};

/**
* A task dispatcher for async tasks.
*
* Will make sure tasks are execute in order.
*/
class Dispatcher {
// A FIFO queue of tasks to execute.
private queue: Command[];
// The current state of this dispatcher.
private state: DispatcherState;
// A promise contianing the current execution promise.
//
// This is `undefined` in case there is no ongoing execution of tasks.
private currentJob?: Promise<void>;

constructor(readonly maxPreInitQueueSize = 100) {
this.queue = [];
this.state = DispatcherState.Uninitialized;
}

/**
* Gets the oldest command added to the queue.
*
* @returns The oldest command or `undefined` if the queue is empty.
*/
private getNextCommand(): Command | undefined {
return this.queue.shift();
}

/**
* Executes all the commands in the queue, from oldest to newest.
*
* Stops on case a `Stop` command is encountered.
*/
private async execute(): Promise<void> {
let nextCommand = this.getNextCommand();
while(nextCommand) {
if (nextCommand.command === Commands.Stop) {
break;
}

try {
await nextCommand.task();
} catch(e) {
console.error("Error executing task:", e);
}

nextCommand = this.getNextCommand();
}
}

/**
* Triggers the execution of enqueued command
* in case the dispatcher is currently Idle.
*/
private async triggerExecution(): Promise<void> {
if (this.state === DispatcherState.Idle && this.queue.length > 0) {
this.state = DispatcherState.Processing;
this.currentJob = this.execute();
await this.currentJob;
this.currentJob = undefined;
this.state = DispatcherState.Idle;
}
}

/**
* Launches a task on this dispatchers queue.
* Kickstarts the execution of the queue in case it is currently Idle.
*
* # Note
*
* Will not enqueue in case the dispatcher has not been initialized yet and the queues length exceeds `maxPreInitQueueSize`.
*
* @param task The task to enqueue.
*/
launch(task: Task): void {
if (this.state === DispatcherState.Uninitialized) {
if (this.queue.length >= this.maxPreInitQueueSize) {
console.warn("Unable to enqueue task, pre init queue is full.");
return;
}
}

this.queue.push({
task,
command: Commands.Task
});

// Even though triggerExecution is async we don't want to block on it.
// The point of the dispatcher is to execute the async functions
// in a deterministic order without having to wait for them.
this.triggerExecution();
}

/**
* Flushes the tasks enqueued while the dispatcher was uninitialized.
*
* This is a no-op in case the dispatcher is not in an uninitialized state.
*/
flushInit(): void {
if (this.state !== DispatcherState.Uninitialized) {
console.warn("Attempted to initialize the Dispatcher, but it is already initialized. Ignoring.");
return;
}

this.state = DispatcherState.Idle;

// Even though triggerExecution is async we don't want to block on it.
// The point of the dispatcher is to execute the async functions
// in a deterministic order without having to wait for them.
this.triggerExecution();
}

/**
* Stops the current job and clears the tasks queue.
*
* @returns A promise which resolves once the current job
* has been succesfully stopped and the queue was emptied.
*/
async clear(): Promise<void> {
await this.stop();
this.queue = [];
this.state = DispatcherState.Idle;
}

/**
* Sets the state of this dispatcher to "Stopped" and stops any ongoing task processing.
*
* @returns A promise which resolves once the current job
* has been succesfully stopped.
*/
async stop(): Promise<void> {
if (this.state !== DispatcherState.Stopped) {
this.state = DispatcherState.Stopped;
this.queue.unshift({ command: Commands.Stop });
await this.testBlockOnQueue();
}
}

/**
* **Test-Only API**
*
* Returns a promise that resolves once the current task execution in finished.
*
* @returns The promise.
*/
async testBlockOnQueue(): Promise<void> {
return this.currentJob && await this.currentJob;
}
}

export default Dispatcher;
46 changes: 40 additions & 6 deletions src/glean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { isUndefined, sanitizeApplicationId, validateURL } from "utils";
import { CoreMetrics } from "internal_metrics";
import { Lifetime } from "metrics";
import { DatetimeMetric } from "metrics/types/datetime";
import Dispatcher from "dispatcher";

class Glean {
// The Glean singleton.
Expand All @@ -27,6 +28,8 @@ class Glean {
private _coreMetrics: CoreMetrics;
// The ping uploader.
private _pingUploader: PingUploader
// A task dispatcher to help execute in order asynchronous external API calls.
private _dispatcher: Dispatcher;

// Properties that will only be set on `initialize`.

Expand All @@ -44,6 +47,7 @@ class Glean {
Use Glean.instance instead to access the Glean singleton.`);
}

this._dispatcher = new Dispatcher();
this._pingUploader = new PingUploader();
this._coreMetrics = new CoreMetrics();
this._initialized = false;
Expand Down Expand Up @@ -109,7 +113,12 @@ class Glean {
* This function is only supposed to be called when telemetry is disabled.
*/
private static async clearMetrics(): Promise<void> {
// Stop ongoing uploading jobs and clear pending pings queue.
// Stops any task execution on the dispatcher.
//
// While stopped, the dispatcher will enqueue but won't execute any tasks it receives.
await Glean.dispatcher.stop();

// Stop ongoing upload jobs and clear pending pings queue.
await Glean.pingUploader.clearPendingPingsQueue();

// There is only one metric that we want to survive after clearing all
Expand All @@ -131,11 +140,8 @@ class Glean {
// We need to briefly set upload_enabled to true here so that `set`
// is not a no-op.
//
// Note that we can't provide the same guarantees as glean-core here.
// If by any change another actor attempts to record a metric while
// we are setting the known client id and first run date, they will be allowed to.
//
// TODO: Bug 1687491 might resolve this issue.
// This is safe.
// Since the dispatcher is stopped, no external API calls will be executed.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this nice? The dispatcher actually fixed the issue. Now, it will only really be fixed after my next PR when the recording calls are indeed dispatched.

Glean.uploadEnabled = true;

// Store a "dummy" KNOWN_CLIENT_ID in the client_id metric. This will
Expand All @@ -147,6 +153,9 @@ class Glean {
await Glean.coreMetrics.firstRunDate.set(existingFirstRunDate.date);

Glean.uploadEnabled = false;

// Clear the dispatcher queue.
await Glean.dispatcher.clear();
}

/**
Expand Down Expand Up @@ -198,6 +207,9 @@ class Glean {
await Glean.pingUploader.scanPendingPings();
// Even though this returns a promise, there is no need to block on it returning.
Glean.pingUploader.triggerUpload();

// Signal to the dispatcher that init is complete.
Glean.dispatcher.flushInit();
}

/**
Expand Down Expand Up @@ -227,14 +239,33 @@ class Glean {
return Glean.instance._initialized;
}

/**
* Gets this Glean's instance application id.
*
* @returns The application id or `undefined` in case Glean has not been initialized yet.
*/
static get applicationId(): string | undefined {
return Glean.instance._applicationId;
}

/**
* Gets this Glean's instance server endpoint.
*
* @returns The server endpoint or `undefined` in case Glean has not been initialized yet.
*/
static get serverEndpoint(): string | undefined {
return Glean.instance._serverEndpoint;
}

/**
* Gets this Gleans's instance dispatcher.
*
* @returns The dispatcher instance.
*/
static get dispatcher(): Dispatcher {
return Glean.instance._dispatcher;
}

/**
* Determines whether upload is enabled.
*
Expand Down Expand Up @@ -304,6 +335,9 @@ class Glean {
// Get back to an uninitialized state.
Glean.instance._initialized = false;

// Clear the dispatcher queue.
await Glean.dispatcher.clear();

// Stop ongoing jobs and clear pending pings queue.
await Glean.pingUploader.clearPendingPingsQueue();

Expand Down
Loading