Skip to content

Commit d8f8e24

Browse files
committed
Remove the observer mechanism to wait for test tasks
This PR removes the mechanism based on timeouts and observers to wait on test tasks in the dispatcher. It instead defines a new task type, TestTask, and passes a resolver function when its created.
1 parent 1d9db0b commit d8f8e24

File tree

2 files changed

+29
-135
lines changed

2 files changed

+29
-135
lines changed

glean/src/core/dispatcher.ts

Lines changed: 29 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -29,36 +29,26 @@ const enum Commands {
2929
Stop,
3030
// The dispatcher should stop executing the queued tasks and clear the queue.
3131
Clear,
32+
// Exactly like a normal Task, but spawned for tests.
33+
TestTask,
3234
}
3335

3436
// A task the dispatcher knows how to execute.
3537
type Task = () => Promise<void>;
3638

3739
// An executable command.
3840
type Command = {
39-
testId?: string,
4041
task: Task,
4142
command: Commands.Task,
4243
} | {
43-
command: Exclude<Commands, Commands.Task>,
44+
testId?: string,
45+
resolver: (value: void | PromiseLike<void>) => void,
46+
task: Task,
47+
command: Commands.TestTask,
48+
} | {
49+
command: Exclude<Commands, Commands.Task | Commands.TestTask>,
4450
};
4551

