Skip to content

Commit

Permalink
Added aps confirm/indication queue working parallel to aps request queue
Browse files Browse the repository at this point in the history
tuned processQueue timeouts
added readyToSend flag for better control of tranceived data requests
  • Loading branch information
ChrisHae committed Oct 21, 2020
1 parent 4b012f7 commit 1e7005f
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 26 deletions.
86 changes: 65 additions & 21 deletions src/adapter/deconz/driver/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,22 @@ var queue: Array<object> = [];
var busyQueue: Array<object> = [];
var apsQueue: Array<object> = [];
var apsBusyQueue: Array<object> = [];
var apsConfirmIndQueue: Array<object> = [];
var timeoutCounter = 0;
export { busyQueue, apsBusyQueue };
var readyToSend: boolean = true;
function enableRTS() {
if (readyToSend === false) {
readyToSend = true;
}
}
function disableRTS() {
readyToSend = false;
}

//setInterval(() => { enableRTS(); }, 1000); // enable ReadyTo Send after 1 sec if a data_confirm get lost
var enableRtsTimeout: ReturnType<typeof setTimeout> = null;

export { busyQueue, apsBusyQueue, readyToSend, enableRTS, disableRTS, enableRtsTimeout };

var frameParser = require('./frameParser');

Expand All @@ -41,7 +55,7 @@ class Driver extends events.EventEmitter {
private seqNumber: number;
private timeoutResetTimeout: any;
private apsRequestFreeSlots: number;
private apsDataConfirm: number;
private apsDataConfirm: number;
private apsDataIndication: number;
private configChanged: number;

Expand All @@ -58,17 +72,18 @@ class Driver extends events.EventEmitter {
this.configChanged = 0;

const that = this;
setInterval(() => { that.processQueue(); }, 200); //50
setInterval(() => { that.processBusyQueue(); }, 1000); //10
setInterval(() => { that.processApsQueue(); }, 200); // fire aps request, indications and confirms
setInterval(() => { that.processQueue(); }, 100);
setInterval(() => { that.processBusyQueue(); }, 1000); // check timeouts for non aps requests
setInterval(() => { that.processApsQueue(); }, 200); // fire aps request
setInterval(() => { that.processApsBusyQueue(); }, 1000); // check timeouts for all open aps requests
setInterval(() => { that.processApsConfirmIndQueue(); }, 100); // fire aps indications and confirms
setInterval(() => { that.deviceStateRequest()
.then(result => {})
.catch(error => {}); }, 10000);

setInterval(() => { that.handleDeviceStatus()
.then(result => {})
.catch(error => {}); }, 1000); // query confirm and indication requests
.catch(error => {}); }, 200); // query confirm and indication requests

