Skip to content

Commit 0d5ac63

Browse files
author
Beatriz Rizental
authored
Bug 1687491 - Implement a dispatcher and add an instance to the Glean singleton (#31)
1 parent 0abe823 commit 0d5ac63

File tree

3 files changed

+409
-6
lines changed

3 files changed

+409
-6
lines changed

src/dispatcher.ts

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/* This Source Code Form is subject to the terms of the Mozilla Public
2+
* License, v. 2.0. If a copy of the MPL was not distributed with this
3+
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4+
5+
// The possible states a dispatcher instance can be in.
6+
const enum DispatcherState {
7+
// The dispatcher has not been initialized yet.
8+
//
9+
// When the dispatcher is in this state it will not enqueue
10+
// more than `maxPreInitQueueSize` tasks.
11+
Uninitialized,
12+
// There are no commands queued and the dispatcher is idle.
13+
Idle,
14+
// The dispatcher is currently processing queued tasks.
15+
Processing,
16+
// The dispatcher is stopped, tasks queued will not be immediatelly processed.
17+
Stopped,
18+
}
19+
20+
// The possible commands to be processed by the dispatcher.
21+
const enum Commands {
22+
// The dispatcher must enqueue a new task.
23+
//
24+
// This command is always followed by a concrete task for the dispatcher to execute.
25+
Task,
26+
// The dispatcher should stop executing the queued tasks.
27+
Stop,
28+
}
29+
30+
// A task the dispatcher knows how to execute.
31+
type Task = () => Promise<void>;
32+
33+
// An executable command.
34+
type Command = {
35+
task: Task,
36+
command: Commands.Task
37+
} | {
38+
command: Commands.Stop
39+
};
40+
41+
/**
42+
* A task dispatcher for async tasks.
43+
*
44+
* Will make sure tasks are execute in order.
45+
*/
46+
class Dispatcher {
47+
// A FIFO queue of tasks to execute.
48+
private queue: Command[];
49+
// The current state of this dispatcher.
50+
private state: DispatcherState;
51+
// A promise contianing the current execution promise.
52+
//
53+
// This is `undefined` in case there is no ongoing execution of tasks.
54+
private currentJob?: Promise<void>;
55+
56+
constructor(readonly maxPreInitQueueSize = 100) {
57+
this.queue = [];
58+
this.state = DispatcherState.Uninitialized;
59+
}
60+
61+
/**
62+
* Gets the oldest command added to the queue.
63+
*
64+
* @returns The oldest command or `undefined` if the queue is empty.
65+
*/
66+
private getNextCommand(): Command | undefined {
67+
return this.queue.shift();
68+
}
69+
70+
/**
71+
* Executes all the commands in the queue, from oldest to newest.
72+
*
73+
* Stops on case a `Stop` command is encountered.
74+
*/
75+
private async execute(): Promise<void> {
76+
let nextCommand = this.getNextCommand();
77+
while(nextCommand) {
78+
if (nextCommand.command === Commands.Stop) {
79+
break;
80+
}
81+
82+
try {
83+
await nextCommand.task();
84+
} catch(e) {
85+
console.error("Error executing task:", e);
86+
}
87+
88+
nextCommand = this.getNextCommand();
89+
}
90+
}
91+
92+
/**
93+
* Triggers the execution of enqueued command
94+
* in case the dispatcher is currently Idle.
95+
*/
96+
private async triggerExecution(): Promise<void> {
97+
if (this.state === DispatcherState.Idle && this.queue.length > 0) {
98+
this.state = DispatcherState.Processing;
99+
this.currentJob = this.execute();
100+
await this.currentJob;
101+
this.currentJob = undefined;
102+
this.state = DispatcherState.Idle;
103+
}
104+
}
105+
106+
/**
107+
* Launches a task on this dispatchers queue.
108+
* Kickstarts the execution of the queue in case it is currently Idle.
109+
*
110+
* # Note
111+
*
112+
* Will not enqueue in case the dispatcher has not been initialized yet and the queues length exceeds `maxPreInitQueueSize`.
113+
*
114+
* @param task The task to enqueue.
115+
*/
116+
launch(task: Task): void {
117+
if (this.state === DispatcherState.Uninitialized) {
118+
if (this.queue.length >= this.maxPreInitQueueSize) {
119+
console.warn("Unable to enqueue task, pre init queue is full.");
120+
return;
121+
}
122+
}
123+
124+
this.queue.push({
125+
task,
126+
command: Commands.Task
127+
});
128+
129+
// Even though triggerExecution is async we don't want to block on it.
130+
// The point of the dispatcher is to execute the async functions
131+
// in a deterministic order without having to wait for them.
132+
this.triggerExecution();
133+
}
134+
135+
/**
136+
* Flushes the tasks enqueued while the dispatcher was uninitialized.
137+
*
138+
* This is a no-op in case the dispatcher is not in an uninitialized state.
139+
*/
140+
flushInit(): void {
141+
if (this.state !== DispatcherState.Uninitialized) {
142+
console.warn("Attempted to initialize the Dispatcher, but it is already initialized. Ignoring.");
143+
return;
144+
}
145+
146+
this.state = DispatcherState.Idle;
147+
148+
// Even though triggerExecution is async we don't want to block on it.
149+
// The point of the dispatcher is to execute the async functions
150+
// in a deterministic order without having to wait for them.
151+
this.triggerExecution();
152+
}
153+
154+
/**
155+
* Stops the current job and clears the tasks queue.
156+
*
157+
* @returns A promise which resolves once the current job
158+
* has been succesfully stopped and the queue was emptied.
159+
*/
160+
async clear(): Promise<void> {
161+
await this.stop();
162+
this.queue = [];
163+
this.state = DispatcherState.Idle;
164+
}
165+
166+
/**
167+
* Sets the state of this dispatcher to "Stopped" and stops any ongoing task processing.
168+
*
169+
* @returns A promise which resolves once the current job
170+
* has been succesfully stopped.
171+
*/
172+
async stop(): Promise<void> {
173+
if (this.state !== DispatcherState.Stopped) {
174+
this.state = DispatcherState.Stopped;
175+
this.queue.unshift({ command: Commands.Stop });
176+
await this.testBlockOnQueue();
177+
}
178+
}
179+
180+
/**
181+
* **Test-Only API**
182+
*
183+
* Returns a promise that resolves once the current task execution in finished.
184+
*
185+
* @returns The promise.
186+
*/
187+
async testBlockOnQueue(): Promise<void> {
188+
return this.currentJob && await this.currentJob;
189+
}
190+
}
191+
192+
export default Dispatcher;

src/glean.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { isUndefined, sanitizeApplicationId, validateURL } from "utils";
1111
import { CoreMetrics } from "internal_metrics";
1212
import { Lifetime } from "metrics";
1313
import { DatetimeMetric } from "metrics/types/datetime";
14+
import Dispatcher from "dispatcher";
1415

1516
class Glean {
1617
// The Glean singleton.
@@ -27,6 +28,8 @@ class Glean {
2728
private _coreMetrics: CoreMetrics;
2829
// The ping uploader.
2930
private _pingUploader: PingUploader
31+
// A task dispatcher to help execute in order asynchronous external API calls.
32+
private _dispatcher: Dispatcher;
3033

3134
// Properties that will only be set on `initialize`.
3235

@@ -44,6 +47,7 @@ class Glean {
4447
Use Glean.instance instead to access the Glean singleton.`);
4548
}
4649

50+
this._dispatcher = new Dispatcher();
4751
this._pingUploader = new PingUploader();
4852
this._coreMetrics = new CoreMetrics();
4953
this._initialized = false;
@@ -109,7 +113,12 @@ class Glean {
109113
* This function is only supposed to be called when telemetry is disabled.
110114
*/
111115
private static async clearMetrics(): Promise<void> {
112-
// Stop ongoing uploading jobs and clear pending pings queue.
116+
// Stops any task execution on the dispatcher.
117+
//
118+
// While stopped, the dispatcher will enqueue but won't execute any tasks it receives.
119+
await Glean.dispatcher.stop();
120+
121+
// Stop ongoing upload jobs and clear pending pings queue.
113122
await Glean.pingUploader.clearPendingPingsQueue();
114123

115124
// There is only one metric that we want to survive after clearing all
@@ -131,11 +140,8 @@ class Glean {
131140
// We need to briefly set upload_enabled to true here so that `set`
132141
// is not a no-op.
133142
//
134-
// Note that we can't provide the same guarantees as glean-core here.
135-
// If by any change another actor attempts to record a metric while
136-
// we are setting the known client id and first run date, they will be allowed to.
137-
//
138-
// TODO: Bug 1687491 might resolve this issue.
143+
// This is safe.
144+
// Since the dispatcher is stopped, no external API calls will be executed.
139145
Glean.uploadEnabled = true;
140146

141147
// Store a "dummy" KNOWN_CLIENT_ID in the client_id metric. This will
@@ -147,6 +153,9 @@ class Glean {
147153
await Glean.coreMetrics.firstRunDate.set(existingFirstRunDate.date);
148154

149155
Glean.uploadEnabled = false;
156+
157+
// Clear the dispatcher queue.
158+
await Glean.dispatcher.clear();
150159
}
151160

152161
/**
@@ -198,6 +207,9 @@ class Glean {
198207
await Glean.pingUploader.scanPendingPings();
199208
// Even though this returns a promise, there is no need to block on it returning.
200209
Glean.pingUploader.triggerUpload();
210+
211+
// Signal to the dispatcher that init is complete.
212+
Glean.dispatcher.flushInit();
201213
}
202214

203215
/**
@@ -227,14 +239,33 @@ class Glean {
227239
return Glean.instance._initialized;
228240
}
229241

242+
/**
243+
* Gets this Glean's instance application id.
244+
*
245+
* @returns The application id or `undefined` in case Glean has not been initialized yet.
246+
*/
230247
static get applicationId(): string | undefined {
231248
return Glean.instance._applicationId;
232249
}
233250

251+
/**
252+
* Gets this Glean's instance server endpoint.
253+
*
254+
* @returns The server endpoint or `undefined` in case Glean has not been initialized yet.
255+
*/
234256
static get serverEndpoint(): string | undefined {
235257
return Glean.instance._serverEndpoint;
236258
}
237259

260+
/**
261+
* Gets this Gleans's instance dispatcher.
262+
*
263+
* @returns The dispatcher instance.
264+
*/
265+
static get dispatcher(): Dispatcher {
266+
return Glean.instance._dispatcher;
267+
}
268+
238269
/**
239270
* Determines whether upload is enabled.
240271
*
@@ -304,6 +335,9 @@ class Glean {
304335
// Get back to an uninitialized state.
305336
Glean.instance._initialized = false;
306337

338+
// Clear the dispatcher queue.
339+
await Glean.dispatcher.clear();
340+
307341
// Stop ongoing jobs and clear pending pings queue.
308342
await Glean.pingUploader.clearPendingPingsQueue();
309343

0 commit comments

Comments
 (0)