Skip to content

Commit

Permalink
fix: Improve request queue behaviour (#817)
Browse files Browse the repository at this point in the history
* Keep requests in queue on send failure until expiration

* add missing await

* always send pending reqeuests in fastpoll, no implicit checkin on genPollCtrl
  • Loading branch information
slugzero authored Dec 4, 2023
1 parent 54adbb3 commit 1051d9d
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 42 deletions.
5 changes: 4 additions & 1 deletion src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,10 @@ class Controller extends events.EventEmitter {
}

device.updateLastSeen();
device.implicitCheckin();
//no implicit checkin for genPollCtrl data because it might interfere with the explicit checkin
if (!this.isZclDataPayload(dataPayload, dataType) || !dataPayload.frame.isCluster("genPollCtrl")) {
device.implicitCheckin();
}
device.linkquality = dataPayload.linkquality;

let endpoint = device.getEndpoint(dataPayload.endpoint);
Expand Down
5 changes: 2 additions & 3 deletions src/controller/model/endpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class Endpoint extends Entity {

if (this.pendingRequests.size === 0) return;

if (this.sendInProgress) {
if (!fastPolling && this.sendInProgress) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): sendPendingRequests already in progress`);
return;
}
Expand All @@ -296,12 +296,11 @@ class Endpoint extends Entity {
const result = await request.send();
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send success`);
request.resolve(result);
this.pendingRequests.delete(request);
} catch (error) {
debug.info(`Request Queue (${this.deviceIeeeAddress}/${this.ID}): send failed, expires in ` +
`${(request.expires - now) / 1000} seconds`);
request.reject(error);
}
this.pendingRequests.delete(request);
}
}
this.sendInProgress = false;
Expand Down
76 changes: 38 additions & 38 deletions test/controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4587,25 +4587,37 @@ describe('Controller', () => {
jest.advanceTimersByTime(10);
return f;
};
const origSendPendingRequests = endpoint.sendPendingRequests;
endpoint.sendPendingRequests = async (fastpoll) => {
const f = await origSendPendingRequests.call(endpoint, fastpoll);
jest.advanceTimersByTime(10);
return f;
};
endpoint.pendingRequests.add(new Request(async () => {}, [], 100));
mocksendZclFrameToEndpoint.mockClear();
mocksendZclFrameToEndpoint.mockImplementation(async () => {throw new Error('Dogs barking too hard')});
const nextTick = new Promise (process.nextTick);
mocksendZclFrameToEndpoint.mockImplementationOnce(async () => {throw new Error('Cats barking too hard');});
mocksendZclFrameToEndpoint.mockImplementationOnce(async () => {throw new Error('Dogs barking too hard');});
let nextTick = new Promise (process.nextTick);
const result = endpoint.write('genOnOff', {onOff: 1}, {disableResponse: true, sendWhen: 'active'});
await nextTick;
expect(mocksendZclFrameToEndpoint).toHaveBeenCalledTimes(1);
const data = {
wasBroadcast: false,
address: '0x129',
frame: ZclFrame.fromBuffer(Zcl.Utils.getCluster("msOccupancySensing").ID, Buffer.from([24,169,10,0,0,24,1])),
endpoint: 1,
linkquality: 50,
groupID: 1,
}

nextTick = new Promise (process.nextTick);
await mockAdapterEvents['zclData'](data);
await nextTick;
expect(mocksendZclFrameToEndpoint).toHaveBeenCalledTimes(2);
Date.now.mockReturnValue(100000);
let error = null;
try {
await mockAdapterEvents['zclData']({
wasBroadcast: false,
address: '0x129',
frame: ZclFrame.fromBuffer(Zcl.Utils.getCluster("msOccupancySensing").ID, Buffer.from([24,169,10,0,0,24,1])),
endpoint: 1,
linkquality: 50,
groupID: 1,
});

try{
await mockAdapterEvents['zclData'](data);
await result;
} catch (e) {
error = e;
Expand Down Expand Up @@ -4649,8 +4661,9 @@ describe('Controller', () => {
mocksendZclFrameToEndpoint.mockReturnValueOnce( {frame: {Payload: new Array( {"attrId": 3, "attrData": "three", "status": 0})}});
mocksendZclFrameToEndpoint.mockReturnValueOnce( {frame: {Payload: new Array( {"attrId": 4, "attrData": "four", "status": 0})}});
let result1, result2: Promise <any>;
let nextTick = new Promise (process.nextTick);
endpoint.write('genOnOff', {onOff: 0, startUpOnOff: 0}, {disableResponse: true, sendWhen: 'active'});
await new Promise (process.nextTick);
await nextTick;
// Queue content:
// 1. empty
// 2. ZCL write 'genOnOff' {onOff: 0, startUpOnOff: 0}
Expand Down Expand Up @@ -4729,37 +4742,24 @@ describe('Controller', () => {
expect(endpoint.pendingRequests.size).toStrictEqual (7);
expect(mocksendZclFrameToEndpoint).toHaveBeenCalledTimes(8);

try {
// Implicit checkin, there are 5 ZclFrames and 2 other requests left in the queue:
await mockAdapterEvents['zclData']({
wasBroadcast: false,
address: '0x129',
frame: ZclFrame.fromBuffer(Zcl.Utils.getCluster("msOccupancySensing").ID, Buffer.from([24,169,10,0,0,24,1])),
endpoint: 1,
linkquality: 50,
groupID: 1,
});
// Implicit checkin, there are 5 ZclFrames and 2 other requests left in the queue:
await mockAdapterEvents['zclData']({
wasBroadcast: false,
address: '0x129',
frame: ZclFrame.fromBuffer(Zcl.Utils.getCluster("msOccupancySensing").ID, Buffer.from([24,169,10,0,0,24,1])),
endpoint: 1,
linkquality: 50,
groupID: 1,
});

await result6;
} catch (e) {
try {
await result3;
} catch (e) {
try {
await result2;
} catch(e) {
error = e;
}
}
}
expect (result4).resolves.toStrictEqual({"3": "three"});
expect (result5).resolves.toStrictEqual({"3": "three"});
await result4;
expect(result4).resolves.toStrictEqual({"3": "three"});
expect(result5).resolves.toStrictEqual({"3": "three"});
expect(mocksendZclFrameToEndpoint).toHaveBeenCalledTimes(13);
expect(mocksendZclFrameToEndpoint.mock.calls[8][3].Payload).toStrictEqual ([{"attrData": 0, "attrId": 16387, "dataType": 48}]);
expect(mocksendZclFrameToEndpoint.mock.calls[9][3].Payload).toStrictEqual ([{"attrData": 1, "attrId": 0, "dataType": 16}]);
expect(mocksendZclFrameToEndpoint.mock.calls[10][3].Payload).toStrictEqual ([{"attrData": 0, "attrId": 0, "dataType": 16}, {"attrData": 0, "attrId": 16387, "dataType": 48}]);
expect(mocksendZclFrameToEndpoint.mock.calls[11][3].Payload).toStrictEqual ([{"attrData": 1, "attrId": 16387, "dataType": 48}]);
expect(error.message).toStrictEqual(`Write 0x129/1 genOnOff({"onOff":1}, {"sendWhen":"active","timeout":10000,"disableResponse":true,"disableRecovery":false,"disableDefaultResponse":true,"direction":0,"srcEndpoint":null,"reservedBits":0,"manufacturerCode":null,"transactionSequenceNumber":null,"writeUndiv":false}) failed (Write 0x129/1 genOnOff({"onOff":1}, {"sendWhen":"active","timeout":10000,"disableResponse":true,"disableRecovery":false,"disableDefaultResponse":true,"direction":0,"srcEndpoint":null,"reservedBits":0,"manufacturerCode":null,"transactionSequenceNumber":null,"writeUndiv":false}) failed (Dogs barking too hard))`);
});

it('Write with sendWhen active, discard messages after expiration', async () => {
Expand Down

0 comments on commit 1051d9d

Please sign in to comment.