Skip to content

Replace adhoc promise queueing #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions lib/accelerometer-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AccelerometerData, AccelerometerDataEvent } from "./accelerometer.js";
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
import { Service } from "./bluetooth-device-wrapper.js";
import { profile } from "./bluetooth-profile.js";
import { createGattOperationPromise } from "./bluetooth-utils.js";
import { BackgroundErrorEvent, DeviceError } from "./device.js";
import {
CharacteristicDataTarget,
Expand All @@ -14,7 +13,7 @@ export class AccelerometerService implements Service {
private accelerometerDataCharacteristic: BluetoothRemoteGATTCharacteristic,
private accelerometerPeriodCharacteristic: BluetoothRemoteGATTCharacteristic,
private dispatchTypedEvent: TypedServiceEventDispatcher,
private queueGattOperation: (gattOperation: GattOperation) => void,
private queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
) {
this.accelerometerDataCharacteristic.addEventListener(
"characteristicvaluechanged",
Expand All @@ -32,7 +31,7 @@ export class AccelerometerService implements Service {
static async createService(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<AccelerometerService | undefined> {
let accelerometerService: BluetoothRemoteGATTService;
Expand Down Expand Up @@ -76,22 +75,16 @@ export class AccelerometerService implements Service {
}

async getData(): Promise<AccelerometerData> {
const { callback, gattOperationPromise } = createGattOperationPromise();
this.queueGattOperation({
callback,
operation: () => this.accelerometerDataCharacteristic.readValue(),
});
const dataView = (await gattOperationPromise) as DataView;
const dataView = await this.queueGattOperation(() =>
this.accelerometerDataCharacteristic.readValue(),
);
return this.dataViewToData(dataView);
}

async getPeriod(): Promise<number> {
const { callback, gattOperationPromise } = createGattOperationPromise();
this.queueGattOperation({
callback,
operation: () => this.accelerometerPeriodCharacteristic.readValue(),
});
const dataView = (await gattOperationPromise) as DataView;
const dataView = await this.queueGattOperation(() =>
this.accelerometerPeriodCharacteristic.readValue(),
);
return dataView.getUint16(0, true);
}

Expand All @@ -104,17 +97,11 @@ export class AccelerometerService implements Service {
// Values passed are rounded up to the allowed values on device.
// Documentation for allowed values looks wrong.
// https://lancaster-university.github.io/microbit-docs/resources/bluetooth/bluetooth_profile.html
const { callback, gattOperationPromise } = createGattOperationPromise();
const dataView = new DataView(new ArrayBuffer(2));
dataView.setUint16(0, value, true);
this.queueGattOperation({
callback,
operation: () =>
this.accelerometerPeriodCharacteristic.writeValueWithoutResponse(
dataView,
),
});
await gattOperationPromise;
return this.queueGattOperation(() =>
this.accelerometerDataCharacteristic.writeValueWithoutResponse(dataView),
);
}

async startNotifications(type: TypedServiceEvent): Promise<void> {
Expand Down
92 changes: 22 additions & 70 deletions lib/bluetooth-device-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,13 @@ import { profile } from "./bluetooth-profile.js";
import { ButtonService } from "./button-service.js";
import { BoardVersion, DeviceError } from "./device.js";
import { Logging, NullLogging } from "./logging.js";
import { PromiseQueue } from "./promise-queue.js";
import {
ServiceConnectionEventMap,
TypedServiceEvent,
TypedServiceEventDispatcher,
} from "./service-events.js";

export interface GattOperationCallback {
resolve: (result: DataView | void) => void;
reject: (error: DeviceError) => void;
}

export interface GattOperation {
operation: () => Promise<DataView | void>;
callback: GattOperationCallback;
}

interface GattOperations {
busy: boolean;
queue: GattOperation[];
}

const deviceIdToWrapper: Map<string, BluetoothDeviceWrapper> = new Map();

const connectTimeoutDuration: number = 10000;
Expand Down Expand Up @@ -62,7 +48,7 @@ class ServiceInfo<T extends Service> {
private serviceFactory: (
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
) => Promise<T | undefined>,
public events: TypedServiceEvent[],
Expand All @@ -75,7 +61,7 @@ class ServiceInfo<T extends Service> {
async createIfNeeded(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<T | undefined> {
this.service =
Expand Down Expand Up @@ -125,11 +111,22 @@ export class BluetoothDeviceWrapper {

boardVersion: BoardVersion | undefined;

private gattOperations: GattOperations = {
busy: false,
queue: [],
private disconnectedRejectionErrorFactory = () => {
return new DeviceError({
code: "device-disconnected",
message: "Error processing gatt operations queue - device disconnected",
});
};

private gattOperations = new PromiseQueue({
abortCheck: () => {
if (!this.device.gatt?.connected) {
return this.disconnectedRejectionErrorFactory;
}
return undefined;
},
});

constructor(
public readonly device: BluetoothDevice,
private logging: Logging = new NullLogging(),
Expand Down Expand Up @@ -340,55 +337,10 @@ export class BluetoothDeviceWrapper {
}
}

private queueGattOperation(gattOperation: GattOperation) {
this.gattOperations.queue.push(gattOperation);
this.processGattOperationQueue();
}

private processGattOperationQueue = (): void => {
if (!this.device.gatt?.connected) {
// No longer connected. Drop queue.
this.clearGattQueueOnDisconnect();
return;
}
if (this.gattOperations.busy) {
// We will finish processing the current operation, then
// pick up processing the queue in the finally block.
return;
}
const gattOperation = this.gattOperations.queue.shift();
if (!gattOperation) {
return;
}
this.gattOperations.busy = true;
gattOperation
.operation()
.then((result) => {
gattOperation.callback.resolve(result);
})
.catch((err) => {
gattOperation.callback.reject(
new DeviceError({ code: "background-comms-error", message: err }),
);
this.logging.error("Error processing gatt operations queue", err);
})
.finally(() => {
this.gattOperations.busy = false;
this.processGattOperationQueue();
});
};

private clearGattQueueOnDisconnect() {
this.gattOperations.queue.forEach((op) => {
op.callback.reject(
new DeviceError({
code: "device-disconnected",
message:
"Error processing gatt operations queue - device disconnected",
}),
);
});
this.gattOperations = { busy: false, queue: [] };
private queueGattOperation<T>(action: () => Promise<T>): Promise<T> {
// Previously we wrapped rejections with:
// new DeviceError({ code: "background-comms-error", message: err }),
return this.gattOperations.add(action);
}

private createIfNeeded<T extends Service>(
Expand Down Expand Up @@ -424,7 +376,7 @@ export class BluetoothDeviceWrapper {

private disposeServices() {
this.serviceInfo.forEach((s) => s.dispose());
this.clearGattQueueOnDisconnect();
this.gattOperations.clear(this.disconnectedRejectionErrorFactory);
}
}

Expand Down
18 changes: 0 additions & 18 deletions lib/bluetooth-utils.ts

This file was deleted.

4 changes: 2 additions & 2 deletions lib/button-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
import { Service } from "./bluetooth-device-wrapper.js";
import { profile } from "./bluetooth-profile.js";
import { ButtonEvent, ButtonState } from "./buttons.js";
import { BackgroundErrorEvent, DeviceError } from "./device.js";
Expand Down Expand Up @@ -29,7 +29,7 @@ export class ButtonService implements Service {
static async createService(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<ButtonService | undefined> {
let buttonService: BluetoothRemoteGATTService;
Expand Down
77 changes: 77 additions & 0 deletions lib/promise-queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { describe, expect, it } from "vitest";
import { PromiseQueue } from "./promise-queue.js";

describe("PromiseQueue", () => {
it("waits for previous items", async () => {
const sequence: number[] = [];
const queue = new PromiseQueue();
queue.add(async () => {
expect(sequence).toEqual([]);
sequence.push(1);
});
queue.add(async () => {
expect(sequence).toEqual([1]);
sequence.push(2);
});
expect(
await queue.add(async () => {
expect(sequence).toEqual([1, 2]);
sequence.push(3);
return 3;
}),
).toEqual(3);
expect(sequence).toEqual([1, 2, 3]);
});

it("copes with errors", async () => {
const queue = new PromiseQueue();
const sequence: (number | Error)[] = [];
const p1 = queue.add(() => {
sequence.push(1);
throw new Error("Oops");
});
const p2 = queue.add(() => {
sequence.push(2);
return Promise.resolve(2);
});
expect(await p2).toEqual(2);
await expect(p1).rejects.toThrow("Oops");
expect(sequence).toEqual([1, 2]);
});

it("clears", async () => {
const queue = new PromiseQueue();
const rejected: Promise<unknown>[] = [];
const p1 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
const p2 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
const p3 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
p1.catch(() => rejected.push(p1));
p2.catch(() => rejected.push(p2));
p3.catch(() => rejected.push(p3));
queue.clear(() => new Error("Cleared!"));
await expect(p2).rejects.toThrow("Cleared!");
await expect(p3).rejects.toThrow("Cleared!");
expect(rejected).toEqual([p2, p3]);
});

it("detects abort", async () => {
let abort = false;
const queue = new PromiseQueue({
abortCheck: () => (abort ? () => new Error("Aborted") : undefined),
});
const p1 = queue.add(async () => {
abort = true;
});
const p2 = queue.add(async () => {
throw new Error("Does not happen");
});
expect(await p1).toBeUndefined();
await expect(p2).rejects.toThrow("Aborted");
});
});
Loading