Skip to content
76 changes: 76 additions & 0 deletions tests/test_binary_sensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test zhaws binary sensor."""

import asyncio
from collections.abc import Awaitable, Callable
from unittest.mock import MagicMock, call

Expand All @@ -20,8 +21,10 @@
find_entity,
get_entity,
join_zigpy_device,
make_zcl_header,
send_attributes_report,
update_attribute_cache,
zigpy_device_from_json,
)
from zha.application import Platform
from zha.application.gateway import Gateway
Expand Down Expand Up @@ -287,3 +290,76 @@ async def test_quirks_binary_sensor_attr_converter(zha_gateway: Gateway) -> None

await send_attributes_report(zha_gateway, cluster, {"on_off": 0})
assert entity.is_on is True


async def test_onoff_client_binary_sensor_on_with_timed_off(
zha_gateway: Gateway,
) -> None:
"""Test binary sensor with client OnOff cluster handles on_with_timed_off.

This tests motion sensors that use output/client OnOff clusters and send
on_with_timed_off commands when motion is detected.
"""
zigpy_device = await zigpy_device_from_json(
zha_gateway.application_controller,
"tests/data/devices/ikea-of-sweden-tradfri-motion-sensor.json",
)

zha_device = await join_zigpy_device(zha_gateway, zigpy_device)
entity = find_entity(zha_device, Platform.BINARY_SENSOR)
assert entity is not None
assert isinstance(entity, BinarySensor)

# Initial state should be off
assert entity.is_on is False

# Get the client/output cluster
cluster = zigpy_device.endpoints[1].out_clusters[OnOff.cluster_id]

# Simulate motion sensor sending on_with_timed_off command
# on_off_control=0 means always accept, on_time=1800 (180 seconds in 10ths)
hdr = make_zcl_header(
OnOff.ServerCommandDefs.on_with_timed_off.id, global_command=False
)
cluster.handle_message(hdr, [0, 1800, 0])
await zha_gateway.async_block_till_done()

# Binary sensor should now be on
assert entity.is_on is True

# Send another on_with_timed_off while timer is active (covers timer cancel logic)
hdr = make_zcl_header(
OnOff.ServerCommandDefs.on_with_timed_off.id, global_command=False
)
cluster.handle_message(hdr, [0, 500, 0]) # 50 seconds
await zha_gateway.async_block_till_done()
assert entity.is_on is True

# Advance time past the new timeout (50 seconds)
await asyncio.sleep(60)
await zha_gateway.async_block_till_done()

# Binary sensor should now be off
assert entity.is_on is False

# Test toggle command
hdr = make_zcl_header(OnOff.ServerCommandDefs.toggle.id, global_command=False)
cluster.handle_message(hdr, [])
await zha_gateway.async_block_till_done()
assert entity.is_on is True

hdr = make_zcl_header(OnOff.ServerCommandDefs.toggle.id, global_command=False)
cluster.handle_message(hdr, [])
await zha_gateway.async_block_till_done()
assert entity.is_on is False

# Test on/off command
hdr = make_zcl_header(OnOff.ServerCommandDefs.on.id, global_command=False)
cluster.handle_message(hdr, [])
await zha_gateway.async_block_till_done()
assert entity.is_on is True

hdr = make_zcl_header(OnOff.ServerCommandDefs.off.id, global_command=False)
cluster.handle_message(hdr, [])
await zha_gateway.async_block_till_done()
assert entity.is_on is False
15 changes: 13 additions & 2 deletions tests/test_cluster_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1241,8 +1241,19 @@ async def test_zha_send_event_from_quirk(zha_gateway: Gateway):

on_off_ch.cluster_command(1, OnOff.ServerCommandDefs.on.id, [])

assert on_off_ch.emit_zha_event.call_count == 1
assert on_off_ch.emit_zha_event.mock_calls == [call("on", [])]
assert on_off_ch.emit_zha_event.call_count == 2
assert on_off_ch.emit_zha_event.mock_calls == [
call("on", []),
call(
"attribute_updated",
{
"attribute_id": 0,
"attribute_name": "on_off",
"attribute_value": t.Bool.true,
"value": t.Bool.true,
},
),
]
on_off_ch.emit_zha_event.reset_mock()

await send_attributes_report(
Expand Down
94 changes: 53 additions & 41 deletions zha/zigbee/cluster_handlers/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,59 @@ class MultistateValueClusterHandler(ClusterHandler):
class OnOffClientClusterHandler(ClientClusterHandler):
"""OnOff client cluster handler."""

def __init__(self, cluster: zigpy.zcl.Cluster, endpoint: Endpoint) -> None:
"""Initialize OnOffClientClusterHandler."""
super().__init__(cluster, endpoint)
self._off_listener: asyncio.TimerHandle | None = None

@property
def on_off(self) -> bool | None:
"""Return cached value of on/off attribute."""
return self.cluster.get(OnOff.AttributeDefs.on_off.name)

def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
# for emitting ZHA event
super().cluster_command(tsn, command_id, args)

cmd = parse_and_log_command(self, tsn, command_id, args)

if cmd in (
OnOff.ServerCommandDefs.off.name,
OnOff.ServerCommandDefs.off_with_effect.name,
):
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.false)
elif cmd in (
OnOff.ServerCommandDefs.on.name,
OnOff.ServerCommandDefs.on_with_recall_global_scene.name,
):
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.true)
elif cmd == OnOff.ServerCommandDefs.on_with_timed_off.name:
should_accept = args[0]
on_time = args[1]
# 0 is always accept 1 is only accept when already on
if should_accept == 0 or (should_accept == 1 and bool(self.on_off)):
if self._off_listener is not None:
self._off_listener.cancel()
self._off_listener = None
self.cluster.update_attribute(
OnOff.AttributeDefs.on_off.id, t.Bool.true
)
if on_time > 0:
self._off_listener = asyncio.get_running_loop().call_later(
(on_time / 10), # value is in 10ths of a second
self.set_to_off,
)
elif cmd == "toggle":
self.cluster.update_attribute(
OnOff.AttributeDefs.on_off.id, not bool(self.on_off)
)

def set_to_off(self, *_):
"""Set the state to off."""
self._off_listener = None
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.false)


@registries.BINDABLE_CLUSTERS.register(OnOff.cluster_id)
@registries.CLUSTER_HANDLER_REGISTRY.register(OnOff.cluster_id)
Expand All @@ -558,7 +611,6 @@ class OnOffClusterHandler(ClusterHandler):
def __init__(self, cluster: zigpy.zcl.Cluster, endpoint: Endpoint) -> None:
"""Initialize OnOffClusterHandler."""
super().__init__(cluster, endpoint)
self._off_listener: asyncio.TimerHandle | None = None

if TUYA_PLUG_ONOFF in endpoint.device.exposes_features:
self.ZCL_INIT_ATTRS = self.ZCL_INIT_ATTRS.copy()
Expand Down Expand Up @@ -594,46 +646,6 @@ async def turn_off(self) -> None:
raise ZHAException(f"Failed to turn off: {result[1]}")
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.false)

def cluster_command(self, tsn, command_id, args):
"""Handle commands received to this cluster."""
cmd = parse_and_log_command(self, tsn, command_id, args)

if cmd in (
OnOff.ServerCommandDefs.off.name,
OnOff.ServerCommandDefs.off_with_effect.name,
):
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.false)
elif cmd in (
OnOff.ServerCommandDefs.on.name,
OnOff.ServerCommandDefs.on_with_recall_global_scene.name,
):
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.true)
elif cmd == OnOff.ServerCommandDefs.on_with_timed_off.name:
should_accept = args[0]
on_time = args[1]
# 0 is always accept 1 is only accept when already on
if should_accept == 0 or (should_accept == 1 and bool(self.on_off)):
if self._off_listener is not None:
self._off_listener.cancel()
self._off_listener = None
self.cluster.update_attribute(
OnOff.AttributeDefs.on_off.id, t.Bool.true
)
if on_time > 0:
self._off_listener = asyncio.get_running_loop().call_later(
(on_time / 10), # value is in 10ths of a second
self.set_to_off,
)
elif cmd == "toggle":
self.cluster.update_attribute(
OnOff.AttributeDefs.on_off.id, not bool(self.on_off)
)

def set_to_off(self, *_):
"""Set the state to off."""
self._off_listener = None
self.cluster.update_attribute(OnOff.AttributeDefs.on_off.id, t.Bool.false)
Comment on lines -597 to -635
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the entire cluster_command handling for the server OnOff cluster handler. Unfortunately, it was never covered by tests, hence we didn't catch this.

I don't think it can be used at all for the server handler, so should be fine to remove.


async def async_update(self):
"""Initialize cluster handler."""
if self.cluster.is_client:
Expand Down
Loading