-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path__init__.py
165 lines (130 loc) · 5.89 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import asyncio
import fnmatch
import logging
from collections.abc import Mapping
from typing import Any
import aiohttp
import homeassistant.helpers.entity_registry as er
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import Event, EventStateChangedData, HomeAssistant, State, callback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.start import async_at_started
from .const import (
CONF_ENTITY_DOMAIN,
CONF_ENTITY_ID,
CONF_ENTITY_ID_GLOB,
CONF_ENTITY_LABELS,
CONF_FILTER_MODE,
CONF_PAYLOAD_ATTRIBUTES,
CONF_PAYLOAD_OLD_STATE,
CONF_RETRY_LIMIT,
CONF_WEBHOOK_AUTH_HEADER,
CONF_WEBHOOK_HEADERS,
CONF_WEBHOOK_URL,
DEFAULT_RETRY_LIMIT,
FilterMode,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Delay in seconds handed to asyncio.sleep() by the webhook retry loop in
# the state-change handler.
RETRY_DELAY = 5
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up the integration for a config entry.

    Hands a coroutine that calls ``register_webhook(hass, entry)`` to
    ``async_at_started`` and reports success right away.

    Args:
        hass: The Home Assistant instance.
        entry: The config entry being set up.

    Returns:
        Always ``True``.
    """

    async def _on_started(_hass: Any) -> None:  # noqa ANN401
        # The argument supplied by async_at_started is not needed; the
        # closure already holds both hass and entry.
        await register_webhook(hass, entry)

    async_at_started(hass, _on_started)
    return True
async def register_webhook(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Register webhook for state changes.

    Resolves the entities selected by the entry options, opens one shared
    aiohttp session (closed when Home Assistant stops) and subscribes a
    state-change listener that POSTs a payload to the configured webhook
    for every change of a tracked entity.

    Args:
        hass: The Home Assistant instance.
        entry: Config entry whose ``options`` provide the webhook URL,
            headers, retry limit, payload flags and entity filters.
    """
    entities_to_track = await resolve_tracking_entities(hass, entry)
    if not entities_to_track:
        _LOGGER.warning("No entities found to track")
        return

    webhook_url = str(entry.options.get(CONF_WEBHOOK_URL))
    headers = prepare_headers(entry.options)
    retry_limit = int(entry.options.get(CONF_RETRY_LIMIT, DEFAULT_RETRY_LIMIT))

    _LOGGER.debug("Start webhook tracking using URL: %s", webhook_url)
    _LOGGER.debug("Tracking the following entities: %s", entities_to_track)

    # Create a single http session for reuse
    session = aiohttp.ClientSession()

    async def cleanup_session(_: Any) -> None:  # noqa ANN401
        """Cleanup the aiohttp session on shutdown."""
        await session.close()

    hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_session)

    # Deliberately NOT decorated with @callback: that marker is meant for
    # synchronous functions that are safe to run inside the event loop,
    # whereas this handler is a coroutine that awaits network I/O.
    async def handle_state_change(event: Event[EventStateChangedData]) -> None:
        """Send the webhook for one state change, retrying on failure."""
        entity_id = event.data.get("entity_id")
        old_state = event.data.get("old_state")
        new_state = event.data.get("new_state")

        # Nothing to report when the event carries no new state.
        if new_state is None:
            return

        _LOGGER.debug(
            "State change detected for %s: %s -> %s",
            entity_id,
            old_state.state if old_state else "None",
            new_state.state,
        )

        # The payload depends only on this event, so build it once instead
        # of once per attempt.
        payload = build_payload(entry.options, entity_id, old_state, new_state)

        for attempt in range(1, retry_limit + 1):
            if await call_webhook(session, webhook_url, headers, payload):
                return
            # Wait only between attempts: not after a successful delivery
            # (handled by the return above) and not after the final failure.
            if attempt < retry_limit:
                await asyncio.sleep(RETRY_DELAY)

    async_track_state_change_event(hass, entities_to_track, handle_state_change)
async def call_webhook(
    session: aiohttp.ClientSession,
    webhook_url: str,
    headers: Mapping[str, str],
    payload: dict[str, Any],
) -> bool:
    """POST ``payload`` as JSON to ``webhook_url``.

    Args:
        session: Session used to issue the request.
        webhook_url: Target URL of the POST request.
        headers: HTTP headers sent with the request.
        payload: Body, passed to the session as ``json=``.

    Returns:
        ``True`` when the response status is in the 2xx range; ``False`` for
        any other status or when the request raises an exception (which is
        logged, not propagated).
    """
    _LOGGER.debug("Calling webhook using URL: %s", webhook_url)

    try:
        async with session.post(webhook_url, json=payload, headers=headers) as reply:
            status = reply.status
            delivered = 200 <= status <= 299
            if delivered:
                _LOGGER.debug("Webhook successfully called")
            else:
                _LOGGER.error("Webhook failed, HTTP status: %d", status)
    except Exception as err:  # noqa BLE001
        # Best-effort delivery: report the problem and signal failure to the
        # caller instead of raising.
        _LOGGER.error("Error calling webhook: %s", err)
        return False

    return delivered
