Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 27 additions & 112 deletions glean/src/core/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import { generateUUIDv4 } from "./utils.js";

// The possible states a dispatcher instance can be in.
export const enum DispatcherState {
// The dispatcher has not been initialized yet.
Expand All @@ -29,36 +27,25 @@ const enum Commands {
Stop,
// The dispatcher should stop executing the queued tasks and clear the queue.
Clear,
// Exactly like a normal Task, but spawned for tests.
TestTask,
}

// A task the dispatcher knows how to execute.
type Task = () => Promise<void>;

// An executable command.
type Command = {
testId?: string,
task: Task,
command: Commands.Task,
} | {
command: Exclude<Commands, Commands.Task>,
resolver: (value: void | PromiseLike<void>) => void,
task: Task,
command: Commands.TestTask,
} | {
command: Exclude<Commands, Commands.Task | Commands.TestTask>,
};

/**
* An observer of the commands being executed by the dispatcher.
*/
class DispatcherObserver {
constructor(private cb: (command: Command) => void) {}

/**
* Updates an observer when the dispatcher finishes executing a command.
*
* @param command The command that was just executed by the dispatcher.
*/
update(command: Command): void {
this.cb(command);
}
}

/**
* A task dispatcher for async tasks.
*
Expand All @@ -74,56 +61,11 @@ class Dispatcher {
// This is `undefined` in case there is no ongoing execution of tasks.
private currentJob?: Promise<void>;

// Observers that are notified about every executed command in this dispacther.
//
// This is private, because we only expect `testLaunch` to attach observers as of yet.
private observers: DispatcherObserver[];

constructor(readonly maxPreInitQueueSize = 100) {
this.observers = [];
this.queue = [];
this.state = DispatcherState.Uninitialized;
}

/**
* Attaches an observer that will be notified about state changes on the Dispatcher.
*
* # Note
*
* This is a private function because only the `testLaunch` function
* is expected to watch the state of the Dispatcher as of yet.
*
* @param observer The observer to attach.
*/
private attachObserver(observer: DispatcherObserver): void {
this.observers.push(observer);
}

/**
* Un-attaches an observer that will be notified about state changes on the Dispatcher.
*
* # Note
*
* This is a private function because only the `testLaunch` function
* is expected to watch the state of the Dispatcher as of yet.
*
* @param observer The observer to attach.
*/
private unattachObserver(observer: DispatcherObserver): void {
this.observers = this.observers.filter(curr => observer !== curr);
}

/**
* Notify any currently attached observer that a new command was executed.
*
* @param command The command to notify about
*/
private notifyObservers(command: Command): void {
for (const observer of this.observers) {
observer.update(command);
}
}

/**
* Gets the oldest command added to the queue.
*
Expand Down Expand Up @@ -155,16 +97,25 @@ class Dispatcher {
switch(nextCommand.command) {
case(Commands.Stop):
this.state = DispatcherState.Stopped;
this.notifyObservers(nextCommand);
return;
case(Commands.Clear):
// Unblock test resolvers before clearing the queue.
this.queue.forEach(c => {
if (c.command === Commands.TestTask) {
c.resolver();
}
});

this.queue = [];
this.state = DispatcherState.Stopped;
this.notifyObservers(nextCommand);
return;
case (Commands.TestTask):
await this.executeTask(nextCommand.task);
nextCommand.resolver();
nextCommand = this.getNextCommand();
continue;
case(Commands.Task):
await this.executeTask(nextCommand.task);
this.notifyObservers(nextCommand);
nextCommand = this.getNextCommand();
}
}
Expand Down Expand Up @@ -368,55 +319,19 @@ class Dispatcher {
*
* @returns A promise which only resolves once the task is done being executed
* or is guaranteed to not be executed ever i.e. if the queue gets cleared.
* This promise is rejected if the dispatcher takes more than 1s
* to execute the current task i.e. if the dispatcher is uninitialized.
*/
testLaunch(task: Task): Promise<void> {
const testId = generateUUIDv4();
console.info("Launching a test task.", testId);

this.resume();
const wasLaunched = this.launchInternal({
testId,
task,
command: Commands.Task
});

if (!wasLaunched) {
return Promise.reject();
}

// This promise will resolve:
//
// - If the dispatcher gets a Clear command;
// - If a task with `testId` is executed;
//
// This promise will reject if:
//
// - If we wait for this task to be execute for more than 1s.
// This is to attend to the case where the dispatcher is Stopped
// and the task takes to long to be executed.
return new Promise((resolve, reject) => {
const observer = new DispatcherObserver((command: Command) => {
const isCurrentTask = (command.command === Commands.Task && command.testId === testId);
if (isCurrentTask || command.command === Commands.Clear) {
this.unattachObserver(observer);
clearTimeout(timeout);
resolve();
}
return new Promise((resolver, reject) => {
this.resume();
const wasLaunched = this.launchInternal({
resolver,
task,
command: Commands.TestTask
});

const timeout = setTimeout(() => {
console.error(
`Test task ${testId} took to long to execute.`,
"Please check if the dispatcher was initialized and is not stopped.",
"Bailing out."
);
this.unattachObserver(observer);
if (!wasLaunched) {
reject();
}, 1000);

this.attachObserver(observer);
}
});
}
}
Expand Down
28 changes: 0 additions & 28 deletions glean/tests/core/dispatcher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,16 +340,6 @@ describe("Dispatcher", function() {
assert.strictEqual(dispatcher["state"], DispatcherState.Idle);
});

it("testLaunch will reject in case the dispatcher is uninitialized for too long", async function () {
dispatcher = new Dispatcher();
try {
await dispatcher.testLaunch(sampleTask);
assert.ok(false);
} catch {
assert.ok(true);
}
});

it("testLaunch will not reject in case the dispatcher is uninitialized, but quickly initializes", async function () {
dispatcher = new Dispatcher();
const testLaunchedTask = dispatcher.testLaunch(sampleTask);
Expand Down Expand Up @@ -378,22 +368,4 @@ describe("Dispatcher", function() {

sinon.assert.callOrder(stub1, stub2, stub3);
});

it("testLaunch observers are unattached after promise is resolved or rejected", async function() {
dispatcher = new Dispatcher();

const willReject = dispatcher.testLaunch(sampleTask);
assert.strictEqual(dispatcher["observers"].length, 1);
try {
await willReject;
} catch {
assert.strictEqual(dispatcher["observers"].length, 0);
}

dispatcher.flushInit();
const willNotReject = dispatcher.testLaunch(sampleTask);
assert.strictEqual(dispatcher["observers"].length, 1);
await willNotReject;
assert.strictEqual(dispatcher["observers"].length, 0);
});
});