From e397e38849ff937acbf0aef9da69fa51ac3f0f7b Mon Sep 17 00:00:00 2001 From: Ilya Kirov Date: Sun, 18 Feb 2024 10:59:18 +0300 Subject: [PATCH] fix: EZSP: Some refactoring of UART level and packet processing. (#916) * simple parser tests * Minor refactoring of UART level and packet processing * lint * slightly increased the packet sending timeout for slow connections (for example, when using the Multiprotocol addon on rpi3). * Pause during stack initialization (for slow cases Multiprotocol on rpi3) * fix test * reduced the pause * Allow error on stop connection --- src/adapter/ezsp/adapter/ezspAdapter.ts | 40 +++++++------ src/adapter/ezsp/driver/driver.ts | 5 ++ src/adapter/ezsp/driver/ezsp.ts | 34 +++++++++-- src/adapter/ezsp/driver/parser.ts | 80 ++++++++++--------------- src/adapter/ezsp/driver/uart.ts | 59 +++++++++++++----- src/adapter/ezsp/driver/writer.ts | 23 ++++--- test/adapter/ezsp/uart.test.ts | 58 ++++++++++++++++++ 7 files changed, 199 insertions(+), 100 deletions(-) diff --git a/src/adapter/ezsp/adapter/ezspAdapter.ts b/src/adapter/ezsp/adapter/ezspAdapter.ts index b3671fc52e..ee69dba27b 100644 --- a/src/adapter/ezsp/adapter/ezspAdapter.ts +++ b/src/adapter/ezsp/adapter/ezspAdapter.ts @@ -249,24 +249,26 @@ class EZSPAdapter extends Adapter { } public async permitJoin(seconds: number, networkAddress: number): Promise { - return this.queue.execute(async () => { - this.checkInterpanLock(); - if (seconds) { - this.driver.preJoining(); - } - if (networkAddress) { - const result = await this.driver.zdoRequest( - networkAddress, EmberZDOCmd.Mgmt_Permit_Joining_req, - EmberZDOCmd.Mgmt_Permit_Joining_rsp, - {duration: seconds, tcSignificant: false} - ); - if (result.status !== EmberStatus.SUCCESS) { - throw new Error(`permitJoin for '${networkAddress}' failed`); + if (this.driver.ezsp.isInitialized()) { + return this.queue.execute(async () => { + this.checkInterpanLock(); + if (seconds) { + this.driver.preJoining(); } - } else { - await this.driver.permitJoining(seconds); - } - }); + if (networkAddress) { + const result = await this.driver.zdoRequest( + networkAddress, EmberZDOCmd.Mgmt_Permit_Joining_req, + EmberZDOCmd.Mgmt_Permit_Joining_rsp, + {duration: seconds, tcSignificant: false} + ); + if (result.status !== EmberStatus.SUCCESS) { + throw new Error(`permitJoin for '${networkAddress}' failed`); + } + } else { + await this.driver.permitJoining(seconds); + } + }); + } } public async getCoordinatorVersion(): Promise { @@ -626,7 +628,9 @@ class EZSPAdapter extends Adapter { } public async backup(): Promise { - return this.backupMan.createBackup(); + if (this.driver.ezsp.isInitialized()) { + return this.backupMan.createBackup(); + } } public async restoreChannelInterPAN(): Promise { diff --git a/src/adapter/ezsp/driver/driver.ts b/src/adapter/ezsp/driver/driver.ts index 8440798882..e0324aa2b1 100644 --- a/src/adapter/ezsp/driver/driver.ts +++ b/src/adapter/ezsp/driver/driver.ts @@ -115,7 +115,12 @@ export class Driver extends EventEmitter { try { // don't emit 'close' on stop since we don't want this to bubble back up as 'disconnected' to the controller. await this.stop(false); + } catch (err) { + debug.error(`Stop error ${err.stack}`); + } + try { await Wait(1000); + debug.log(`Startup again.`); await this.startup(); } catch (err) { debug.error(`Reset error ${err.stack}`); diff --git a/src/adapter/ezsp/driver/ezsp.ts b/src/adapter/ezsp/driver/ezsp.ts index 4bf89da2a7..4b613b18c7 100644 --- a/src/adapter/ezsp/driver/ezsp.ts +++ b/src/adapter/ezsp/driver/ezsp.ts @@ -329,6 +329,7 @@ export class Ezsp extends EventEmitter { private queue: Queue; private watchdogTimer: NodeJS.Timeout; private failures = 0; + private inResetingProcess = false; constructor() { super(); @@ -344,6 +345,11 @@ export class Ezsp extends EventEmitter { public async connect(options: SerialPortOptions): Promise { let lastError = null; + const resetForReconnect = (): void => { + throw new Error("Failure to connect"); + }; + this.serialDriver.on('reset', resetForReconnect); + for (let i = 1; i <= MAX_SERIAL_CONNECT_ATTEMPTS; i++) { try { await this.serialDriver.connect(options); @@ -360,10 +366,14 @@ export class Ezsp extends EventEmitter { } } + this.serialDriver.off('reset', resetForReconnect); + if (!this.serialDriver.isInitialized()) { throw new Error("Failure to connect", {cause: lastError}); } + this.inResetingProcess = false; + this.serialDriver.on('reset', this.onSerialReset.bind(this)); if (WATCHDOG_WAKE_PERIOD) { @@ -374,14 +384,21 @@ export class Ezsp extends EventEmitter { } } + public isInitialized(): boolean { + return this.serialDriver?.isInitialized(); + } + private onSerialReset(): void { debug.log('onSerialReset()'); + this.inResetingProcess = true; this.emit('reset'); } private onSerialClose(): void { debug.log('onSerialClose()'); - this.emit('close'); + if (!this.inResetingProcess) { + this.emit('close'); + } } public async close(emitClose: boolean): Promise { @@ -746,17 +763,24 @@ export class Ezsp extends EventEmitter { private async watchdogHandler(): Promise { debug.log(`Time to watchdog ... ${this.failures}`); + if (this.inResetingProcess) { + debug.log('The reset process is in progress...'); + return; + } + try { await this.execCommand('nop'); } catch (error) { debug.error(`Watchdog heartbeat timeout ${error.stack}`); - this.failures += 1; + if (!this.inResetingProcess) { + this.failures += 1; - if (this.failures > MAX_WATCHDOG_FAILURES) { - this.failures = 0; + if (this.failures > MAX_WATCHDOG_FAILURES) { + this.failures = 0; - this.emit('reset'); + this.emit('reset'); + } } } } diff --git a/src/adapter/ezsp/driver/parser.ts b/src/adapter/ezsp/driver/parser.ts index 2ba01f576f..abfa71037d 100644 --- a/src/adapter/ezsp/driver/parser.ts +++ b/src/adapter/ezsp/driver/parser.ts @@ -7,14 +7,14 @@ import Frame from './frame'; const debug = Debug('zigbee-herdsman:adapter:ezsp:uart'); export class Parser extends stream.Transform { - private buffer: Buffer; + private tail: Buffer[]; private flagXONXOFF: boolean; public constructor(flagXONXOFF: boolean = false) { super(); this.flagXONXOFF = flagXONXOFF; - this.buffer = Buffer.from([]); + this.tail = []; } public _transform(chunk: Buffer, _: string, cb: () => void): void { @@ -24,80 +24,64 @@ export class Parser extends stream.Transform { } if (chunk.indexOf(consts.CANCEL) >= 0) { - this.buffer = Buffer.from([]); + this.reset(); chunk = chunk.subarray(chunk.lastIndexOf(consts.CANCEL) + 1); } if (chunk.indexOf(consts.SUBSTITUTE) >= 0) { - this.buffer = Buffer.from([]); + this.reset(); chunk = chunk.subarray(chunk.indexOf(consts.FLAG) + 1); } debug(`<-- [${chunk.toString('hex')}]`); - this.buffer = Buffer.concat([this.buffer, chunk]); + let delimiterPlace = chunk.indexOf(consts.FLAG); - this.parseNext(); - cb(); - } - - private parseNext(): void { - if (this.buffer.length) { - const place = this.buffer.indexOf(consts.FLAG); - - if (place >= 0) { - const frameLength = place + 1; + while (delimiterPlace >= 0) { + const buffer = chunk.subarray(0, delimiterPlace + 1); + const frameBuffer = Buffer.from([...this.unstuff(Buffer.concat([...this.tail, buffer]))]); + this.reset(); - if (this.buffer.length >= frameLength) { - const frameBuffer = this.unstuff(this.buffer.subarray(0, frameLength)); + try { + const frame = Frame.fromBuffer(frameBuffer); - try { - const frame = Frame.fromBuffer(frameBuffer); - - if (frame) { - debug(`--> parsed ${frame}`); - this.emit('parsed', frame); - } - } catch (error) { - debug(`--> error ${error.stack}`); - } - - this.buffer = this.buffer.subarray(frameLength); - this.parseNext(); + if (frame) { + this.emit('parsed', frame); } + + } catch (error) { + debug(`<-- error ${error.stack}`); } + + chunk = chunk.subarray(delimiterPlace + 1); + delimiterPlace = chunk.indexOf(consts.FLAG); } - } - private unstuff(s: Buffer): Buffer { - /* Unstuff (unescape) a string after receipt */ + this.tail.push(chunk); + cb(); + } + + private* unstuff(buffer: Buffer): Generator { + /* Unstuff (unescape) a buffer after receipt */ let escaped = false; - const out = Buffer.alloc(s.length); - let outIdx = 0; - - for (let idx = 0; idx < s.length; idx += 1) { - const c = s[idx]; - + for (const byte of buffer) { if (escaped) { - out.writeUInt8(c ^ consts.STUFF, outIdx++); - + yield byte ^ consts.STUFF; escaped = false; } else { - if (c === consts.ESCAPE) { + if (byte === consts.ESCAPE) { escaped = true; - } else if (c === consts.XOFF || c === consts.XON) { + } else if (byte === consts.XOFF || byte === consts.XON) { // skip } else { - out.writeUInt8(c, outIdx++); + yield byte; } } } - - return out.subarray(0, outIdx); } public reset(): void { - // clear buffer - this.buffer = Buffer.from([]); + // clear tail + this.tail.length = 0; } } \ No newline at end of file diff --git a/src/adapter/ezsp/driver/uart.ts b/src/adapter/ezsp/driver/uart.ts index 1755a0e6ff..9af5f06597 100644 --- a/src/adapter/ezsp/driver/uart.ts +++ b/src/adapter/ezsp/driver/uart.ts @@ -9,6 +9,7 @@ import {Parser} from './parser'; import {Frame as NpiFrame, FrameType} from './frame'; import Debug from "debug"; import {SerialPortOptions} from '../../tstype'; +import wait from '../../../utils/wait'; const debug = Debug('zigbee-herdsman:adapter:ezsp:uart'); @@ -44,6 +45,7 @@ export class SerialDriver extends EventEmitter { private sendSeq = 0; // next frame number to send private recvSeq = 0; // next frame number to receive private ackSeq = 0; // next number after the last accepted frame + private rejectCondition = false; private waitress: Waitress; private queue: Queue; @@ -162,18 +164,10 @@ export class SerialDriver extends EventEmitter { } private async onParsed(frame: NpiFrame): Promise { + const rejectCondition = this.rejectCondition; try { frame.checkCRC(); - } catch (error) { - debug(error); - - // send NAK - this.writer.sendNAK(this.recvSeq); - // skip handler - return; - } - try { /* Frame receive handler */ switch (frame.type) { case FrameType.DATA: @@ -195,12 +189,21 @@ export class SerialDriver extends EventEmitter { await this.handleError(frame); break; default: + this.rejectCondition = true; debug(`UNKNOWN FRAME RECEIVED: ${frame}`); } } catch (error) { + this.rejectCondition = true; + debug(error); debug(`Error while parsing to NpiFrame '${error.stack}'`); } + + // We send NAK only if the rejectCondition was set in the current processing + if (!rejectCondition && this.rejectCondition) { + // send NAK + this.writer.sendNAK(this.recvSeq); + } } private handleDATA(frame: NpiFrame): void { @@ -210,6 +213,22 @@ export class SerialDriver extends EventEmitter { debug(`<-- DATA (${frmNum},${frame.control & 0x07},${reTx}): ${frame}`); + // Expected package {recvSeq}, but received {frmNum} + // This happens when the chip sends us a reTx packet, but we are waiting for the next one + if (this.recvSeq != frmNum) { + if (reTx) { + // if the reTx flag is set, then this is a packet replay + debug(`Unexpected DATA packet sequence ${frmNum} | ${this.recvSeq}: packet replay`); + } else { + // otherwise, the sequence of packets is out of order - skip or send NAK is needed + debug(`Unexpected DATA packet sequence ${frmNum} | ${this.recvSeq}: reject condition`); + this.rejectCondition = true; + return; + } + } + + this.rejectCondition = false; + this.recvSeq = (frmNum + 1) & 7; // next debug(`--> ACK (${this.recvSeq})`); @@ -241,6 +260,9 @@ export class SerialDriver extends EventEmitter { const handled = this.waitress.resolve({sequence: this.ackSeq}); if (!handled && this.sendSeq !== this.ackSeq) { + // Packet confirmation received for {ackSeq}, but was expected {sendSeq} + // This happens when the chip has not yet received of the packet {sendSeq} from us, + // but has already sent us the next one. debug(`Unexpected packet sequence ${this.ackSeq} | ${this.sendSeq}`); } @@ -270,8 +292,7 @@ export class SerialDriver extends EventEmitter { private handleRSTACK(frame: NpiFrame): void { /* Reset acknowledgement frame receive handler */ let code; - this.sendSeq = 0; - this.recvSeq = 0; + this.rejectCondition = false; debug(`<-- RSTACK ${frame}`); @@ -305,16 +326,22 @@ export class SerialDriver extends EventEmitter { debug('Uart reseting'); this.parser.reset(); this.queue.clear(); + this.sendSeq = 0; + this.recvSeq = 0; return this.queue.execute(async (): Promise => { try { debug(`--> Write reset`); const waiter = this.waitFor(-1, 10000); - + this.rejectCondition = false; + this.writer.sendReset(); debug(`-?- waiting reset`); await waiter.start().promise; debug(`-+- waiting reset success`); + + await wait(2000); + } catch (e) { debug(`--> Error: ${e}`); @@ -407,16 +434,16 @@ export class SerialDriver extends EventEmitter { debug(`--> Error: ${e2}`); debug(`-!- break rewaiting (${nextSeq})`); debug(`Can't resend DATA frame (${seq},${ackSeq},1): ${data.toString('hex')}`); - - this.emit('reset'); - + if (this.initialized) { + this.emit('reset'); + } throw new Error(`sendDATA error: try 1: ${e1}, try 2: ${e2}`); } } }); } - public waitFor(sequence: number, timeout = 2000) + public waitFor(sequence: number, timeout = 4000) : { start: () => { promise: Promise; ID: number }; ID: number } { return this.waitress.waitFor({sequence}, timeout); } diff --git a/src/adapter/ezsp/driver/writer.ts b/src/adapter/ezsp/driver/writer.ts index f53e7b292e..e657461f8f 100644 --- a/src/adapter/ezsp/driver/writer.ts +++ b/src/adapter/ezsp/driver/writer.ts @@ -40,27 +40,24 @@ export class Writer extends stream.Readable { this.writeBuffer(dataFrame); } - private stuff(s: Iterable): Buffer { + private* stuff (buffer: number[]): Generator { /* Byte stuff (escape) a string for transmission */ - const out = Buffer.alloc(256); - let outIdx = 0; - for (const c of s) { - if (consts.RESERVED.includes(c)) { - out.writeUInt8(consts.ESCAPE, outIdx++); - out.writeUInt8(c ^ consts.STUFF, outIdx++); + for (const byte of buffer) { + if (consts.RESERVED.includes(byte)) { + yield consts.ESCAPE; + yield byte ^ consts.STUFF; } else { - out.writeUInt8(c, outIdx++); + yield byte; } } - return out.subarray(0, outIdx); } private makeFrame(control: number, data?: Buffer): Buffer { /* Construct a frame */ - const ctrl = Buffer.from([control]); - const frm = (data) ? Buffer.concat([ctrl, data]) : ctrl; + const frm = [control, ...(data || [])]; const crc = crc16ccitt(frm, 65535); - const crcArr = Buffer.from([(crc >> 8), (crc % 256)]); - return Buffer.concat([this.stuff(Buffer.concat([frm, crcArr])), Buffer.from([consts.FLAG])]); + frm.push(crc >> 8); + frm.push(crc % 256); + return Buffer.from([...this.stuff(frm), consts.FLAG]); } } \ No newline at end of file diff --git a/test/adapter/ezsp/uart.test.ts b/test/adapter/ezsp/uart.test.ts index 44774cf3cd..6e2f774a14 100644 --- a/test/adapter/ezsp/uart.test.ts +++ b/test/adapter/ezsp/uart.test.ts @@ -2,6 +2,9 @@ import "regenerator-runtime/runtime"; import {SerialPort} from '../../../src/adapter/serialPort'; import {SerialDriver} from '../../../src/adapter/ezsp/driver/uart'; import {Writer} from '../../../src/adapter/ezsp/driver/writer'; +import {Parser} from '../../../src/adapter/ezsp/driver/parser'; +import {FrameType} from '../../../src/adapter/ezsp/driver/frame'; + let mockParser; const mockSerialPortClose = jest.fn().mockImplementation((cb) => cb ? cb() : null); @@ -42,6 +45,10 @@ jest.mock('../../../src/adapter/serialPort', () => { }; }); +jest.mock('../../../src/utils/wait', () => { + return () => {}; +}); + let writeBufferSpy; const mocks = [ @@ -118,4 +125,55 @@ describe('UART', () => { serialDriver.sendDATA(Buffer.from([1,2,3])); expect(writeBufferSpy).toHaveBeenCalledTimes(2); }); + + it('Receive data', async () => { + const parsed = []; + const parser = new Parser(); + parser.on('parsed', (result) => parsed.push(result)); + // send 4 frames + const buffer0 = Buffer.from([0xc1, 0x02, 0x0b, 0x0a, 0x52, 0x7e]); + const buffer1 = Buffer.from([0x22, 0x5b, 0xb1, 0xa9, 0x0d, 0x2a, 0xc1, 0xd8, 0x19, 0x53, 0x4a, 0x14, + 0xaa, 0xe9, 0x87, 0x49, 0xfc, 0xfa, 0x26, 0x7d, 0x5e, 0xc5, 0xaa, 0xc8, 0x7e]); + const buffer2 = Buffer.from([0x32, 0x5b, 0xb1, 0xa9, 0x7d, 0x31, 0x2a, 0x15, 0xb6, 0x58, 0x8d, 0x4a, + 0x06, 0xab, 0x55, 0x93, 0x49, 0x9c, 0x45, 0x7b, 0x7d, 0x38, 0x39, 0xa4, 0x98, 0x74, 0xf1, 0xd7, + 0x26, 0x88, 0xfc, 0x6b, 0x2f, 0xf6, 0xe9, 0xc5, 0xde, 0x6b, 0x8f, 0xfb, 0xd8, 0xf9, 0x7e]); + const buffer3 = Buffer.from([0xa2, 0x74, 0x58, 0x7e]); + parser._transform(buffer0, '', () => {}); + parser._transform(buffer1, '', () => {}); + parser._transform(buffer2, '', () => {}); + parser._transform(buffer3, '', () => {}); + expect(parsed.length).toBe(4); + expect(parsed[0].type).toBe(FrameType.RSTACK); + expect(parsed[1].type).toBe(FrameType.DATA); + expect(parsed[2].type).toBe(FrameType.DATA); + expect(parsed[3].type).toBe(FrameType.NAK); + }); + + it('Message in two chunks', async () => { + const parsed = []; + const parser = new Parser(); + parser.on('parsed', (result) => parsed.push(result)); + const buffer1 = Buffer.from([0x22, 0x5b, 0xb1, 0xa9, 0x0d, 0x2a, 0xc1, 0xd8, 0x19, 0x53, 0x4a, 0x14]); + parser._transform(buffer1, '', () => {}); + expect(parsed.length).toBe(0); + const buffer2 = Buffer.from([0xaa, 0xe9, 0x87, 0x49, 0xfc, 0xfa, 0x26, 0x7d, 0x5e, 0xc5, 0xaa, 0xc8, 0x7e]); + parser._transform(buffer2, '', () => {}); + expect(parsed.length).toBe(1); + expect(parsed[0].type).toBe(FrameType.DATA); + }); + + it('Two messages in one chunk', async () => { + const parsed = []; + const parser = new Parser(); + parser.on('parsed', (result) => parsed.push(result)); + const buffer1 = Buffer.from([0x22, 0x5b, 0xb1, 0xa9, 0x0d, 0x2a, 0xc1, 0xd8, 0x19, 0x53, 0x4a, 0x14, + 0xaa, 0xe9, 0x87, 0x49, 0xfc, 0xfa, 0x26, 0x7d, 0x5e, 0xc5, 0xaa, 0xc8, 0x7e, + 0x32, 0x5b, 0xb1, 0xa9, 0x7d, 0x31, 0x2a, 0x15, 0xb6, 0x58, 0x8d, 0x4a, + 0x06, 0xab, 0x55, 0x93, 0x49, 0x9c, 0x45, 0x7b, 0x7d, 0x38, 0x39, 0xa4, 0x98, 0x74, 0xf1, 0xd7, + 0x26, 0x88, 0xfc, 0x6b, 0x2f, 0xf6, 0xe9, 0xc5, 0xde, 0x6b, 0x8f, 0xfb, 0xd8, 0xf9, 0x7e]); + parser._transform(buffer1, '', () => {}); + expect(parsed.length).toBe(2); + expect(parsed[0].type).toBe(FrameType.DATA); + expect(parsed[1].type).toBe(FrameType.DATA); + }); });