setInterval(() => {
that.writeParameterRequest(0x26, 600) // reset watchdog // 10 minutes
Expand Down Expand Up @@ -251,7 +266,7 @@ class Driver extends events.EventEmitter {

this.serialPort.write(slipframe, function(err) {
if (err) {
console.log("Error writing serial Port: " + err.message);
debug("Error writing serial Port: " + err.message);
}
});
}
Expand Down Expand Up @@ -393,38 +408,44 @@ class Driver extends events.EventEmitter {
private async handleDeviceStatus() {
if (this.apsDataConfirm === 1) {
try {
console.log("query aps data confirm");
debug("query aps data confirm");
this.apsDataConfirm = 0;
const x = await this.querySendDataStateRequest();
} catch {
//debug("APS Error - data confirm");
} catch (e) {
if (e.status === 5) {
this.apsDataConfirm = 0;
}
}
}
if (this.apsDataIndication === 1) {
try {
console.log("query aps data indication");
debug("query aps data indication");
this.apsDataIndication = 0;
const x = await this.readReceivedDataRequest();
} catch {
//debug("APS Error - data indication");
} catch (e) {
if (e.status === 5) {
this.apsDataIndication = 0;
}
}
}
if (this.configChanged === 1) {
// when network settings changed
}
}

// DATA_IND
private readReceivedDataRequest() : Promise<void> {
const seqNumber = this.nextSeqNumber();
return new Promise((resolve, reject): void => {
//debug(`push read received data request to apsQueue. seqNr: ${seqNumber}`);
const ts = 0;
const commandId = PARAM.PARAM.APS.DATA_INDICATION;
const req: Request = {commandId, seqNumber, resolve, reject, ts};
apsQueue.push(req);
apsConfirmIndQueue.push(req);
});
}

// DATA_REQ
public enqueueSendDataRequest(request: ApsDataRequest) : Promise<void | ReceivedDataResponse> {
const seqNumber = this.nextSeqNumber();
return new Promise((resolve, reject): void => {
Expand All @@ -437,14 +458,15 @@ class Driver extends events.EventEmitter {
});
}

// DATA_CONF
private querySendDataStateRequest() : Promise<void> {
const seqNumber = this.nextSeqNumber();
return new Promise((resolve, reject): void => {
//debug(`push query send data state request to apsQueue. seqNr: ${seqNumber}`);
const ts = 0;
const commandId = PARAM.PARAM.APS.DATA_CONFIRM;
const req: Request = {commandId, seqNumber, resolve, reject, ts};
apsQueue.push(req);
apsConfirmIndQueue.push(req);
});
}

Expand All @@ -462,6 +484,33 @@ class Driver extends events.EventEmitter {
const req: Request = apsQueue.shift();
req.ts = Date.now();

switch (req.commandId) {
case PARAM.PARAM.APS.DATA_REQUEST:
if (readyToSend === false) { // wait until last request was confirmed
debug("delay sending of APS Request");
apsQueue.unshift(req);
break;
} else {
disableRTS();
enableRtsTimeout = setTimeout(function(){enableRTS();}, 1000);
apsBusyQueue.push(req);
this.sendEnqueueSendDataRequest(req.request, req.seqNumber);
break;
}
default:
throw new Error("process APS queue - unknown command id");
break;
}
}

private async processApsConfirmIndQueue() {
if (apsConfirmIndQueue.length === 0) {
return;
}

const req: Request = apsConfirmIndQueue.shift();
req.ts = Date.now();

apsBusyQueue.push(req);

switch (req.commandId) {
Expand All @@ -473,12 +522,8 @@ class Driver extends events.EventEmitter {
//debug(`query send data state request. seqNr: ${req.seqNumber}`);
await this.sendQueryDataStateRequest(req.seqNumber);
break;
case PARAM.PARAM.APS.DATA_REQUEST:
//debug(`send data request. seqNr: ${req.seqNumber}`);
await this.sendEnqueueSendDataRequest(req.request, req.seqNumber);
break;
default:
throw new Error("process APS queue - unknown command id");
throw new Error("process APS Confirm/Ind queue - unknown command id");
break;
}
}
Expand Down Expand Up @@ -540,7 +585,6 @@ class Driver extends events.EventEmitter {
if (req.request != null && req.request.timeout != null) {
timeout = req.request.timeout * 1000; // seconds * 1000 = milliseconds
}

if ((now - req.ts) > timeout) {
debug(`Timeout for aps request CMD: 0x${req.commandId.toString(16)} seq: ${req.seqNumber}`);
//remove from busyQueue
Expand Down
10 changes: 5 additions & 5 deletions src/adapter/deconz/driver/frameParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const MIN_BUFFER_SIZE = 3;
const littleEndian = true;
import PARAM from './constants';
import { busyQueue, apsBusyQueue } from './driver';
import { busyQueue, apsBusyQueue, readyToSend, enableRTS, disableRTS, enableRtsTimeout } from './driver';
import { Request, ReceivedDataResponse, DataStateResponse, Command, ParamMac, ParamPanId, ParamNwkAddr, ParamExtPanId, ParamChannel, ParamChannelMask, ParamPermitJoin, ParamNetworkKey, gpDataInd } from './constants';
import * as Events from '../../events';
import {ZclFrame} from '../../../zcl';
Expand Down Expand Up @@ -168,7 +168,8 @@ function parseQuerySendDataStateResponse(view : DataView) : object {
if (i < 0) {
return;
}

clearTimeout(enableRtsTimeout);
enableRTS(); // enable ReadyToSend because confirm received
const req: Request = apsBusyQueue[i];

// TODO timeout (at driver.ts)
Expand Down Expand Up @@ -300,7 +301,6 @@ function parseGreenPowerDataIndication(view : DataView) : object {
const ind: gpDataInd = {};
ind.seqNr = view.getUint8(1);

console.log(view);
if (view.byteLength < 30) {
debug("GP data notification");
ind.id = 0x00; // 0 = notification, 4 = commissioning
Expand Down Expand Up @@ -425,6 +425,8 @@ async function processFrame(frame: Uint8Array) : Promise<void> {
//}
}

//remove from busyqueue
queue.splice(i, 1);
if (status !== 0) {
// reject if status is not SUCCESS
//debug("REJECT REQUEST");
Expand All @@ -433,8 +435,6 @@ async function processFrame(frame: Uint8Array) : Promise<void> {
//debug("RESOLVE REQUEST");
req.resolve(command);
}
//remove from busyqueue
queue.splice(i, 1);
}

function parseFrame(frame: Uint8Array) : [number, number, Command, number] {
Expand Down

1 comment on commit 1e7005f

@ChrisHae
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sending APS requests is now faster again.
Sending many Requets at once is more stable now and not so many Confirms or Indications should get lost.
Because a ReadyToSend flag now controls the sending of aps requests depending of received data confirms and parallel processing of data indications and confirms next to the data request queue.

Please sign in to comment.