Skip to content

Commit

Permalink
trying now with throttleit library
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanfmartinez committed Sep 29, 2024
1 parent d678819 commit 8f07c7f
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 51 deletions.
55 changes: 21 additions & 34 deletions lib/extension/receive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import assert from 'assert';
import bind from 'bind-decorator';
import debounce from 'debounce';
import stringify from 'json-stable-stringify-without-jsonify';
import throttle from 'throttleit';

import * as zhc from 'zigbee-herdsman-converters';

Expand All @@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void};
export default class Receive extends Extension {
private elapsed: {[s: string]: number} = {};
private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {};
private throttlers: {[s: string]: {publish: Function}} = {};

async start(): Promise<void> {
this.eventBus.onPublishEntityState(this, this.onPublishEntityState);
Expand Down Expand Up @@ -68,6 +70,20 @@ export default class Receive extends Extension {
this.debouncers[device.ieeeAddr].publish();
}

publishThrottle(device: Device, payload: KeyValue, time: number): void {
if (!this.throttlers[device.ieeeAddr]) {
this.throttlers[device.ieeeAddr] = {
publish: throttle(this.publishEntityState, time * 1000),
};
}

// Update state cache right away. This makes sure that during throttling cached state is always up to date.
// By updating cache we make sure that state cache is always up-to-date.
this.state.set(device, payload);

this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle');
}

// if debounce_ignore are specified (Array of strings)
// then all newPayload values with key present in debounce_ignore
// should equal or be undefined in oldPayload
Expand Down Expand Up @@ -121,50 +137,21 @@ export default class Receive extends Extension {
const options: KeyValue = data.device.options;
zhc.postProcessConvertedFromZigbeeMessage(data.device.definition, payload, options);

const checkElapsedTime =
data.device.options.min_elapsed || (data.device.options.description && data.device.options.description.includes('SPAMMER'));

if (settings.get().advanced.elapsed || checkElapsedTime) {
if (settings.get().advanced.elapsed) {
const now = Date.now();
if (this.elapsed[data.device.ieeeAddr]) {
payload.elapsed = now - this.elapsed[data.device.ieeeAddr];

// very simple and dirty anti-spamming https://github.com/Koenkk/zigbee2mqtt/issues/17984
// as a proof of concept maybe Koenkk can find a better solution as the debounce does not help for my SPAMMER devices
// ambient sensor and water level that sometimes send mupliple messages on same second
// this will not help on zigbee network, but at least help on mqtt and homeassistant recorder and history
// this will not work for devices that have actions and specific events that are important
// this will only DISCARD messages that came to fast from device
// it solves the SPAMMING on sensor devices that does not change values too fast and messages can be ignored
// I dont know all the side effects of this code, but here is the ones that I found already
// - on web ui, the last-seen is only updated after a non ignored message
// - web ui are more responsive than before
// - my homeassistant does not have a lot of data from this devices that are not need
// - my homeassistant became more responsive
// - the CPU load are sensible lower
// using "SPAMMER" in description is an easy way to test without changing options on yaml
if (checkElapsedTime) {
let min_elapsed = 30000;
if (data.device.options.min_elapsed) {
min_elapsed = data.device.options.min_elapsed;
}

if (payload.elapsed < min_elapsed) {
logger.debug(
`Ignoring message from SPAMMER - ${data.device.ieeeAddr} - ${data.device.options.friendly_name} - elapsed=${payload.elapsed} - min_elapsed=${min_elapsed}`,
);
return;
}
}
// end of changes
}

this.elapsed[data.device.ieeeAddr] = now;
}

// Check if we have to debounce
// Check if we have to debounce or throttle
if (data.device.options.debounce) {
this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore);
} else if (data.device.options.throttle || (data.device.options.description && data.device.options.description.includes('SPAMMER'))) {
let throttleTime = data.device.options.throttle || 30;
this.publishThrottle(data.device, payload, throttleTime);
} else {
await this.publishEntityState(data.device, payload);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ declare global {
retrieve_state?: boolean;
debounce?: number;
debounce_ignore?: string[];
min_elapsed?: number;
throttle?: number;
filtered_attributes?: string[];
filtered_cache?: string[];
filtered_optimistic?: string[];
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"rimraf": "^6.0.1",
"semver": "^7.6.3",
"source-map-support": "^0.5.21",
"throttleit": "^2.1.0",
"uri-js": "^4.4.1",
"winston": "^3.14.2",
"winston-syslog": "^2.7.1",
Expand Down
66 changes: 50 additions & 16 deletions test/receive.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ describe('Receive', () => {

it('Should ignore multiple messages from spamming devices', async () => {
const device = zigbeeHerdsman.devices.SPAMMER1;
const min_elapsed_for_testing = 500;
settings.set(['device_options', 'min_elapsed'], min_elapsed_for_testing);
const throttle_for_testing = 1;
settings.set(['device_options', 'throttle'], throttle_for_testing);
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1');
const data1 = {measuredValue: 1};
Expand Down Expand Up @@ -400,20 +400,37 @@ describe('Receive', () => {
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

// Now we try after elapsed time to see if it publishes
const timeshift = min_elapsed_for_testing + 500;
// Now we try after elapsed time to see if it publishes next message
const timeshift = throttle_for_testing * 2000;
jest.advanceTimersByTime(timeshift);
await zigbeeHerdsman.events.message(payload3);
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledTimes(2);
await flushPromises();

expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({elapsed: timeshift, temperature: 0.03});
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload4);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(3);
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
});

it('Should ignore multiple messages from spamming devices defined by description', async () => {
const device = zigbeeHerdsman.devices.SPAMMER2;
const min_elapsed_for_testing = 50000;
const throttle_for_testing = 50;
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'description'], 'this is a SPAMMER device');
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer2');
Expand Down Expand Up @@ -451,20 +468,37 @@ describe('Receive', () => {

expect(MQTT.publish).toHaveBeenCalledTimes(1);
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(throttle_for_testing * 1000);
expect(MQTT.publish).toHaveBeenCalledTimes(2);

expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

// Now we try after elapsed time to see if it publishes
const timeshift = min_elapsed_for_testing + 500;
jest.advanceTimersByTime(timeshift);
await zigbeeHerdsman.events.message(payload3);
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledTimes(2);
expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({elapsed: timeshift, temperature: 0.03});
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

// Now we try again after elapsed time to see if it publishes
const timeshift = throttle_for_testing * 2000;
jest.advanceTimersByTime(timeshift);

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload4);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(3);
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer2');
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
});

it('Shouldnt republish old state', async () => {
Expand Down

0 comments on commit 8f07c7f

Please sign in to comment.