Skip to content

Commit 06289c9

Browse files
Replace adhoc promise queueing (#19)
I think we should be doing this more widely to deal with lifecycle event overlap (disconnect/connect etc) but this is a start.
1 parent 5943a10 commit 06289c9

File tree

7 files changed

+195
-128
lines changed

7 files changed

+195
-128
lines changed

lib/accelerometer-service.ts

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { AccelerometerData, AccelerometerDataEvent } from "./accelerometer.js";
2-
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
2+
import { Service } from "./bluetooth-device-wrapper.js";
33
import { profile } from "./bluetooth-profile.js";
4-
import { createGattOperationPromise } from "./bluetooth-utils.js";
54
import { BackgroundErrorEvent, DeviceError } from "./device.js";
65
import {
76
CharacteristicDataTarget,
@@ -14,7 +13,7 @@ export class AccelerometerService implements Service {
1413
private accelerometerDataCharacteristic: BluetoothRemoteGATTCharacteristic,
1514
private accelerometerPeriodCharacteristic: BluetoothRemoteGATTCharacteristic,
1615
private dispatchTypedEvent: TypedServiceEventDispatcher,
17-
private queueGattOperation: (gattOperation: GattOperation) => void,
16+
private queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
1817
) {
1918
this.accelerometerDataCharacteristic.addEventListener(
2019
"characteristicvaluechanged",
@@ -32,7 +31,7 @@ export class AccelerometerService implements Service {
3231
static async createService(
3332
gattServer: BluetoothRemoteGATTServer,
3433
dispatcher: TypedServiceEventDispatcher,
35-
queueGattOperation: (gattOperation: GattOperation) => void,
34+
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
3635
listenerInit: boolean,
3736
): Promise<AccelerometerService | undefined> {
3837
let accelerometerService: BluetoothRemoteGATTService;
@@ -76,22 +75,16 @@ export class AccelerometerService implements Service {
7675
}
7776

7877
async getData(): Promise<AccelerometerData> {
79-
const { callback, gattOperationPromise } = createGattOperationPromise();
80-
this.queueGattOperation({
81-
callback,
82-
operation: () => this.accelerometerDataCharacteristic.readValue(),
83-
});
84-
const dataView = (await gattOperationPromise) as DataView;
78+
const dataView = await this.queueGattOperation(() =>
79+
this.accelerometerDataCharacteristic.readValue(),
80+
);
8581
return this.dataViewToData(dataView);
8682
}
8783

8884
async getPeriod(): Promise<number> {
89-
const { callback, gattOperationPromise } = createGattOperationPromise();
90-
this.queueGattOperation({
91-
callback,
92-
operation: () => this.accelerometerPeriodCharacteristic.readValue(),
93-
});
94-
const dataView = (await gattOperationPromise) as DataView;
85+
const dataView = await this.queueGattOperation(() =>
86+
this.accelerometerPeriodCharacteristic.readValue(),
87+
);
9588
return dataView.getUint16(0, true);
9689
}
9790

@@ -104,17 +97,11 @@ export class AccelerometerService implements Service {
10497
// Values passed are rounded up to the allowed values on device.
10598
// Documentation for allowed values looks wrong.
10699
// https://lancaster-university.github.io/microbit-docs/resources/bluetooth/bluetooth_profile.html
107-
const { callback, gattOperationPromise } = createGattOperationPromise();
108100
const dataView = new DataView(new ArrayBuffer(2));
109101
dataView.setUint16(0, value, true);
110-
this.queueGattOperation({
111-
callback,
112-
operation: () =>
113-
this.accelerometerPeriodCharacteristic.writeValueWithoutResponse(
114-
dataView,
115-
),
116-
});
117-
await gattOperationPromise;
102+
return this.queueGattOperation(() =>
103+
this.accelerometerDataCharacteristic.writeValueWithoutResponse(dataView),
104+
);
118105
}
119106

120107
async startNotifications(type: TypedServiceEvent): Promise<void> {

lib/bluetooth-device-wrapper.ts

Lines changed: 22 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,13 @@ import { profile } from "./bluetooth-profile.js";
99
import { ButtonService } from "./button-service.js";
1010
import { BoardVersion, DeviceError } from "./device.js";
1111
import { Logging, NullLogging } from "./logging.js";
12+
import { PromiseQueue } from "./promise-queue.js";
1213
import {
1314
ServiceConnectionEventMap,
1415
TypedServiceEvent,
1516
TypedServiceEventDispatcher,
1617
} from "./service-events.js";
1718

18-
export interface GattOperationCallback {
19-
resolve: (result: DataView | void) => void;
20-
reject: (error: DeviceError) => void;
21-
}
22-
23-
export interface GattOperation {
24-
operation: () => Promise<DataView | void>;
25-
callback: GattOperationCallback;
26-
}
27-
28-
interface GattOperations {
29-
busy: boolean;
30-
queue: GattOperation[];
31-
}
32-
3319
const deviceIdToWrapper: Map<string, BluetoothDeviceWrapper> = new Map();
3420

3521
const connectTimeoutDuration: number = 10000;
@@ -62,7 +48,7 @@ class ServiceInfo<T extends Service> {
6248
private serviceFactory: (
6349
gattServer: BluetoothRemoteGATTServer,
6450
dispatcher: TypedServiceEventDispatcher,
65-
queueGattOperation: (gattOperation: GattOperation) => void,
51+
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
6652
listenerInit: boolean,
6753
) => Promise<T | undefined>,
6854
public events: TypedServiceEvent[],
@@ -75,7 +61,7 @@ class ServiceInfo<T extends Service> {
7561
async createIfNeeded(
7662
gattServer: BluetoothRemoteGATTServer,
7763
dispatcher: TypedServiceEventDispatcher,
78-
queueGattOperation: (gattOperation: GattOperation) => void,
64+
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
7965
listenerInit: boolean,
8066
): Promise<T | undefined> {
8167
this.service =
@@ -132,11 +118,22 @@ export class BluetoothDeviceWrapper {
132118

133119
boardVersion: BoardVersion | undefined;
134120

135-
private gattOperations: GattOperations = {
136-
busy: false,
137-
queue: [],
121+
private disconnectedRejectionErrorFactory = () => {
122+
return new DeviceError({
123+
code: "device-disconnected",
124+
message: "Error processing gatt operations queue - device disconnected",
125+
});
138126
};
139127

128+
private gattOperations = new PromiseQueue({
129+
abortCheck: () => {
130+
if (!this.device.gatt?.connected) {
131+
return this.disconnectedRejectionErrorFactory;
132+
}
133+
return undefined;
134+
},
135+
});
136+
140137
constructor(
141138
public readonly device: BluetoothDevice,
142139
private logging: Logging = new NullLogging(),
@@ -357,55 +354,10 @@ export class BluetoothDeviceWrapper {
357354
}
358355
}
359356

360-
private queueGattOperation(gattOperation: GattOperation) {
361-
this.gattOperations.queue.push(gattOperation);
362-
this.processGattOperationQueue();
363-
}
364-
365-
private processGattOperationQueue = (): void => {
366-
if (!this.device.gatt?.connected) {
367-
// No longer connected. Drop queue.
368-
this.clearGattQueueOnDisconnect();
369-
return;
370-
}
371-
if (this.gattOperations.busy) {
372-
// We will finish processing the current operation, then
373-
// pick up processing the queue in the finally block.
374-
return;
375-
}
376-
const gattOperation = this.gattOperations.queue.shift();
377-
if (!gattOperation) {
378-
return;
379-
}
380-
this.gattOperations.busy = true;
381-
gattOperation
382-
.operation()
383-
.then((result) => {
384-
gattOperation.callback.resolve(result);
385-
})
386-
.catch((err) => {
387-
gattOperation.callback.reject(
388-
new DeviceError({ code: "background-comms-error", message: err }),
389-
);
390-
this.logging.error("Error processing gatt operations queue", err);
391-
})
392-
.finally(() => {
393-
this.gattOperations.busy = false;
394-
this.processGattOperationQueue();
395-
});
396-
};
397-
398-
private clearGattQueueOnDisconnect() {
399-
this.gattOperations.queue.forEach((op) => {
400-
op.callback.reject(
401-
new DeviceError({
402-
code: "device-disconnected",
403-
message:
404-
"Error processing gatt operations queue - device disconnected",
405-
}),
406-
);
407-
});
408-
this.gattOperations = { busy: false, queue: [] };
357+
private queueGattOperation<T>(action: () => Promise<T>): Promise<T> {
358+
// Previously we wrapped rejections with:
359+
// new DeviceError({ code: "background-comms-error", message: err }),
360+
return this.gattOperations.add(action);
409361
}
410362

411363
private createIfNeeded<T extends Service>(
@@ -441,7 +393,7 @@ export class BluetoothDeviceWrapper {
441393

442394
private disposeServices() {
443395
this.serviceInfo.forEach((s) => s.dispose());
444-
this.clearGattQueueOnDisconnect();
396+
this.gattOperations.clear(this.disconnectedRejectionErrorFactory);
445397
}
446398
}
447399

lib/bluetooth-utils.ts

Lines changed: 0 additions & 18 deletions
This file was deleted.

lib/button-service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
1+
import { Service } from "./bluetooth-device-wrapper.js";
22
import { profile } from "./bluetooth-profile.js";
33
import { ButtonEvent, ButtonState } from "./buttons.js";
44
import { BackgroundErrorEvent, DeviceError } from "./device.js";
@@ -29,7 +29,7 @@ export class ButtonService implements Service {
2929
static async createService(
3030
gattServer: BluetoothRemoteGATTServer,
3131
dispatcher: TypedServiceEventDispatcher,
32-
queueGattOperation: (gattOperation: GattOperation) => void,
32+
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
3333
listenerInit: boolean,
3434
): Promise<ButtonService | undefined> {
3535
let buttonService: BluetoothRemoteGATTService;

lib/promise-queue.test.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import { describe, expect, it } from "vitest";
2+
import { PromiseQueue } from "./promise-queue.js";
3+
4+
describe("PromiseQueue", () => {
5+
it("waits for previous items", async () => {
6+
const sequence: number[] = [];
7+
const queue = new PromiseQueue();
8+
queue.add(async () => {
9+
expect(sequence).toEqual([]);
10+
sequence.push(1);
11+
});
12+
queue.add(async () => {
13+
expect(sequence).toEqual([1]);
14+
sequence.push(2);
15+
});
16+
expect(
17+
await queue.add(async () => {
18+
expect(sequence).toEqual([1, 2]);
19+
sequence.push(3);
20+
return 3;
21+
}),
22+
).toEqual(3);
23+
expect(sequence).toEqual([1, 2, 3]);
24+
});
25+
26+
it("copes with errors", async () => {
27+
const queue = new PromiseQueue();
28+
const sequence: (number | Error)[] = [];
29+
const p1 = queue.add(() => {
30+
sequence.push(1);
31+
throw new Error("Oops");
32+
});
33+
const p2 = queue.add(() => {
34+
sequence.push(2);
35+
return Promise.resolve(2);
36+
});
37+
expect(await p2).toEqual(2);
38+
await expect(p1).rejects.toThrow("Oops");
39+
expect(sequence).toEqual([1, 2]);
40+
});
41+
42+
it("clears", async () => {
43+
const queue = new PromiseQueue();
44+
const rejected: Promise<unknown>[] = [];
45+
const p1 = queue.add(
46+
() => new Promise((resolve) => setTimeout(resolve, 1000)),
47+
);
48+
const p2 = queue.add(
49+
() => new Promise((resolve) => setTimeout(resolve, 1000)),
50+
);
51+
const p3 = queue.add(
52+
() => new Promise((resolve) => setTimeout(resolve, 1000)),
53+
);
54+
p1.catch(() => rejected.push(p1));
55+
p2.catch(() => rejected.push(p2));
56+
p3.catch(() => rejected.push(p3));
57+
queue.clear(() => new Error("Cleared!"));
58+
await expect(p2).rejects.toThrow("Cleared!");
59+
await expect(p3).rejects.toThrow("Cleared!");
60+
expect(rejected).toEqual([p2, p3]);
61+
});
62+
63+
it("detects abort", async () => {
64+
let abort = false;
65+
const queue = new PromiseQueue({
66+
abortCheck: () => (abort ? () => new Error("Aborted") : undefined),
67+
});
68+
const p1 = queue.add(async () => {
69+
abort = true;
70+
});
71+
const p2 = queue.add(async () => {
72+
throw new Error("Does not happen");
73+
});
74+
expect(await p1).toBeUndefined();
75+
await expect(p2).rejects.toThrow("Aborted");
76+
});
77+
});

0 commit comments

Comments
 (0)