forked from home-assistant/core
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Keba charging station/wallbox as component (home-assistant#24484)
* Add Keba charging station wallbox component * Added start/stop commands (ena 0 and ena 1) * added refresh_interval parameter and fixed authorization * fixed max line length * deactivate failsafe mode if not set in configuration * extracted I/O code to pypi library * updated services.yaml * pinned version of requirements * fixed typos, indent and comments * simplified sensor generation, fixed unique_id and name of sensors * cleaned up data extraction * flake8 fixes * added fast polling, fixed unique_id, code cleanup * updated requirements * fixed pylint * integrated code styling suggestions * fixed pylint * code style changes according to suggestions and pylint fixes * formatted with black * clarefied variables * Update homeassistant/components/keba/__init__.py Co-Authored-By: Martin Hjelmare <marhje52@kth.se> * Update homeassistant/components/keba/__init__.py Co-Authored-By: Martin Hjelmare <marhje52@kth.se> * Update homeassistant/components/keba/__init__.py Co-Authored-By: Martin Hjelmare <marhje52@kth.se> * Update homeassistant/components/keba/__init__.py Co-Authored-By: Martin Hjelmare <marhje52@kth.se> * fixed behaviour if no charging station was found * fix pylint * Update homeassistant/components/keba/__init__.py Co-Authored-By: Martin Hjelmare <marhje52@kth.se>
- Loading branch information
1 parent
15ab004
commit 75e18d4
Showing
9 changed files
with
586 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
"""Support for KEBA charging stations.""" | ||
import asyncio | ||
import logging | ||
|
||
from keba_kecontact.connection import KebaKeContact | ||
import voluptuous as vol | ||
|
||
from homeassistant.const import CONF_HOST | ||
from homeassistant.helpers import discovery | ||
import homeassistant.helpers.config_validation as cv | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DOMAIN = "keba" | ||
SUPPORTED_COMPONENTS = ["binary_sensor", "sensor", "lock"] | ||
|
||
CONF_RFID = "rfid" | ||
CONF_FS = "failsafe" | ||
CONF_FS_TIMEOUT = "failsafe_timeout" | ||
CONF_FS_FALLBACK = "failsafe_fallback" | ||
CONF_FS_PERSIST = "failsafe_persist" | ||
CONF_FS_INTERVAL = "refresh_interval" | ||
|
||
MAX_POLLING_INTERVAL = 5 # in seconds | ||
MAX_FAST_POLLING_COUNT = 4 | ||
|
||
CONFIG_SCHEMA = vol.Schema( | ||
{ | ||
DOMAIN: vol.Schema( | ||
{ | ||
vol.Required(CONF_HOST): cv.string, | ||
vol.Optional(CONF_RFID, default="00845500"): cv.string, | ||
vol.Optional(CONF_FS, default=False): cv.boolean, | ||
vol.Optional(CONF_FS_TIMEOUT, default=30): cv.positive_int, | ||
vol.Optional(CONF_FS_FALLBACK, default=6): cv.positive_int, | ||
vol.Optional(CONF_FS_PERSIST, default=0): cv.positive_int, | ||
vol.Optional(CONF_FS_INTERVAL, default=5): cv.positive_int, | ||
} | ||
) | ||
}, | ||
extra=vol.ALLOW_EXTRA, | ||
) | ||
|
||
_SERVICE_MAP = { | ||
"request_data": "request_data", | ||
"set_energy": "async_set_energy", | ||
"set_current": "async_set_current", | ||
"authorize": "async_start", | ||
"deauthorize": "async_stop", | ||
"enable": "async_enable_ev", | ||
"disable": "async_disable_ev", | ||
"set_failsafe": "async_set_failsafe", | ||
} | ||
|
||
|
||
async def async_setup(hass, config): | ||
"""Check connectivity and version of KEBA charging station.""" | ||
host = config[DOMAIN][CONF_HOST] | ||
rfid = config[DOMAIN][CONF_RFID] | ||
refresh_interval = config[DOMAIN][CONF_FS_INTERVAL] | ||
keba = KebaHandler(hass, host, rfid, refresh_interval) | ||
hass.data[DOMAIN] = keba | ||
|
||
# Wait for KebaHandler setup complete (initial values loaded) | ||
if not await keba.setup(): | ||
_LOGGER.error("Could not find a charging station at %s", host) | ||
return False | ||
|
||
# Set failsafe mode at start up of home assistant | ||
failsafe = config[DOMAIN][CONF_FS] | ||
timeout = config[DOMAIN][CONF_FS_TIMEOUT] if failsafe else 0 | ||
fallback = config[DOMAIN][CONF_FS_FALLBACK] if failsafe else 0 | ||
persist = config[DOMAIN][CONF_FS_PERSIST] if failsafe else 0 | ||
try: | ||
hass.loop.create_task(keba.set_failsafe(timeout, fallback, persist)) | ||
except ValueError as ex: | ||
_LOGGER.warning("Could not set failsafe mode %s", ex) | ||
|
||
# Register services to hass | ||
async def execute_service(call): | ||
"""Execute a service to KEBA charging station. | ||
This must be a member function as we need access to the keba | ||
object here. | ||
""" | ||
function_name = _SERVICE_MAP[call.service] | ||
function_call = getattr(keba, function_name) | ||
await function_call(call.data) | ||
|
||
for service in _SERVICE_MAP: | ||
hass.services.async_register(DOMAIN, service, execute_service) | ||
|
||
# Load components | ||
for domain in SUPPORTED_COMPONENTS: | ||
hass.async_create_task( | ||
discovery.async_load_platform(hass, domain, DOMAIN, {}, config) | ||
) | ||
|
||
# Start periodic polling of charging station data | ||
keba.start_periodic_request() | ||
|
||
return True | ||
|
||
|
||
class KebaHandler(KebaKeContact): | ||
"""Representation of a KEBA charging station connection.""" | ||
|
||
def __init__(self, hass, host, rfid, refresh_interval): | ||
"""Constructor.""" | ||
super().__init__(host, self.hass_callback) | ||
|
||
self._update_listeners = [] | ||
self._hass = hass | ||
self.rfid = rfid | ||
self.device_name = "keba_wallbox_" | ||
|
||
# Ensure at least MAX_POLLING_INTERVAL seconds delay | ||
self._refresh_interval = max(MAX_POLLING_INTERVAL, refresh_interval) | ||
self._fast_polling_count = MAX_FAST_POLLING_COUNT | ||
self._polling_task = None | ||
|
||
def start_periodic_request(self): | ||
"""Start periodic data polling.""" | ||
self._polling_task = self._hass.loop.create_task(self._periodic_request()) | ||
|
||
async def _periodic_request(self): | ||
"""Send periodic update requests.""" | ||
await self.request_data() | ||
|
||
if self._fast_polling_count < MAX_FAST_POLLING_COUNT: | ||
self._fast_polling_count += 1 | ||
_LOGGER.debug("Periodic data request executed, now wait for 2 seconds") | ||
await asyncio.sleep(2) | ||
else: | ||
_LOGGER.debug( | ||
"Periodic data request executed, now wait for %s seconds", | ||
self._refresh_interval, | ||
) | ||
await asyncio.sleep(self._refresh_interval) | ||
|
||
_LOGGER.debug("Periodic data request rescheduled") | ||
self._polling_task = self._hass.loop.create_task(self._periodic_request()) | ||
|
||
async def setup(self, loop=None): | ||
"""Initialize KebaHandler object.""" | ||
await super().setup(loop) | ||
|
||
# Request initial values and extract serial number | ||
await self.request_data() | ||
if self.get_value("Serial") is not None: | ||
self.device_name = f"keba_wallbox_{self.get_value('Serial')}" | ||
return True | ||
|
||
return False | ||
|
||
def hass_callback(self, data): | ||
"""Handle component notification via callback.""" | ||
|
||
# Inform entities about updated values | ||
for listener in self._update_listeners: | ||
listener() | ||
|
||
_LOGGER.debug("Notifying %d listeners", len(self._update_listeners)) | ||
|
||
def _set_fast_polling(self): | ||
_LOGGER.debug("Fast polling enabled") | ||
self._fast_polling_count = 0 | ||
self._polling_task.cancel() | ||
self._polling_task = self._hass.loop.create_task(self._periodic_request()) | ||
|
||
def add_update_listener(self, listener): | ||
"""Add a listener for update notifications.""" | ||
self._update_listeners.append(listener) | ||
|
||
# initial data is already loaded, thus update the component | ||
listener() | ||
|
||
async def async_set_energy(self, param): | ||
"""Set energy target in async way.""" | ||
try: | ||
energy = param["energy"] | ||
await self.set_energy(energy) | ||
self._set_fast_polling() | ||
except (KeyError, ValueError) as ex: | ||
_LOGGER.warning("Energy value is not correct. %s", ex) | ||
|
||
async def async_set_current(self, param): | ||
"""Set current maximum in async way.""" | ||
try: | ||
current = param["current"] | ||
await self.set_current(current) | ||
# No fast polling as this function might be called regularly | ||
except (KeyError, ValueError) as ex: | ||
_LOGGER.warning("Current value is not correct. %s", ex) | ||
|
||
async def async_start(self, param=None): | ||
"""Authorize EV in async way.""" | ||
await self.start(self.rfid) | ||
self._set_fast_polling() | ||
|
||
async def async_stop(self, param=None): | ||
"""De-authorize EV in async way.""" | ||
await self.stop(self.rfid) | ||
self._set_fast_polling() | ||
|
||
async def async_enable_ev(self, param=None): | ||
"""Enable EV in async way.""" | ||
await self.enable(True) | ||
self._set_fast_polling() | ||
|
||
async def async_disable_ev(self, param=None): | ||
"""Disable EV in async way.""" | ||
await self.enable(False) | ||
self._set_fast_polling() | ||
|
||
async def async_set_failsafe(self, param=None): | ||
"""Set failsafe mode in async way.""" | ||
try: | ||
timout = param[CONF_FS_TIMEOUT] | ||
fallback = param[CONF_FS_FALLBACK] | ||
persist = param[CONF_FS_PERSIST] | ||
await self.set_failsafe(timout, fallback, persist) | ||
self._set_fast_polling() | ||
except (KeyError, ValueError) as ex: | ||
_LOGGER.warning( | ||
"failsafe_timeout, failsafe_fallback and/or " | ||
"failsafe_persist value are not correct. %s", | ||
ex, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
"""Support for KEBA charging station binary sensors.""" | ||
import logging | ||
|
||
from homeassistant.components.binary_sensor import BinarySensorDevice | ||
from homeassistant.components.binary_sensor import ( | ||
DEVICE_CLASS_PLUG, | ||
DEVICE_CLASS_CONNECTIVITY, | ||
DEVICE_CLASS_POWER, | ||
DEVICE_CLASS_SAFETY, | ||
) | ||
|
||
from . import DOMAIN | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): | ||
"""Set up the KEBA charging station platform.""" | ||
if discovery_info is None: | ||
return | ||
|
||
keba = hass.data[DOMAIN] | ||
|
||
sensors = [ | ||
KebaBinarySensor(keba, "Online", "Wallbox", DEVICE_CLASS_CONNECTIVITY), | ||
KebaBinarySensor(keba, "Plug", "Plug", DEVICE_CLASS_PLUG), | ||
KebaBinarySensor(keba, "State", "Charging state", DEVICE_CLASS_POWER), | ||
KebaBinarySensor(keba, "Tmo FS", "Failsafe Mode", DEVICE_CLASS_SAFETY), | ||
] | ||
async_add_entities(sensors) | ||
|
||
|
||
class KebaBinarySensor(BinarySensorDevice): | ||
"""Representation of a binary sensor of a KEBA charging station.""" | ||
|
||
def __init__(self, keba, key, sensor_name, device_class): | ||
"""Initialize the KEBA Sensor.""" | ||
self._key = key | ||
self._keba = keba | ||
self._name = sensor_name | ||
self._device_class = device_class | ||
self._is_on = None | ||
self._attributes = {} | ||
|
||
@property | ||
def should_poll(self): | ||
"""Deactivate polling. Data updated by KebaHandler.""" | ||
return False | ||
|
||
@property | ||
def unique_id(self): | ||
"""Return the unique ID of the binary sensor.""" | ||
return f"{self._keba.device_name}_{self._name}" | ||
|
||
@property | ||
def name(self): | ||
"""Return the name of the device.""" | ||
return self._name | ||
|
||
@property | ||
def device_class(self): | ||
"""Return the class of this sensor.""" | ||
return self._device_class | ||
|
||
@property | ||
def is_on(self): | ||
"""Return true if sensor is on.""" | ||
return self._is_on | ||
|
||
@property | ||
def device_state_attributes(self): | ||
"""Return the state attributes of the binary sensor.""" | ||
return self._attributes | ||
|
||
async def async_update(self): | ||
"""Get latest cached states from the device.""" | ||
if self._key == "Online": | ||
self._is_on = self._keba.get_value(self._key) | ||
|
||
elif self._key == "Plug": | ||
self._is_on = self._keba.get_value("Plug_plugged") | ||
self._attributes["plugged_on_wallbox"] = self._keba.get_value( | ||
"Plug_wallbox" | ||
) | ||
self._attributes["plug_locked"] = self._keba.get_value("Plug_locked") | ||
self._attributes["plugged_on_EV"] = self._keba.get_value("Plug_EV") | ||
|
||
elif self._key == "State": | ||
self._is_on = self._keba.get_value("State_on") | ||
self._attributes["status"] = self._keba.get_value("State_details") | ||
self._attributes["max_charging_rate"] = str( | ||
self._keba.get_value("Max curr") | ||
) | ||
|
||
elif self._key == "Tmo FS": | ||
self._is_on = not self._keba.get_value("FS_on") | ||
self._attributes["failsafe_timeout"] = str(self._keba.get_value("Tmo FS")) | ||
self._attributes["fallback_current"] = str(self._keba.get_value("Curr FS")) | ||
elif self._key == "Authreq": | ||
self._is_on = self._keba.get_value(self._key) == 0 | ||
|
||
def update_callback(self): | ||
"""Schedule a state update.""" | ||
self.async_schedule_update_ha_state(True) | ||
|
||
async def async_added_to_hass(self): | ||
"""Add update callback after being added to hass.""" | ||
self._keba.add_update_listener(self.update_callback) |
Oops, something went wrong.