Skip to content

Commit 51ffec2

Browse files
author
Beatriz Rizental
authored
Merge pull request #552 from brizental/shutdown-bug
Bug 1707891 - Do not clear deletion-request pings when clearing pings queue
2 parents 9f052bc + 8ab6ef4 commit 51ffec2

File tree

8 files changed

+257
-66
lines changed

8 files changed

+257
-66
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
[Full changelog](https://github.com/mozilla/glean.js/compare/v0.17.0...v0.18.0)
88

9-
* [#542](https://github.com/mozilla/glean.js/pull/542): Implement `shutdown` API.
9+
* [#542](https://github.com/mozilla/glean.js/pull/542), [#552](https://github.com/mozilla/glean.js/pull/552): Implement `shutdown` API.
1010

1111
# v0.17.0 (2021-07-16)
1212

glean/src/core/dispatcher.ts

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,27 @@ export const enum DispatcherState {
2727

2828
// The possible commands to be processed by the dispatcher.
2929
const enum Commands {
30-
// The dispatcher must enqueue a new task.
30+
// The dispatcher will enqueue a new task.
3131
//
3232
// This command is always followed by a concrete task for the dispatcher to execute.
3333
Task,
34+
// Same as the `Task` command,
35+
// but the task enqueued by this command is not cleared by the `Clear` command.
36+
//
37+
// # Note
38+
//
39+
// `Shutdown` will still clear these tasks.
40+
//
41+
// Unless unavoidable, prefer using the normal `Task`.
42+
PersistentTask,
3443
// The dispatcher should stop executing the queued tasks.
3544
Stop,
3645
// The dispatcher should stop executing the queued tasks and clear the queue.
3746
Clear,
3847
// The dispatcher will clear the queue and go into the Shutdown state.
3948
Shutdown,
4049
// Exactly like a normal Task, but spawned for tests.
41-
TestTask,
50+
TestTask = "TestTask",
4251
}
4352

4453
// A task the dispatcher knows how to execute.
@@ -47,13 +56,13 @@ type Task = () => Promise<void>;
4756
// An executable command.
4857
type Command = {
4958
task: Task,
50-
command: Commands.Task,
59+
command: Commands.Task | Commands.PersistentTask,
5160
} | {
5261
resolver: (value: void | PromiseLike<void>) => void,
5362
task: Task,
5463
command: Commands.TestTask,
5564
} | {
56-
command: Exclude<Commands, Commands.Task | Commands.TestTask>,
65+
command: Exclude<Commands, Commands.Task | Commands.TestTask | Commands.PersistentTask>,
5766
};
5867

5968
/**
@@ -98,6 +107,19 @@ class Dispatcher {
98107
}
99108
}
100109

110+
/**
111+
* Resolve all test resolvers.
112+
*
113+
* Used before clearing the queue in on a `Shutdown` or `Clear` command.
114+
*/
115+
private unblockTestResolvers(): void {
116+
this.queue.forEach(c => {
117+
if (c.command === Commands.TestTask) {
118+
c.resolver();
119+
}
120+
});
121+
}
122+
101123
/**
102124
* Executes all the commands in the queue, from oldest to newest.
103125
*/
@@ -109,27 +131,23 @@ class Dispatcher {
109131
this.state = DispatcherState.Stopped;
110132
return;
111133
case(Commands.Shutdown):
112-
case(Commands.Clear):
113-
// Unblock test resolvers before clearing the queue.
114-
this.queue.forEach(c => {
115-
if (c.command === Commands.TestTask) {
116-
c.resolver();
117-
}
118-
});
119-
134+
this.unblockTestResolvers();
120135
this.queue = [];
121-
if (nextCommand.command === Commands.Clear) {
122-
this.state = DispatcherState.Stopped;
123-
} else {
124-
this.state = DispatcherState.Shutdown;
125-
}
126-
136+
this.state = DispatcherState.Shutdown;
127137
return;
138+
case(Commands.Clear):
139+
this.unblockTestResolvers();
140+
this.queue = this.queue.filter(c =>
141+
[Commands.PersistentTask, Commands.Shutdown].includes(c.command)
142+
);
143+
nextCommand = this.getNextCommand();
144+
continue;
128145
case (Commands.TestTask):
129146
await this.executeTask(nextCommand.task);
130147
nextCommand.resolver();
131148
nextCommand = this.getNextCommand();
132149
continue;
150+
case(Commands.PersistentTask):
133151
case(Commands.Task):
134152
await this.executeTask(nextCommand.task);
135153
nextCommand = this.getNextCommand();
@@ -237,6 +255,19 @@ class Dispatcher {
237255
});
238256
}
239257

258+
/**
259+
* Works exactly like {@link launch},
260+
* but enqueues a persistent task which is not cleared by the Clear command.
261+
*
262+
* @param task The task to enqueue.
263+
*/
264+
launchPersistent(task: Task): void {
265+
this.launchInternal({
266+
task,
267+
command: Commands.PersistentTask
268+
});
269+
}
270+
240271
/**
241272
* Flushes the tasks enqueued while the dispatcher was uninitialized.
242273
*
@@ -268,9 +299,8 @@ class Dispatcher {
268299
/**
269300
* Enqueues a Clear command at the front of the queue and triggers execution.
270301
*
271-
* The Clear command will remove all other tasks from the queue
272-
* and put the dispatcher in a Stopped state after the command is executed.
273-
* In order to re-start the dispatcher, call the `resume` method.
302+
* The Clear command will remove all other tasks
303+
* except for persistent tasks or shutdown tasks.
274304
*
275305
* # Note
276306
*
@@ -311,7 +341,7 @@ class Dispatcher {
311341
* Shutsdown the dispatcher.
312342
*
313343
* 1. Executes all tasks launched prior to this one.
314-
* 2. Clears the queue of any tasks launched after this one.
344+
* 2. Clears the queue of any tasks launched after this one (even persistent tasks).
315345
* 3. Puts the dispatcher in the `Shutdown` state.
316346
*
317347
* # Note
@@ -356,9 +386,10 @@ class Dispatcher {
356386
return;
357387
}
358388

389+
// Clear queue.
359390
this.clear();
360-
// We need to wait for the clear command to be executed.
361-
await this.testBlockOnQueue();
391+
// Wait for the clear command and any persistent tasks that may still be in the queue.
392+
await this.shutdown();
362393
this.state = DispatcherState.Uninitialized;
363394
}
364395

glean/src/core/glean.ts

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,10 @@ class Glean {
114114
* This function is only supposed to be called when telemetry is disabled.
115115
*/
116116
private static async clearMetrics(): Promise<void> {
117-
// Stop ongoing upload jobs and clear pending pings queue.
118-
Glean.pingUploader.clearPendingPingsQueue();
117+
// Clear enqueued upload jobs and clear pending pings queue.
118+
//
119+
// The only job that will still be sent is the deletion-request ping.
120+
await Glean.pingUploader.clearPendingPingsQueue();
119121

120122
// There is only one metric that we want to survive after clearing all
121123
// metrics: first_run_date. Here, we store its value
@@ -216,7 +218,11 @@ class Glean {
216218
Context.pingsDatabase = new PingsDatabase(Glean.platform.Storage);
217219
Context.errorManager = new ErrorManager();
218220

219-
Glean.instance._pingUploader = new PingUploader(correctConfig, Glean.platform, Context.pingsDatabase);
221+
Glean.instance._pingUploader = new PingUploader(
222+
correctConfig,
223+
Glean.platform,
224+
Context.pingsDatabase
225+
);
220226

221227
Context.pingsDatabase.attachObserver(Glean.pingUploader);
222228

@@ -504,17 +510,14 @@ class Glean {
504510
// Get back to an uninitialized state.
505511
await Context.testUninitialize();
506512

513+
// Shutdown the current uploader.
514+
//
515+
// This is fine because a new uploader is created on initialize.
516+
// It will also guarantee all pings to be sent before uninitializing.
517+
await Glean.pingUploader?.shutdown();
518+
507519
// Deregister all plugins
508520
testResetEvents();
509-
510-
// Await ongoing jobs and clear pending pings queue.
511-
if (Glean.pingUploader) {
512-
await Glean.pingUploader.testBlockOnPingsQueue();
513-
514-
// The first time tests run, before Glean is initialized, we are
515-
// not guaranteed to have an uploader. Account for this.
516-
Glean.pingUploader.clearPendingPingsQueue();
517-
}
518521
}
519522

520523
/**

glean/src/core/upload/index.ts

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { gzipSync, strToU8 } from "fflate";
66

77
import type Platform from "../../platform/index.js";
88
import type { Configuration } from "../config.js";
9-
import { GLEAN_VERSION } from "../constants.js";
9+
import { DELETION_REQUEST_PING_NAME, GLEAN_VERSION } from "../constants.js";
1010
import { Context } from "../context.js";
1111
import Dispatcher from "../dispatcher.js";
1212
import log, { LoggingLevel } from "../log.js";
@@ -51,6 +51,16 @@ interface QueuedPing extends PingInternalRepresentation {
5151
retries: number,
5252
}
5353

54+
/**
55+
* Whether or not a given queued ping is a deletion-request ping.
56+
*
57+
* @param ping The ping to verify.
58+
* @returns Whether or not the ping is a deletion-request ping.
59+
*/
60+
function isDeletionRequest(ping: QueuedPing): boolean {
61+
return ping.path.split("/")[3] === DELETION_REQUEST_PING_NAME;
62+
}
63+
5464
// Error to be thrown in case the final ping body is larger than MAX_PING_BODY_SIZE.
5565
class PingBodyOverflowError extends Error {
5666
constructor(message?: string) {
@@ -116,8 +126,14 @@ class PingUploader implements PingsDatabaseObserver {
116126

117127
// Add the ping to the list of pings being processsed.
118128
this.processing.push(ping);
129+
130+
// If the ping is a deletion-request ping, we want to enqueue it as a persistent task,
131+
// so that clearing the queue does not clear it.
132+
// eslint-disable-next-line @typescript-eslint/unbound-method
133+
const launchFn = isDeletionRequest(ping) ? this.dispatcher.launchPersistent : this.dispatcher.launch;
134+
119135
// Dispatch the uploading task.
120-
this.dispatcher.launch(async (): Promise<void> => {
136+
launchFn.bind(this.dispatcher)(async (): Promise<void> => {
121137
const status = await this.attemptPingUpload(ping);
122138
const shouldRetry = await this.processPingUploadResponse(ping.identifier, status);
123139

@@ -221,7 +237,14 @@ class PingUploader implements PingsDatabaseObserver {
221237
finalPing.headers
222238
);
223239
} catch(e) {
224-
log(LOG_TAG, ["Error trying to build ping request:", e], LoggingLevel.Warn);
240+
log(
241+
LOG_TAG,
242+
[
243+
"Error trying to build ping request:",
244+
(e as Error).message
245+
],
246+
LoggingLevel.Warn
247+
);
225248
// An unrecoverable failure will make sure the offending ping is removed from the queue and
226249
// deleted from the database, which is what we want here.
227250
return {
@@ -321,9 +344,8 @@ class PingUploader implements PingsDatabaseObserver {
321344
}
322345

323346
/**
324-
* Shuts down internal dispatcher, after executing all previously enqueued ping requests.
325-
*
326-
* This is irreversible.
347+
* Shuts down internal dispatcher,
348+
* after executing all previously enqueued ping requests.
327349
*
328350
* @returns A promise that resolves once shutdown is complete.
329351
*/
@@ -333,14 +355,25 @@ class PingUploader implements PingsDatabaseObserver {
333355

334356
/**
335357
* Clears the pending pings queue.
358+
*
359+
* # Important
360+
*
361+
* This will _drop_ pending pings still enqueued.
362+
* Only the `deletion-request` ping will still be processed.
336363
*/
337-
clearPendingPingsQueue(): void {
364+
async clearPendingPingsQueue(): Promise<void> {
365+
// Clears all tasks.
338366
this.dispatcher.clear();
339-
this.processing = [];
340-
// Create and initialize a new dispatcher so we don't need to wait
341-
// on the previous one finishing execution.
367+
// Wait for remaining jobs and shutdown.
342368
//
343-
// It will only finish execution of the current task, all other queued tasks are dropped.
369+
// The only jobs that may be left after clearing
370+
// are `deletion-request` uploads.
371+
await this.dispatcher.shutdown();
372+
// At this poit we are sure the dispatcher queue is also empty,
373+
// so we can empty the processing queue.
374+
this.processing = [];
375+
376+
// Create and initialize a new dispatcher, since the `shutdown` state is irreversible.
344377
this.dispatcher = createAndInitializeDispatcher();
345378
}
346379

0 commit comments

Comments
 (0)