def build_payload(
    options: Mapping[str, Any],
    entity_id: str,
    old_state: State | None,
    new_state: State,
) -> dict[str, Any]:
    """Assemble the JSON body sent to the webhook for one state change.

    The body always contains ``entity_id``, ``time`` (ISO format of
    ``new_state.last_updated``) and ``new_state``.

    Args:
        options: Entry options; ``CONF_PAYLOAD_OLD_STATE`` (default ``True``)
            and ``CONF_PAYLOAD_ATTRIBUTES`` (default ``False``) toggle the
            optional keys.
        entity_id: Id of the entity that changed.
        old_state: Previous state, if any.
        new_state: Current state.

    Returns:
        The payload dictionary, with ``old_state`` and/or
        ``new_state_attributes`` added when enabled.
    """
    body: dict[str, Any] = {
        "entity_id": entity_id,
        "time": new_state.last_updated.isoformat(),
        "new_state": new_state.state,
    }

    want_previous = bool(options.get(CONF_PAYLOAD_OLD_STATE, True))
    want_attributes = bool(options.get(CONF_PAYLOAD_ATTRIBUTES, False))

    # The previous state is only added when it is enabled and one exists.
    if want_previous and old_state:
        body["old_state"] = old_state.state

    if want_attributes:
        body["new_state_attributes"] = new_state.attributes

    return body
def prepare_headers(options: Mapping[str, Any]) -> dict[str, str]:
    """Prepare headers for webhook request.

    Args:
        options: Entry options; ``CONF_WEBHOOK_HEADERS`` may hold a mapping of
            extra headers and ``CONF_WEBHOOK_AUTH_HEADER`` an ``Authorization``
            header value.

    Returns:
        A new dict with the configured headers, plus ``Authorization`` when
        an auth header value is configured. The mapping stored in
        ``options`` is never modified.
    """
    # Copy first: assigning "Authorization" on the stored object would write
    # the auth value into the config entry's own options data.
    headers: dict[str, str] = dict(options.get(CONF_WEBHOOK_HEADERS) or {})

    auth_header = options.get(CONF_WEBHOOK_AUTH_HEADER)
    if auth_header:
        headers["Authorization"] = auth_header

    return headers
async def resolve_tracking_entities(hass: HomeAssistant, entry: ConfigEntry) -> set[str]:
    """Resolve entities to track based on conditions.

    Up to four filters are read from ``entry.options``: an entity-id glob, an
    explicit entity-id list, a domain, and a list of labels. Each configured
    filter yields the set of entity ids it matches; the sets are combined
    according to ``CONF_FILTER_MODE`` (default ``FilterMode.OR``).

    Args:
        hass: The Home Assistant instance (state machine and entity registry
            are queried).
        entry: Config entry holding the filter options.

    Returns:
        With ``FilterMode.AND`` the entity ids matched by every configured
        filter, otherwise the ids matched by at least one. An empty set when
        no filter is configured or nothing matches.
    """
    options = entry.options
    filter_mode: FilterMode = FilterMode(options.get(CONF_FILTER_MODE, FilterMode.OR))
    entity_id_glob: str | None = options.get(CONF_ENTITY_ID_GLOB)
    entity_ids: list[str] | None = options.get(CONF_ENTITY_ID)
    domain: str | None = options.get(CONF_ENTITY_DOMAIN)
    labels: list[str] | None = options.get(CONF_ENTITY_LABELS)

    # One result set per *configured* filter. Unconfigured filters are left
    # out entirely, so in AND mode a configured filter that matches nothing
    # correctly empties the result instead of being silently ignored.
    matches: list[set[str]] = []

    if entity_id_glob:
        matches.append(set(fnmatch.filter(hass.states.async_entity_ids(), entity_id_glob)))

    if entity_ids:
        # Only ids that currently have a state are kept.
        matches.append(
            {entity_id for entity_id in hass.states.async_entity_ids() if entity_id in entity_ids}
        )

    if domain:
        matches.append(set(hass.states.async_entity_ids(domain)))

    if labels:
        entity_registry = er.async_get(hass)
        matches.append(
            {
                entity_id
                for entity_id, entity in entity_registry.entities.items()
                if entity.labels and any(label in entity.labels for label in labels)
            }
        )

    # set.intersection()/set.union() called on the class need at least one
    # argument; without any configured filter there is nothing to track.
    if not matches:
        return set()

    if filter_mode == FilterMode.AND:
        return set.intersection(*matches)
    return set.union(*matches)