Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic spam control / throttle #24122

Merged
merged 13 commits into from
Oct 2, 2024
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ Zigbee2MQTT is made up of three modules, each developed in its own Github projec
### Developing

Zigbee2MQTT uses TypeScript (partially for now). Therefore after making changes to files in the `lib/` directory you need to recompile Zigbee2MQTT. This can be done by executing `npm run build`. For faster development instead of running `npm run build` you can run `npm run build-watch` in another terminal session, this will recompile as you change files.
In first time before building you need to run `npm install --include=dev`
Before submitting changes run `npm run test-with-coverage`, `npm run pretty:check` and `npm run eslint`

## Supported devices

Expand Down
20 changes: 19 additions & 1 deletion lib/extension/receive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import assert from 'assert';
import bind from 'bind-decorator';
import debounce from 'debounce';
import stringify from 'json-stable-stringify-without-jsonify';
import throttle from 'throttleit';

import * as zhc from 'zigbee-herdsman-converters';

Expand All @@ -16,6 +17,7 @@ type DebounceFunction = (() => void) & {clear(): void} & {flush(): void};
export default class Receive extends Extension {
private elapsed: {[s: string]: number} = {};
private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {};
private throttlers: {[s: string]: {publish: PublishEntityState}} = {};

async start(): Promise<void> {
this.eventBus.onPublishEntityState(this, this.onPublishEntityState);
Expand Down Expand Up @@ -68,6 +70,20 @@ export default class Receive extends Extension {
this.debouncers[device.ieeeAddr].publish();
}

async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
if (!this.throttlers[device.ieeeAddr]) {
this.throttlers[device.ieeeAddr] = {
publish: throttle(this.publishEntityState, time * 1000),
};
}

// Update state cache right away. This makes sure that during throttling cached state is always up to date.
// By updating cache we make sure that state cache is always up-to-date.
this.state.set(device, payload);

await this.throttlers[device.ieeeAddr].publish(device, payload, 'publishThrottle');
}

// if debounce_ignore are specified (Array of strings)
// then all newPayload values with key present in debounce_ignore
// should equal or be undefined in oldPayload
Expand Down Expand Up @@ -130,9 +146,11 @@ export default class Receive extends Extension {
this.elapsed[data.device.ieeeAddr] = now;
}

// Check if we have to debounce
// Check if we have to debounce or throttle
if (data.device.options.debounce) {
Koenkk marked this conversation as resolved.
Show resolved Hide resolved
this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore);
} else if (data.device.options.throttle) {
await this.publishThrottle(data.device, payload, data.device.options.throttle);
} else {
await this.publishEntityState(data.device, payload);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/types/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ declare global {
properties?: {messageExpiryInterval: number};
}
type Scene = {id: number; name: string};
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached';
type StateChangeReason = 'publishDebounce' | 'groupOptimistic' | 'lastSeenChanged' | 'publishCached' | 'publishThrottle';
type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise<void>;
type RecursivePartial<T> = {[P in keyof T]?: RecursivePartial<T[P]>};
interface KeyValue {
Expand Down Expand Up @@ -232,6 +232,7 @@ declare global {
retrieve_state?: boolean;
debounce?: number;
debounce_ignore?: string[];
throttle?: number;
filtered_attributes?: string[];
filtered_cache?: string[];
filtered_optimistic?: string[];
Expand Down
13 changes: 13 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"rimraf": "^6.0.1",
"semver": "^7.6.3",
"source-map-support": "^0.5.21",
"throttleit": "^2.1.0",
"uri-js": "^4.4.1",
"winston": "^3.14.2",
"winston-syslog": "^2.7.1",
Expand Down
73 changes: 73 additions & 0 deletions test/receive.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,79 @@ describe('Receive', () => {
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.09, humidity: 0.01, pressure: 2});
});

it('Should throttle multiple messages from spamming devices', async () => {
const device = zigbeeHerdsman.devices.SPAMMER;
const throttle_for_testing = 1;
settings.set(['device_options', 'throttle'], throttle_for_testing);
settings.set(['device_options', 'retain'], true);
settings.set(['devices', device.ieeeAddr, 'friendly_name'], 'spammer1');
const data1 = {measuredValue: 1};
const payload1 = {
data: data1,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload1);
const data2 = {measuredValue: 2};
const payload2 = {
data: data2,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload2);
const data3 = {measuredValue: 3};
const payload3 = {
data: data3,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload3);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(1);
await flushPromises();
expect(MQTT.publish).toHaveBeenCalledTimes(1);
expect(MQTT.publish.mock.calls[0][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[0][1])).toStrictEqual({temperature: 0.01});
expect(MQTT.publish.mock.calls[0][2]).toStrictEqual({qos: 0, retain: true});

// Now we try after elapsed time to see if it publishes next message
const timeshift = throttle_for_testing * 2000;
jest.advanceTimersByTime(timeshift);
expect(MQTT.publish).toHaveBeenCalledTimes(2);
await flushPromises();

expect(MQTT.publish.mock.calls[1][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[1][1])).toStrictEqual({temperature: 0.03});
expect(MQTT.publish.mock.calls[1][2]).toStrictEqual({qos: 0, retain: true});

const data4 = {measuredValue: 4};
const payload4 = {
data: data4,
cluster: 'msTemperatureMeasurement',
device,
endpoint: device.getEndpoint(1),
type: 'attributeReport',
linkquality: 10,
};
await zigbeeHerdsman.events.message(payload4);
await flushPromises();

expect(MQTT.publish).toHaveBeenCalledTimes(3);
expect(MQTT.publish.mock.calls[2][0]).toStrictEqual('zigbee2mqtt/spammer1');
expect(JSON.parse(MQTT.publish.mock.calls[2][1])).toStrictEqual({temperature: 0.04});
expect(MQTT.publish.mock.calls[2][2]).toStrictEqual({qos: 0, retain: true});
});

it('Shouldnt republish old state', async () => {
// https://github.com/Koenkk/zigbee2mqtt/issues/3572
const device = zigbeeHerdsman.devices.bulb;
Expand Down
2 changes: 2 additions & 0 deletions test/stub/zigbeeHerdsman.js
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,8 @@ const devices = {
'lumi.sensor_86sw2.es1',
),
WSDCGQ11LM: new Device('EndDevice', '0x0017880104e45522', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
// This are not a real spammer device, just copy of previous to test the throttle filter
SPAMMER: new Device('EndDevice', '0x0017880104e455fe', 6539, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.weather'),
RTCGQ11LM: new Device('EndDevice', '0x0017880104e45523', 6540, 4151, [new Endpoint(1, [0], [])], true, 'Battery', 'lumi.sensor_motion.aq2'),
ZNCZ02LM: ZNCZ02LM,
E1743: new Device('Router', '0x0017880104e45540', 6540, 4476, [new Endpoint(1, [0], [])], true, 'Mains (single phase)', 'TRADFRI on/off switch'),
Expand Down