Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

[Full changelog](https://github.com/mozilla/glean.js/compare/v0.17.0...main)

* [#542](https://github.com/mozilla/glean.js/pull/542): Implement `shutdown` API.

# v0.17.0 (2021-07-16)

[Full changelog](https://github.com/mozilla/glean.js/compare/v0.16.0...v0.17.0)
Expand Down
59 changes: 53 additions & 6 deletions glean/src/core/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ export const enum DispatcherState {
Processing,
// The dispatcher is stopped, tasks queued will not be immediatelly processed.
Stopped,
// The dispatcher is shutdown, attempting to queue tasks while in this state is a no-op.
//
// This state is irreversible.
Shutdown,
}

// The possible commands to be processed by the dispatcher.
Expand All @@ -31,6 +35,8 @@ const enum Commands {
Stop,
// The dispatcher should stop executing the queued tasks and clear the queue.
Clear,
// The dispatcher will clear the queue and go into the Shutdown state.
Shutdown,
// Exactly like a normal Task, but spawned for tests.
TestTask,
}
Expand Down Expand Up @@ -65,7 +71,7 @@ class Dispatcher {
// This is `undefined` in case there is no ongoing execution of tasks.
private currentJob?: Promise<void>;

constructor(readonly maxPreInitQueueSize = 100) {
constructor(readonly maxPreInitQueueSize = 100, readonly logTag = LOG_TAG) {
this.queue = [];
this.state = DispatcherState.Uninitialized;
}
Expand All @@ -88,7 +94,7 @@ class Dispatcher {
try {
await task();
} catch(e) {
log(LOG_TAG, ["Error executing task:", e], LoggingLevel.Error);
log(this.logTag, ["Error executing task:", e], LoggingLevel.Error);
}
}

Expand All @@ -102,6 +108,7 @@ class Dispatcher {
case(Commands.Stop):
this.state = DispatcherState.Stopped;
return;
case(Commands.Shutdown):
case(Commands.Clear):
// Unblock test resolvers before clearing the queue.
this.queue.forEach(c => {
Expand All @@ -111,7 +118,12 @@ class Dispatcher {
});

this.queue = [];
this.state = DispatcherState.Stopped;
if (nextCommand.command === Commands.Clear) {
this.state = DispatcherState.Stopped;
} else {
this.state = DispatcherState.Shutdown;
}

return;
case (Commands.TestTask):
await this.executeTask(nextCommand.task);
Expand Down Expand Up @@ -149,7 +161,7 @@ class Dispatcher {
})
.catch(error => {
log(
LOG_TAG,
this.logTag,
[
"IMPOSSIBLE: Something went wrong while the dispatcher was executing the tasks queue.",
error
Expand All @@ -175,10 +187,19 @@ class Dispatcher {
* @returns Wheter or not the task was queued.
*/
private launchInternal(command: Command, priorityTask = false): boolean {
if (this.state === DispatcherState.Shutdown) {
log(
this.logTag,
"Attempted to enqueue a new task but the dispatcher is shutdown. Ignoring.",
LoggingLevel.Warn
);
return false;
}

if (!priorityTask && this.state === DispatcherState.Uninitialized) {
if (this.queue.length >= this.maxPreInitQueueSize) {
log(
LOG_TAG,
this.logTag,
"Unable to enqueue task, pre init queue is full.",
LoggingLevel.Warn
);
Expand Down Expand Up @@ -226,7 +247,7 @@ class Dispatcher {
flushInit(task?: Task): void {
if (this.state !== DispatcherState.Uninitialized) {
log(
LOG_TAG,
this.logTag,
"Attempted to initialize the Dispatcher, but it is already initialized. Ignoring.",
LoggingLevel.Warn
);
Expand Down Expand Up @@ -286,6 +307,27 @@ class Dispatcher {
}
}

/**
* Shutsdown the dispatcher.
*
* 1. Executes all tasks launched prior to this one.
* 2. Clears the queue of any tasks launched after this one.
* 3. Puts the dispatcher in the `Shutdown` state.
*
* # Note
*
* - This is a command like any other, if the dispatcher is uninitialized
* it will get executed when the dispatcher is initialized.
* - If the dispatcher is stopped, it is resumed and all pending tasks are executed.
*
* @returns A promise which resolves once shutdown is complete.
*/
shutdown(): Promise<void> {
this.launchInternal({ command: Commands.Shutdown });
this.resume();
return this.currentJob || Promise.resolve();
}

/**
* Test-Only API**
*
Expand Down Expand Up @@ -330,6 +372,11 @@ class Dispatcher {
*
* This is important in order not to hang forever in case the dispatcher is stopped.
*
* # Errors
*
* This function will reject in case the task is not launched.
* Make sure the dispatcher is initialized or is not shutdown in these cases.
*
* @param task The task to launch.
* @returns A promise which only resolves once the task is done being executed
* or is guaranteed to not be executed ever i.e. if the queue gets cleared.
Expand Down
34 changes: 26 additions & 8 deletions glean/src/core/glean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class Glean {
*/
private static async clearMetrics(): Promise<void> {
// Stop ongoing upload jobs and clear pending pings queue.
await Glean.pingUploader.clearPendingPingsQueue();
Glean.pingUploader.clearPendingPingsQueue();

// There is only one metric that we want to survive after clearing all
// metrics: first_run_date. Here, we store its value
Expand Down Expand Up @@ -281,11 +281,6 @@ class Glean {
}

await Context.pingsDatabase.scanPendingPings();

// Even though this returns a promise, there is no need to block on it returning.
//
// On the contrary we _want_ the uploading tasks to be executed async.
void Glean.pingUploader.triggerUpload();
});
}

Expand Down Expand Up @@ -418,6 +413,27 @@ class Glean {
});
}

/**
* Finishes executing all pending tasks
* and shuts down both Glean's dispatcher and the ping uploader.
*
* # Important
*
* This is irreversible.
* Only a restart will return Glean back to an idle state.
*
* @returns A promise which resolves once the shutdown is complete.
*/
static async shutdown(): Promise<void> {
// Order here matters!
//
// The main dispatcher needs to be shut down first,
// because some of its tasks may enqueue new tasks on the ping uploader dispatcher
// and we want these uploading tasks to also be executed prior to complete shutdown.
await Context.dispatcher.shutdown();
await Glean.pingUploader.shutdown();
}

/**
* Sets the current environment.
*
Expand Down Expand Up @@ -491,11 +507,13 @@ class Glean {
// Deregister all plugins
testResetEvents();

// Stop ongoing jobs and clear pending pings queue.
// Await ongoing jobs and clear pending pings queue.
if (Glean.pingUploader) {
await Glean.pingUploader.testBlockOnPingsQueue();

// The first time tests run, before Glean is initialized, we are
// not guaranteed to have an uploader. Account for this.
await Glean.pingUploader.clearPendingPingsQueue();
Glean.pingUploader.clearPendingPingsQueue();
}
}

Expand Down
Loading