From 8f07c7f8259fb8aa76c44a229ce8ec63954d926d Mon Sep 17 00:00:00 2001 From: Ivan Martinez Date: Sun, 29 Sep 2024 16:39:18 -0300 Subject: [PATCH] trying now with throttleit library --- lib/extension/receive.ts | 55 +++++++++++++-------------------- lib/types/types.d.ts | 2 +- package-lock.json | 13 ++++++++ package.json | 1 + test/receive.test.js | 66 ++++++++++++++++++++++++++++++---------- 5 files changed, 86 insertions(+), 51 deletions(-) diff --git a/lib/extension/receive.ts b/lib/extension/receive.ts index 8da692ab0f..b62ee38510 100755 --- a/lib/extension/receive.ts +++ b/lib/extension/receive.ts @@ -3,6 +3,7 @@ import assert from 'assert'; import bind from 'bind-decorator'; import debounce from 'debounce'; import stringify from 'json-stable-stringify-without-jsonify'; +import throttle from 'throttleit'; import * as zhc from 'zigbee-herdsman-converters'; @@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void}; export default class Receive extends Extension { private elapsed: {[s: string]: number} = {}; private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {}; + private throttlers: {[s: string]: {publish: Function}} = {}; async start(): Promise { this.eventBus.onPublishEntityState(this, this.onPublishEntityState); @@ -68,6 +70,20 @@ export default class Receive extends Extension { this.debouncers[device.ieeeAddr].publish(); } + publishThrottle(device: Device, payload: KeyValue, time: number): void { + if (!this.throttlers[device.ieeeAddr]) { + this.throttlers[device.ieeeAddr] = { + publish: throttle(this.publishEntityState, time * 1000), + }; + } + + // Update state cache right away. This makes sure that during throttling cached state is always up to date. + // By updating cache we make sure that state cache is always up-to-date. + this.state.set(device, payload); + + this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle'); + } + // if debounce_ignore are specified (Array of strings) // then all newPayload values with key present in debounce_ignore // should equal or be undefined in oldPayload @@ -121,50 +137,21 @@ export default class Receive extends Extension { const options: KeyValue = data.device.options; zhc.postProcessConvertedFromZigbeeMessage(data.device.definition, payload, options); - const checkElapsedTime = - data.device.options.min_elapsed || (data.device.options.description && data.device.options.description.includes('SPAMMER')); - - if (settings.get().advanced.elapsed || checkElapsedTime) { + if (settings.get().advanced.elapsed) { const now = Date.now(); if (this.elapsed[data.device.ieeeAddr]) { payload.elapsed = now - this.elapsed[data.device.ieeeAddr]; - - // very simple and dirty anti-spamming https://github.com/Koenkk/zigbee2mqtt/issues/17984 - // as a proof of concept maybe Koenkk can find a better solution as the debounce does not help for my SPAMMER devices - // ambient sensor and water level that sometimes send mupliple messages on same second - // this will not help on zigbee network, but at least help on mqtt and homeassistant recorder and history - // this will not work for devices that have actions and specific events that are important - // this will only DISCARD messages that came to fast from device - // it solves the SPAMMING on sensor devices that does not change values too fast and messages can be ignored - // I dont know all the side effects of this code, but here is the ones that I found already - // - on web ui, the last-seen is only updated after a non ignored message - // - web ui are more responsive than before - // - my homeassistant does not have a lot of data from this devices that are not need - // - my homeassistant became more responsive - // - the CPU load are sensible lower - // using "SPAMMER" in description is an easy way to test without changing options on yaml - if (checkElapsedTime) { - let min_elapsed = 30000; - if (data.device.options.min_elapsed) { - min_elapsed = data.device.options.min_elapsed; - } - - if (payload.elapsed < min_elapsed) { - logger.debug( - `Ignoring message from SPAMMER - ${data.device.ieeeAddr} - ${data.device.options.friendly_name} - elapsed=${payload.elapsed} - min_elapsed=${min_elapsed}`, - ); - return; - } - } - // end of changes } this.elapsed[data.device.ieeeAddr] = now; } - // Check if we have to debounce + // Check if we have to debounce or throttle if (data.device.options.debounce) { this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore); + } else if (data.device.options.throttle || (data.device.options.description && data.device.options.description.includes('SPAMMER'))) { + let throttleTime = data.device.options.throttle || 30; + this.publishThrottle(data.device, payload, throttleTime); } else { await this.publishEntityState(data.device, payload); } diff --git a/lib/types/types.d.ts b/lib/types/types.d.ts index fa87d4ddb8..28cc9c5813 100644 --- a/lib/types/types.d.ts +++ b/lib/types/types.d.ts @@ -232,7 +232,7 @@ declare global { retrieve_state?: boolean; debounce?: number; debounce_ignore?: string[]; - min_elapsed?: number; + throttle?: number; filtered_attributes?: string[]; filtered_cache?: string[]; filtered_optimistic?: string[]; diff --git a/package-lock.json b/package-lock.json index 507a1a11eb..cfa77b6b2d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,6 +27,7 @@ "rimraf": "^6.0.1", "semver": "^7.6.3", "source-map-support": "^0.5.21", + "throttleit": "^2.1.0", "uri-js": "^4.4.1", "winston": "^3.14.2", "winston-syslog": "^2.7.1", @@ -9700,6 +9701,18 @@ "dev": true, "license": "MIT" }, + "node_modules/throttleit": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/throttleit/-/throttleit-2.1.0.tgz", + "integrity": "sha512-nt6AMGKW1p/70DF/hGBdJB57B8Tspmbp5gfJ8ilhLnt7kkr2ye7hzD6NVG8GGErk2HWF34igrL2CXmNIkzKqKw==", + "license": "MIT", + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/thunky": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/thunky/-/thunky-1.1.0.tgz", diff --git a/package.json b/package.json index 9671bf8334..296f544d71 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "rimraf": "^6.0.1", "semver": "^7.6.3", "source-map-support": "^0.5.21", + "throttleit": "^2.1.0", "uri-js": "^4.4.1", "winston": "^3.14.2", "winston-syslog": "^2.7.1", diff --git a/test/receive.test.js b/test/receive.test.js index ead3bf1f0c..522cde95bc 100755 --- a/test/receive.test.js +++ b/test/receive.test.js @@ -357,8 +357,8 @@ describe('Receive', () => { it('Should ignore multiple messages from spamming devices', async () => { const device = zigbeeHerdsman.devices.SPAMMER1; - const min_elapsed_for_testing = 500; - settings.set(['device_options', 'min_elapsed'], min_elapsed_for_testing); + const throttle_for_testing = 1; + settings.set(['device_options', 'throttle'], throttle_for_testing); settings.set(['device_options', 'retain'], true); settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1'); const data1 = {measuredValue: 1}; @@ -400,20 +400,37 @@ describe('Receive', () => { expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01}); expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true}); - // Now we try after elapsed time to see if it publishes - const timeshift = min_elapsed_for_testing + 500; + // Now we try after elapsed time to see if it publishes next message + const timeshift = throttle_for_testing * 2000; jest.advanceTimersByTime(timeshift); - await zigbeeHerdsman.events.message(payload3); - await flushPromises(); expect(MQTT.publish).toHaveBeenCalledTimes(2); + await flushPromises(); + expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1'); - expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({elapsed: timeshift, temperature: 0.03}); + expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03}); expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true}); + + const data4 = {measuredValue: 4}; + const payload4 = { + data: data4, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload4); + await flushPromises(); + + expect(MQTT.publish).toHaveBeenCalledTimes(3); + expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1'); + expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04}); + expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true}); }); it('Should ignore multiple messages from spamming devices defined by description', async () => { const device = zigbeeHerdsman.devices.SPAMMER2; - const min_elapsed_for_testing = 50000; + const throttle_for_testing = 50; settings.set(['device_options', 'retain'], true); settings.set(['devices', device.ieeeAddr, 'description'], 'this is a SPAMMER device'); settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer2'); @@ -451,20 +468,37 @@ describe('Receive', () => { expect(MQTT.publish).toHaveBeenCalledTimes(1); await flushPromises(); - expect(MQTT.publish).toHaveBeenCalledTimes(1); + jest.advanceTimersByTime(throttle_for_testing * 1000); + expect(MQTT.publish).toHaveBeenCalledTimes(2); + expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer2'); expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01}); expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true}); - // Now we try after elapsed time to see if it publishes - const timeshift = min_elapsed_for_testing + 500; - jest.advanceTimersByTime(timeshift); - await zigbeeHerdsman.events.message(payload3); - await flushPromises(); - expect(MQTT.publish).toHaveBeenCalledTimes(2); expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer2'); - expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({elapsed: timeshift, temperature: 0.03}); + expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03}); expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true}); + + // Now we try again after elapsed time to see if it publishes + const timeshift = throttle_for_testing * 2000; + jest.advanceTimersByTime(timeshift); + + const data4 = {measuredValue: 4}; + const payload4 = { + data: data4, + cluster: 'msTemperatureMeasurement', + device, + endpoint: device.getEndpoint(1), + type: 'attributeReport', + linkquality: 10, + }; + await zigbeeHerdsman.events.message(payload4); + await flushPromises(); + + expect(MQTT.publish).toHaveBeenCalledTimes(3); + expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer2'); + expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04}); + expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true}); }); it('Shouldnt republish old state', async () => {