diff --git a/custom_components/sma_webbox/__init__.py b/custom_components/sma_webbox/__init__.py index 2f8795d..f88027c 100644 --- a/custom_components/sma_webbox/__init__.py +++ b/custom_components/sma_webbox/__init__.py @@ -1,6 +1,6 @@ """SMA Webbox component entry point.""" import logging -from asyncio import DatagramProtocol, DatagramTransport +import asyncio from datetime import timedelta from typing import Tuple @@ -20,25 +20,24 @@ UpdateFailed, ) -from .const import ( - DEFAULT_SCAN_INTERVAL, - DOMAIN, - SMA_WEBBOX_COORDINATOR, - SMA_WEBBOX_PROTOCOL, - SMA_WEBBOX_REMOVE_LISTENER, -) +from .const import * + from .sma_webbox import ( + WEBBOX_PORT, SmaWebboxBadResponseException, SmaWebboxConnectionException, SmaWebboxTimeoutException, WebboxClientProtocol, + WebboxClientInstance, ) _LOGGER = logging.getLogger(__name__) -async def async_setup(hass: HomeAssistant, config: ConfigType): - """Initiate a configflow from a configuration.yaml entry if any.""" +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: + """Setup component.""" + + # Initiate a configflow from a configuration.yaml entry if any. if DOMAIN in config: _LOGGER.info("Setting up %s component from configuration.yaml", DOMAIN) hass.async_create_task( @@ -52,35 +51,67 @@ async def async_setup(hass: HomeAssistant, config: ConfigType): return True -async def async_setup_connection( +async def async_setup_api(hass: HomeAssistant) -> asyncio.DatagramProtocol: + """Setup api (udp connection) proxy.""" + + try: + api = hass.data[DOMAIN][SMA_WEBBOX_API] + except KeyError: + # Create UDP client proxy + on_connected = hass.loop.create_future() + _, api = await hass.loop.create_datagram_endpoint( + lambda: WebboxClientProtocol(on_connected), + local_addr=("0.0.0.0", WEBBOX_PORT), + reuse_port=True, + ) + + # Wait for socket ready signal + try: + await asyncio.wait_for( + on_connected, timeout=10 + ) + except TimeoutError: + _LOGGER.error( + "Unable to setup UDP client for port %d", WEBBOX_PORT) + + # Initialize domain data structure + hass.data[DOMAIN] = {SMA_WEBBOX_API: api} + _LOGGER.info("%s API created", DOMAIN) + + # Close asyncio protocol on shutdown + async def async_close_api(event): # pylint: disable=unused-argument + """Close the transport/protocol.""" + api.close() + + # TODO: close API upon component removal ? pylint: disable=fixme + hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_close_api) + + return api + + +async def async_setup_instance( hass: HomeAssistant, ip_address: str, udp_port: int -) -> Tuple[DatagramTransport, DatagramProtocol]: +) -> WebboxClientInstance: + + api = await async_setup_api(hass) + """Open a connection to the webbox and build device model.""" - transport, protocol = await hass.loop.create_datagram_endpoint( - lambda: WebboxClientProtocol(hass.loop, (ip_address, udp_port)), - local_addr=("0.0.0.0", udp_port), - reuse_port=True, + instance = WebboxClientInstance( + hass.loop, + api, + (ip_address, udp_port) ) - - # Wait for socket ready signal - await protocol.on_connected # Build webbox model (fetch device tree) - await protocol.create_webbox_model() + await instance.create_webbox_model() - return (transport, protocol) + return instance async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Set up sma webbox from a config entry.""" - _LOGGER.info( - "SMA Webbox instance created(%s:%d)", - entry.data[CONF_IP_ADDRESS], - entry.data[CONF_PORT], - ) - # Setup connection try: - transport, protocol = await async_setup_connection( + instance = await async_setup_instance( hass, entry.data[CONF_IP_ADDRESS], entry.data[CONF_PORT] ) except (OSError, SmaWebboxConnectionException) as exc: @@ -90,7 +121,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_update_data(): """Update SMA webbox sensors.""" try: - await protocol.fetch_webbox_data() + await instance.fetch_webbox_data() except ( SmaWebboxBadResponseException, SmaWebboxTimeoutException, @@ -111,29 +142,21 @@ async def async_update_data(): ) # Try to fetch initial data, bail out otherwise - try: - await coordinator.async_config_entry_first_refresh() - except ConfigEntryNotReady: - transport.close() - raise - - # Close asyncio protocol on shutdown - async def async_close_session(event): # pylint: disable=unused-argument - """Close the protocol.""" - transport.close() - - remove_stop_listener = hass.bus.async_listen_once( - EVENT_HOMEASSISTANT_STOP, async_close_session - ) + await coordinator.async_config_entry_first_refresh() # Expose data required by coordinated entities - hass.data.setdefault(DOMAIN, {}) - hass.data[DOMAIN][entry.entry_id] = { - SMA_WEBBOX_PROTOCOL: protocol, + hass.data[DOMAIN].setdefault(SMA_WEBBOX_ENTRIES, {}) + hass.data[DOMAIN][SMA_WEBBOX_ENTRIES][entry.entry_id] = { + SMA_WEBBOX_INSTANCE: instance, SMA_WEBBOX_COORDINATOR: coordinator, - SMA_WEBBOX_REMOVE_LISTENER: remove_stop_listener, } + _LOGGER.info( + "SMA Webbox instance created (%s:%d)", + entry.data[CONF_IP_ADDRESS], + entry.data[CONF_PORT], + ) + await hass.config_entries.async_forward_entry_setups(entry, [Platform.SENSOR]) return True @@ -145,9 +168,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: entry, [Platform.SENSOR] ) if unload_ok: - data = hass.data[DOMAIN].pop(entry.entry_id) - data[SMA_WEBBOX_PROTOCOL].transport.close() - data[SMA_WEBBOX_REMOVE_LISTENER]() + hass.data[DOMAIN][SMA_WEBBOX_ENTRIES].pop(entry.entry_id) _LOGGER.info( "SMA Webbox instance unloaded(%s:%d)", diff --git a/custom_components/sma_webbox/config_flow.py b/custom_components/sma_webbox/config_flow.py index 68f214a..319fbfc 100644 --- a/custom_components/sma_webbox/config_flow.py +++ b/custom_components/sma_webbox/config_flow.py @@ -11,7 +11,7 @@ from homeassistant.const import CONF_IP_ADDRESS, CONF_PORT, CONF_SCAN_INTERVAL from homeassistant.data_entry_flow import FlowResult -from . import async_setup_connection +from . import async_setup_instance from .const import DEFAULT_SCAN_INTERVAL, DOMAIN from .sma_webbox import WEBBOX_PORT, SmaWebboxConnectionException @@ -108,12 +108,11 @@ async def async_step_user( # Verify ip address format ip_address(user_input[CONF_IP_ADDRESS]) # Try to connect to check ip:port correctness - transport,_ = await async_setup_connection( + await async_setup_instance( self.hass, user_input[CONF_IP_ADDRESS], user_input[CONF_PORT], ) - transport.close() except ValueError: errors["base"] = "invalid_host" except SmaWebboxConnectionException: diff --git a/custom_components/sma_webbox/const.py b/custom_components/sma_webbox/const.py index 1ad45f1..b237488 100644 --- a/custom_components/sma_webbox/const.py +++ b/custom_components/sma_webbox/const.py @@ -5,6 +5,7 @@ DEFAULT_SCAN_INTERVAL = 30 # seconds # DOMAIN dict entries -SMA_WEBBOX_PROTOCOL = "protocol" +SMA_WEBBOX_API = "api" +SMA_WEBBOX_ENTRIES = "entries" +SMA_WEBBOX_INSTANCE = "instance" SMA_WEBBOX_COORDINATOR = "coordinator" -SMA_WEBBOX_REMOVE_LISTENER = "remove_listener" diff --git a/custom_components/sma_webbox/sensor.py b/custom_components/sma_webbox/sensor.py index cce67ed..53e340c 100644 --- a/custom_components/sma_webbox/sensor.py +++ b/custom_components/sma_webbox/sensor.py @@ -29,7 +29,7 @@ DataUpdateCoordinator, ) -from .const import DOMAIN, SMA_WEBBOX_COORDINATOR, SMA_WEBBOX_PROTOCOL +from .const import DOMAIN, SMA_WEBBOX_ENTRIES, SMA_WEBBOX_COORDINATOR, SMA_WEBBOX_INSTANCE from .sma_webbox import ( WEBBOX_CHANNEL_VALUES, WEBBOX_REP_DEVICE_NAME, @@ -61,15 +61,15 @@ async def async_setup_entry( async_add_entities: AddEntitiesCallback, ) -> None: """Set up SMA Webbox sensors.""" - sma_webbox = hass.data[DOMAIN][config_entry.entry_id] + sma_webbox = hass.data[DOMAIN][SMA_WEBBOX_ENTRIES][config_entry.entry_id] - protocol = sma_webbox[SMA_WEBBOX_PROTOCOL] + instance = sma_webbox[SMA_WEBBOX_INSTANCE] coordinator = sma_webbox[SMA_WEBBOX_COORDINATOR] _LOGGER.info( "Creating sensors for %s:%d %s integration", - protocol.addr[0], - protocol.addr[1], + instance.addr[0], + instance.addr[1], DOMAIN, ) @@ -78,18 +78,18 @@ async def async_setup_entry( device_id = 0 # Create DeviceInfo for webbox 'plant' device_info = DeviceInfo( - configuration_url=f"http://{protocol.addr[0]}", + configuration_url=f"http://{instance.addr[0]}", identifiers={(DOMAIN, config_entry.entry_id)}, manufacturer="SMA", model="Webbox", - name=f"{DOMAIN}[{device_id}]:My Plant", + name=f"{DOMAIN}[{instance.addr[0]}:{device_id}]:My Plant", ) # Add sensors from PlantOverview - for name, data_dict in protocol.data[WEBBOX_REP_OVERVIEW].items(): + for name, data_dict in instance.data[WEBBOX_REP_OVERVIEW].items(): entities.append( SMAWebboxSensor( - f"{DOMAIN}_{device_id}_{name}", + f"{DOMAIN}_{instance.addr[0]}_{device_id}_{name}", data_dict, coordinator, config_entry.unique_id, @@ -99,21 +99,21 @@ async def async_setup_entry( # Add sensors from device list # TODO: Handle hierarchy ('children' nodes) pylint: disable=fixme - for device in protocol.data[WEBBOX_REP_DEVICES]: + for device in instance.data[WEBBOX_REP_DEVICES]: device_id += 1 # Create DeviceInfo for each webbox device device_info = DeviceInfo( - configuration_url=f"http://{protocol.addr[0]}", + configuration_url=f"http://{instance.addr[0]}", identifiers={(DOMAIN, device[WEBBOX_REP_DEVICE_NAME])}, manufacturer="SMA", model="Webbox", - name=f"{DOMAIN}[{device_id}]:{device[WEBBOX_REP_DEVICE_NAME]}", + name=f"{DOMAIN}[{instance.addr[0]}:{device_id}]:{device[WEBBOX_REP_DEVICE_NAME]}", via_device=(DOMAIN, config_entry.entry_id), ) for name, data_dict in device[WEBBOX_CHANNEL_VALUES].items(): entities.append( SMAWebboxSensor( - f"{DOMAIN}_{device_id}_{name}", + f"{DOMAIN}_{instance.addr[0]}_{device_id}_{name}", data_dict, coordinator, config_entry.unique_id, @@ -127,7 +127,7 @@ async def async_setup_entry( class SMAWebboxSensor(CoordinatorEntity, SensorEntity): """Representation of a SMA Webbox sensor.""" - def __init__( # pylint: disable=too-many-arguments + def __init__(# pylint: disable=too-many-arguments too-many-positional-arguments self, name: str, data: dict, @@ -147,7 +147,7 @@ def __init__( # pylint: disable=too-many-arguments if WEBBOX_REP_VALUE_UNIT in self._data: self.set_sensor_attributes(self._data[WEBBOX_REP_VALUE_UNIT]) - def set_sensor_attributes(self, unit) -> None: + def set_sensor_attributes(self, unit) -> None: # pylint: disable= too-many-branches too-many-statements """Define HA sensor attributes based on webbox units.""" if unit == WEBBOX_UNIT_AMPERE: self._attr_unit_of_measurement = UnitOfElectricCurrent.AMPERE diff --git a/custom_components/sma_webbox/sma_webbox.py b/custom_components/sma_webbox/sma_webbox.py index ef646f2..71dae3c 100644 --- a/custom_components/sma_webbox/sma_webbox.py +++ b/custom_components/sma_webbox/sma_webbox.py @@ -4,7 +4,7 @@ import json import logging import time -from asyncio import DatagramTransport, Future +from asyncio import DatagramTransport, DatagramProtocol, Future from asyncio.events import AbstractEventLoop from typing import Tuple @@ -73,6 +73,75 @@ def update_channel_values( current_channel_values[key] = channel_value +# - UDP client class ------------------------------------------------------------ +class WebboxClientProtocol: + """Webbox multi-client UDP protocol implementation""" + + def __init__(self, on_connected: Future) -> None: + """Instance parameter initialisation.""" + self._transport: DatagramTransport = None + self._on_connected = on_connected + self._outstanding_requests = {} + + @property + def is_connected(self) -> bool: + """Return True aslong as connection is alive.""" + return self._on_connected.done() + + def close(self): + """Close transport (and protocol) layer""" + if self._transport: + self._transport.close() + + def request(self, request: str, remote_addr: Tuple[str, int], reply: Future) -> None: + """Send request message to remote address""" + if remote_addr[0] in self._outstanding_requests: + _LOGGER.warning("Outstanding request exists for %s (removed)", remote_addr[0]) + if self._transport: + if not self._transport.is_closing(): + self._outstanding_requests[remote_addr[0]] = reply + self._transport.sendto(request.encode(), remote_addr) + else: + _LOGGER.warning( + "Request for %s ignored (missing transport layer)", + self._outstanding_requests[remote_addr[0]] + ) + + def request_cancel(self, remote_addr: Tuple[str, int]) -> None: + """Cancel outstanding request to remote address""" + if remote_addr[0] in self._outstanding_requests: + if not self._outstanding_requests[remote_addr[0]].cancelled(): + self._outstanding_requests[remote_addr[0]].cancel() + del self._outstanding_requests[remote_addr[0]] + + # - Base and Datagram protocols methods ----------------------------------- + def connection_made(self, transport: DatagramTransport) -> None: + """Store transport object and release _on_connected future.""" + _LOGGER.info("WebboxClientProtocol created") + self._transport = transport + self._on_connected.set_result(True) + + def datagram_received(self, data: bytes, remote_addr: Tuple[str, int]) -> None: + """Return Webbox response to rpc caller by releasing corresponding future.""" + if remote_addr[0] in self._outstanding_requests: + data = json.loads(data.decode('iso-8859-1').replace("\0", "")) + if not self._outstanding_requests[remote_addr[0]].cancelled(): + self._outstanding_requests[remote_addr[0]].set_result(data) + del self._outstanding_requests[remote_addr[0]] + + def error_received(self, exc: Exception) -> None: + """Close connection upon unexpected errors.""" + _LOGGER.warning("Closing connection (Error received: %s)", exc) + self.close() + + def connection_lost( + self, exc: Exception + ) -> None: # pylint: disable=unused-argument + """Destroy _onconnected future to reset is_connected.""" + _LOGGER.info("WebboxClientProtocol closed (%s)", exc) + self._outstanding_requests = {} + + # - Main class exceptions ----------------------------------------------------- class SmaWebboxException(Exception): """Base exception of the sma_webbox library.""" @@ -91,23 +160,24 @@ class SmaWebboxConnectionException(SmaWebboxException): # - Main class implementation ------------------------------------------------- -class WebboxClientProtocol: # pylint: disable=too-many-instance-attributes +class WebboxClientInstance: # pylint: disable=too-many-instance-attributes """Webbox RPC Client implementation.""" - def __init__(self, loop: AbstractEventLoop, addr: Tuple[str, int]) -> None: + def __init__( + self, + loop: AbstractEventLoop, + api: DatagramProtocol, + addr: Tuple[str, int] + ) -> None: """Instance parameter initialisation.""" self._loop: AbstractEventLoop = loop self._addr: Tuple[str, int] = addr + self._api: DatagramProtocol = api - self._transport: DatagramTransport = None self._request_id: int = 0 self._last_access_time: float = 0 self._data_cache: dict = {} - # Synchronization objects - self._on_received: Future = None - self._on_connected: Future = self._loop.create_future() - @property def addr(self) -> Tuple[str, int]: """Return IP address.""" @@ -118,47 +188,6 @@ def data(self) -> dict: """Return webbox data cache.""" return self._data_cache - @property - def transport(self) -> dict: - """Return instance's DatagramTransport object.""" - return self._transport - - @property - def on_connected(self) -> Future: - """Return Future to await on while waiting for an UDP socket.""" - return self._on_connected - - @property - def is_connected(self) -> bool: - """Return True aslong as connection is alive.""" - return self._on_connected.done() if self._on_connected else False - - # - Base and Datagram protocols methods ----------------------------------- - def connection_made(self, transport: DatagramTransport) -> None: - """Store transport object and release _on_connected future.""" - _LOGGER.info("UDP protocol created") - self._transport = transport - self._on_connected.set_result(True) - - def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: - """Return Webbox response to rpc caller using future.""" - if self._addr[0] == addr[0]: - data = json.loads(data.decode('iso-8859-1').replace("\0", "")) - if not self._on_received.cancelled(): - self._on_received.set_result(data) - - def error_received(self, exc: Exception) -> None: - """Close connection upon unexpected errors.""" - _LOGGER.warning("Error received: {%s}, closing", exc) - self._transport.close() - - def connection_lost( - self, exc: Exception - ) -> None: # pylint: disable=unused-argument - """Destroy _onconnected future to reset is_connected.""" - _LOGGER.info("UDP protocol closed") - self._on_connected = None - # - SMA Webbox RPC API ---------------------------------------------------- async def get_plant_overview(self) -> dict: """Define wrapper for get_plant_overview procedure.""" @@ -221,19 +250,19 @@ async def _rpc(self, request: Tuple[int, str]) -> dict: request_id, payload = request # Send RPC/UDP request to SMA Webbox - _LOGGER.debug(payload) - self._transport.sendto(payload.encode(), self._addr) + _LOGGER.debug("%s:%d request %s", *self._addr, payload) + on_received = self._loop.create_future() + self._api.request(payload, self._addr, on_received) # Wait for response from SMA Webbox - self._on_received = self._loop.create_future() try: response = await asyncio.wait_for( - self._on_received, timeout=WEBBOX_TIMEOUT + on_received, timeout=WEBBOX_TIMEOUT ) - _LOGGER.debug(response) + _LOGGER.debug("%s:%d reply: %s", *self._addr, response) except asyncio.TimeoutError: - _LOGGER.warning("RPC request timed out") - self._on_received.cancel() + _LOGGER.warning("RPC request to %s timed out", self._addr[0]) + self._api.request_cancel(self._addr) raise SmaWebboxTimeoutException("RPC request timed out") from None # Raise exception upon errored response or id mismatch @@ -306,9 +335,8 @@ async def create_webbox_model(self) -> None: self._data_cache = {**self._data_cache, **response} except Exception as ex: # Error while building webbox model - self._transport.close() raise SmaWebboxConnectionException( - f"Unable to create SMA Webbox model from device at" + f"Unable to create SMA Webbox model from device at " f"{self._addr[0]}:{self._addr[1]} ({ex})" ) from ex @@ -388,31 +416,34 @@ async def main(url: str) -> None: _LOGGER.info("Starting SMA Webbox component") loop = asyncio.get_running_loop() + on_connected = loop.create_future() try: # Create an UDP socket listening on port WEBBOX_PORT # and from any interface - _, protocol = await loop.create_datagram_endpoint( - lambda: WebboxClientProtocol(loop, (url, WEBBOX_PORT)), + _, api = await loop.create_datagram_endpoint( + lambda: WebboxClientProtocol(on_connected), local_addr=("0.0.0.0", WEBBOX_PORT), reuse_port=True, ) # Wait for socket ready signal - await protocol.on_connected + await on_connected + + instance = WebboxClientInstance(loop, api, (url, WEBBOX_PORT)) # Build webbox model (fetch device tree) - await protocol.create_webbox_model() + await instance.create_webbox_model() # Loop until connection lost or user - while protocol.is_connected: + while api.is_connected: try: - await protocol.fetch_webbox_data() + await instance.fetch_webbox_data() except SmaWebboxTimeoutException as ex: _LOGGER.warning(ex) except SmaWebboxBadResponseException as ex: _LOGGER.warning(ex) - print_webbox_info(protocol.data) + print_webbox_info(instance.data) await asyncio.sleep(30) except SmaWebboxConnectionException as ex: