Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
[Full changelog](https://github.com/mozilla/glean.js/compare/v0.18.1...main)

* [#534](https://github.com/mozilla/glean.js/pull/534): Expose `Uploader` base class through `@mozilla/glean/<platform>/uploader` entry point.
* [#580](https://github.com/mozilla/glean.js/pull/580): Limit size of pings database to 250 pings or 10MB.
* [#580](https://github.com/mozilla/glean.js/pull/580): BUGFIX: Pending pings at startup up are uploaded from oldest to newest.

# v0.18.1 (2021-07-22)

Expand Down
5 changes: 5 additions & 0 deletions glean/.eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
"semi": "off",
"@typescript-eslint/semi": ["error", "always"],
"@typescript-eslint/consistent-type-imports": "error",
"no-unused-vars": "off",
"@typescript-eslint/no-unused-vars": [ "error", {
"argsIgnorePattern": "^_",
"varsIgnorePattern": "^_"
}],
"no-debugger": ["error"],
"no-multi-spaces": "error",
"jsdoc/no-types": "off",
Expand Down
2 changes: 1 addition & 1 deletion glean/src/core/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const enum Commands {
// The dispatcher will clear the queue and go into the Shutdown state.
Shutdown,
// Exactly like a normal Task, but spawned for tests.
TestTask = "TestTask",
TestTask,
}

// A task the dispatcher knows how to execute.
Expand Down
10 changes: 5 additions & 5 deletions glean/src/core/glean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,7 @@ class Glean {
Context.pingsDatabase = new PingsDatabase(Glean.platform.Storage);
Context.errorManager = new ErrorManager();

Glean.instance._pingUploader = new PingUploader(
correctConfig,
Glean.platform,
Context.pingsDatabase
);
Glean.instance._pingUploader = new PingUploader(correctConfig, Glean.platform);

Context.pingsDatabase.attachObserver(Glean.pingUploader);

Expand Down Expand Up @@ -289,6 +285,10 @@ class Glean {
}
}

// We only scan the pendings pings **after** dealing with the upload state.
// If upload is disabled, we delete all pending pings files
// and we need to do that **before** scanning the pending pings
// to ensure we don't enqueue pings before their files are deleted.
await Context.pingsDatabase.scanPendingPings();
});
}
Expand Down
138 changes: 131 additions & 7 deletions glean/src/core/pings/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,33 @@ import type { JSONObject} from "../utils.js";
import { isObject, isJSONValue, isString } from "../utils.js";
import type { StorageBuilder } from "../../platform/index.js";
import log, { LoggingLevel } from "../log.js";
import { DELETION_REQUEST_PING_NAME } from "../constants.js";
import { strToU8 } from "fflate";

const LOG_TAG = "core.Pings.Database";

/**
* Whether or not a given ping is a deletion-request ping.
*
* @param ping The ping to verify.
* @returns Whether or not the ping is a deletion-request ping.
*/
export function isDeletionRequest(ping: PingInternalRepresentation): boolean {
return ping.path.split("/")[3] === DELETION_REQUEST_PING_NAME;
}

/**
* Gets the size of a ping in bytes.
*
* @param ping The ping to get the size of.
* @returns Size of the given ping in bytes.
*/
function getPingSize(ping: PingInternalRepresentation): number {
return strToU8(JSON.stringify(ping)).length;
}

export interface PingInternalRepresentation extends JSONObject {
collectionDate: string,
path: string,
payload: JSONObject,
headers?: Record<string, string>
Expand Down Expand Up @@ -95,6 +118,7 @@ class PingsDatabase {
headers?: Record<string, string>
): Promise<void> {
const ping: PingInternalRepresentation = {
collectionDate: (new Date()).toISOString(),
path,
payload
};
Expand All @@ -119,11 +143,16 @@ class PingsDatabase {
}

/**
* Gets all pings from the pings database. Deletes any data in unexpected format that is found.
* Gets all pings from the pings database.
* Deletes any data in unexpected format that is found.
*
* # Note
*
* The return value of this function can be turned into an object using Object.fromEntries.
*
* @returns List of all currently stored pings.
* @returns List of all currently stored pings in ascending order by date.
*/
async getAllPings(): Promise<{ [id: string]: PingInternalRepresentation }> {
async getAllPings(): Promise<[ string, PingInternalRepresentation ][]> {
const allStoredPings = await this.store._getWholeStore();
const finalPings: { [ident: string]: PingInternalRepresentation } = {};
for (const identifier in allStoredPings) {
Expand All @@ -136,22 +165,117 @@ class PingsDatabase {
}
}

return finalPings;
return Object.entries(finalPings)
.sort(([_idA, { collectionDate: dateA }], [_idB, { collectionDate: dateB }]): number => {
const timeA = (new Date(dateA)).getTime();
const timeB = (new Date(dateB)).getTime();
return timeA - timeB;
});
}

/**
* Delete surplus of pings in the database by count or database size
* and return list of remaining pings. Pings are deleted from oldest to newest.
*
* The size of the database will be calculated
* (by accumulating each ping's size in bytes)
* and in case the quota is exceeded, outstanding pings get deleted.
*
* Note: `deletion-request` pings are never deleted.
*
* @param maxCount The max number of pings in the database. Default: 250.
* @param maxSize The max size of the database (in bytes). Default: 10MB.
* @returns List of all currently stored pings, in ascending order by date.
* `deletion-request` pings are always in the front of the list.
*/
private async getAllPingsWithoutSurplus(
maxCount = 250,
maxSize = 10 * 1024 * 1024, // 10MB
): Promise<[ string, PingInternalRepresentation ][]> {
const allPings = await this.getAllPings();
// Separate deletion-request from other pings.
const pings = allPings
.filter(([_, ping]) => !isDeletionRequest(ping))
// We need to calculate the size of the pending pings database
// and delete the **oldest** pings in case quota is reached.
// So, we sort them in descending order (newest -> oldest).
.reverse();
const deletionRequestPings = allPings.filter(([_, ping]) => isDeletionRequest(ping));

const total = pings.length;
// TODO (bug 1722682): Record `glean.pending_pings` metric.
if (total > maxCount) {
log(
LOG_TAG,
[
`More than ${maxCount} pending pings in the pings database,`,
`will delete ${total - maxCount} old pings.`
],
LoggingLevel.Warn
);
}

let deleting = false;
let pendingPingsCount = 0;
let pendingPingsDatabaseSize = 0;
const remainingPings: [ string, PingInternalRepresentation ][] = [];
for (const [identifier, ping] of pings) {
pendingPingsCount++;
pendingPingsDatabaseSize += getPingSize(ping);

if (!deleting && pendingPingsDatabaseSize > maxSize) {
log(
LOG_TAG,
[
`Pending pings database has reached the size quota of ${maxSize} bytes,`,
"outstanding pings will be deleted."
],
LoggingLevel.Warn
);
deleting = true;
}

// Once we reach the number of allowed pings we start deleting,
// no matter what size. We already log this before the loop.
if (pendingPingsCount > maxCount) {
deleting = true;
}

if (deleting) {
// Delete ping from database.
await this.deletePing(identifier);

// TODO (bug 1722685): Record `deleted_pings_after_quota_hit` metric.
} else {
// Add pings in reverse order so the final array is in ascending order again.
remainingPings.unshift([identifier, ping]);
}
}

// TODO(bug 1722686): Record `pending_pings_directory_size` metric.

// Return pings to original order.
return [ ...deletionRequestPings, ...remainingPings ];
}

/**
* Scans the database for pending pings and enqueues them.
*
* # Important
*
* This function will also clear off pings in case
* the database is exceeding the ping or size quota.
*/
async scanPendingPings(): Promise<void> {
// If there's no observer, then there's no point in iterating.
if (!this.observer) {
return;
}

const pings = await this.getAllPings();
for (const identifier in pings) {
const pings = await this.getAllPingsWithoutSurplus();
for (const [identifier, ping] of pings) {
// Notify the observer that a new ping has been added to the pings database.
this.observer.update(identifier, pings[identifier]);
this.observer.update(identifier, ping);
}
}

Expand Down
28 changes: 11 additions & 17 deletions glean/src/core/upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import { gzipSync, strToU8 } from "fflate";

import type Platform from "../../platform/index.js";
import type { Configuration } from "../config.js";
import { DELETION_REQUEST_PING_NAME, GLEAN_VERSION } from "../constants.js";
import { GLEAN_VERSION } from "../constants.js";
import { Context } from "../context.js";
import Dispatcher from "../dispatcher.js";
import log, { LoggingLevel } from "../log.js";
import type { Observer as PingsDatabaseObserver, PingInternalRepresentation } from "../pings/database.js";
import type PingsDatabase from "../pings/database.js";
import type {
Observer as PingsDatabaseObserver,
PingInternalRepresentation
} from "../pings/database.js";
import {
isDeletionRequest
} from "../pings/database.js";
import type PlatformInfo from "../platform_info.js";
import { UploadResult } from "./uploader.js";
import type Uploader from "./uploader.js";
Expand Down Expand Up @@ -51,16 +56,6 @@ interface QueuedPing extends PingInternalRepresentation {
retries: number,
}

/**
* Whether or not a given queued ping is a deletion-request ping.
*
* @param ping The ping to verify.
* @returns Whether or not the ping is a deletion-request ping.
*/
function isDeletionRequest(ping: QueuedPing): boolean {
return ping.path.split("/")[3] === DELETION_REQUEST_PING_NAME;
}

// Error to be thrown in case the final ping body is larger than MAX_PING_BODY_SIZE.
class PingBodyOverflowError extends Error {
constructor(message?: string) {
Expand Down Expand Up @@ -95,16 +90,15 @@ class PingUploader implements PingsDatabaseObserver {
constructor(
config: Configuration,
platform: Platform,
private readonly pingsDatabase: PingsDatabase,
private readonly policy = new Policy
private readonly pingsDatabase = Context.pingsDatabase,
private readonly policy = new Policy()
) {
this.processing = [];
// Initialize the ping uploader with either the platform defaults or a custom
// provided uploader from the configuration object.
this.uploader = config.httpClient ? config.httpClient : platform.uploader;
this.platformInfo = platform.info;
this.serverEndpoint = config.serverEndpoint;
this.pingsDatabase = pingsDatabase;

// Initialize the dispatcher immediatelly.
this.dispatcher = createAndInitializeDispatcher();
Expand All @@ -129,6 +123,7 @@ class PingUploader implements PingsDatabaseObserver {

// If the ping is a deletion-request ping, we want to enqueue it as a persistent task,
// so that clearing the queue does not clear it.
//
// eslint-disable-next-line @typescript-eslint/unbound-method
const launchFn = isDeletionRequest(ping) ? this.dispatcher.launchPersistent : this.dispatcher.launch;

Expand Down Expand Up @@ -303,7 +298,6 @@ class PingUploader implements PingsDatabaseObserver {
if (status && status >= 200 && status < 300) {
log(LOG_TAG, `Ping ${identifier} succesfully sent ${status}.`, LoggingLevel.Info);
await this.pingsDatabase.deletePing(identifier);
this.processing;
return false;
}

Expand Down
4 changes: 2 additions & 2 deletions glean/tests/unit/core/glean.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ describe("Glean", function() {
await Context.dispatcher.testBlockOnQueue();
const storedPings = await Context.pingsDatabase.getAllPings();
const counterValues = [];
for (const ident in storedPings) {
const metrics = storedPings[ident].payload.metrics;
for (const [_, ping] of storedPings) {
const metrics = ping.payload.metrics;
const counterValue = isObject(metrics) && isObject(metrics.counter) ? metrics.counter["aCategory.aCounterMetric"] : undefined;
// Get the value of `aCounterMetric` inside each submitted ping.
counterValues.push(Number(counterValue));
Expand Down
Loading