46-
/**
47-
* An observer of the commands being executed by the dispatcher.
48-
*/
49-
class DispatcherObserver {
50-
constructor(private cb: (command: Command) => void) {}
51-
52-
/**
53-
* Updates an observer when the dispatcher finishes executing a command.
54-
*
55-
* @param command The command that was just executed by the dispatcher.
56-
*/
57-
update(command: Command): void {
58-
this.cb(command);
59-
}
60-
}
61-
6252
/**
6353
* A task dispatcher for async tasks.
6454
*
@@ -74,56 +64,11 @@ class Dispatcher {
7464
// This is `undefined` in case there is no ongoing execution of tasks.
7565
private currentJob?: Promise<void>;
7666

77-
// Observers that are notified about every executed command in this dispacther.
78-
//
79-
// This is private, because we only expect `testLaunch` to attach observers as of yet.
80-
private observers: DispatcherObserver[];
81-
8267
constructor(readonly maxPreInitQueueSize = 100) {
83-
this.observers = [];
8468
this.queue = [];
8569
this.state = DispatcherState.Uninitialized;
8670
}
8771

88-
/**
89-
* Attaches an observer that will be notified about state changes on the Dispatcher.
90-
*
91-
* # Note
92-
*
93-
* This is a private function because only the `testLaunch` function
94-
* is expected to watch the state of the Dispatcher as of yet.
95-
*
96-
* @param observer The observer to attach.
97-
*/
98-
private attachObserver(observer: DispatcherObserver): void {
99-
this.observers.push(observer);
100-
}
101-
102-
/**
103-
* Un-attaches an observer that will be notified about state changes on the Dispatcher.
104-
*
105-
* # Note
106-
*
107-
* This is a private function because only the `testLaunch` function
108-
* is expected to watch the state of the Dispatcher as of yet.
109-
*
110-
* @param observer The observer to attach.
111-
*/
112-
private unattachObserver(observer: DispatcherObserver): void {
113-
this.observers = this.observers.filter(curr => observer !== curr);
114-
}
115-
116-
/**
117-
* Notify any currently attached observer that a new command was executed.
118-
*
119-
* @param command The command to notify about
120-
*/
121-
private notifyObservers(command: Command): void {
122-
for (const observer of this.observers) {
123-
observer.update(command);
124-
}
125-
}
126-
12772
/**
12873
* Gets the oldest command added to the queue.
12974
*
@@ -155,16 +100,25 @@ class Dispatcher {
155100
switch(nextCommand.command) {
156101
case(Commands.Stop):
157102
this.state = DispatcherState.Stopped;
158-
this.notifyObservers(nextCommand);
159103
return;
160104
case(Commands.Clear):
105+
// Unblock test resolvers before clearing the queue.
106+
this.queue.forEach(c => {
107+
if (c.command === Commands.TestTask) {
108+
c.resolver();
109+
}
110+
});
111+
161112
this.queue = [];
162113
this.state = DispatcherState.Stopped;
163-
this.notifyObservers(nextCommand);
164114
return;
115+
case (Commands.TestTask):
116+
await this.executeTask(nextCommand.task);
117+
nextCommand.resolver();
118+
nextCommand = this.getNextCommand();
119+
continue;
165120
case(Commands.Task):
166121
await this.executeTask(nextCommand.task);
167-
this.notifyObservers(nextCommand);
168122
nextCommand = this.getNextCommand();
169123
}
170124
}
@@ -368,55 +322,23 @@ class Dispatcher {
368322
*
369323
* @returns A promise which only resolves once the task is done being executed
370324
* or is guaranteed to not be executed ever i.e. if the queue gets cleared.
371-
* This promise is rejected if the dispatcher takes more than 1s
372-
* to execute the current task i.e. if the dispatcher is uninitialized.
373325
*/
374326
testLaunch(task: Task): Promise<void> {
375327
const testId = generateUUIDv4();
376328
console.info("Launching a test task.", testId);
377329

378-
this.resume();
379-
const wasLaunched = this.launchInternal({
380-
testId,
381-
task,
382-
command: Commands.Task
383-
});
384-
385-
if (!wasLaunched) {
386-
return Promise.reject();
387-
}
388-
389-
// This promise will resolve:
390-
//
391-
// - If the dispatcher gets a Clear command;
392-
// - If a task with `testId` is executed;
393-
//
394-
// This promise will reject if:
395-
//
396-
// - If we wait for this task to be execute for more than 1s.
397-
// This is to attend to the case where the dispatcher is Stopped
398-
// and the task takes to long to be executed.
399-
return new Promise((resolve, reject) => {
400-
const observer = new DispatcherObserver((command: Command) => {
401-
const isCurrentTask = (command.command === Commands.Task && command.testId === testId);
402-
if (isCurrentTask || command.command === Commands.Clear) {
403-
this.unattachObserver(observer);
404-
clearTimeout(timeout);
405-
resolve();
406-
}
330+
return new Promise((resolver, reject) => {
331+
this.resume();
332+
const wasLaunched = this.launchInternal({
333+
testId,
334+
resolver,
335+
task,
336+
command: Commands.TestTask
407337
});
408338

409-
const timeout = setTimeout(() => {
410-
console.error(
411-
`Test task ${testId} took to long to execute.`,
412-
"Please check if the dispatcher was initialized and is not stopped.",
413-
"Bailing out."
414-
);
415-
this.unattachObserver(observer);
339+
if (!wasLaunched) {
416340
reject();
417-
}, 1000);
418-
419-
this.attachObserver(observer);
341+
}
420342
});
421343
}
422344
}

glean/tests/core/dispatcher.spec.ts

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -340,16 +340,6 @@ describe("Dispatcher", function() {
340340
assert.strictEqual(dispatcher["state"], DispatcherState.Idle);
341341
});
342342

343-
it("testLaunch will reject in case the dispatcher is uninitialized for too long", async function () {
344-
dispatcher = new Dispatcher();
345-
try {
346-
await dispatcher.testLaunch(sampleTask);
347-
assert.ok(false);
348-
} catch {
349-
assert.ok(true);
350-
}
351-
});
352-
353343
it("testLaunch will not reject in case the dispatcher is uninitialized, but quickly initializes", async function () {
354344
dispatcher = new Dispatcher();
355345
const testLaunchedTask = dispatcher.testLaunch(sampleTask);
@@ -378,22 +368,4 @@ describe("Dispatcher", function() {
378368

379369
sinon.assert.callOrder(stub1, stub2, stub3);
380370
});
381-
382-
it("testLaunch observers are unattached after promise is resolved or rejected", async function() {
383-
dispatcher = new Dispatcher();
384-
385-
const willReject = dispatcher.testLaunch(sampleTask);
386-
assert.strictEqual(dispatcher["observers"].length, 1);
387-
try {
388-
await willReject;
389-
} catch {
390-
assert.strictEqual(dispatcher["observers"].length, 0);
391-
}
392-
393-
dispatcher.flushInit();
394-
const willNotReject = dispatcher.testLaunch(sampleTask);
395-
assert.strictEqual(dispatcher["observers"].length, 1);
396-
await willNotReject;
397-
assert.strictEqual(dispatcher["observers"].length, 0);
398-
});
399371
});

0 commit comments

Comments
 (0)