Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2 from Cyr-ius/new_generation
Browse files Browse the repository at this point in the history
Redesign of the calculation module
  • Loading branch information
cyr-ius authored Dec 18, 2022
2 parents 80bbe18 + 6ea6379 commit 2773df2
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 586 deletions.
67 changes: 59 additions & 8 deletions custom_components/enedis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,86 @@

import logging

import homeassistant.helpers.config_validation as cv
import voluptuous as vol

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.const import CONF_AFTER, CONF_BEFORE
from homeassistant.core import HomeAssistant, ServiceCall

from .const import DOMAIN, PLATFORMS, RELOAD_HISTORY
from .const import (
CONF_POWER_MODE,
CONF_STATISTIC_ID,
DOMAIN,
PLATFORMS,
FETCH_SERVICE,
CLEAR_SERVICE,
CONF_RULES,
CONF_RULE_START_TIME,
CONF_RULE_END_TIME,
CONF_ENTRY,
)
from .coordinator import EnedisDataUpdateCoordinator
from .helpers import async_service_load_datas_history, async_service_datas_clear

_LOGGER = logging.getLogger(__name__)

HISTORY_SERVICE_SCHEMA = vol.Schema(
{
vol.Optional(CONF_ENTRY): str,
vol.Optional(CONF_POWER_MODE): str,
vol.Optional(CONF_AFTER): cv.date,
vol.Optional(CONF_BEFORE): cv.date,
}
)
CLEAR_SERVICE_SCHEMA = vol.Schema(
{
vol.Required(CONF_STATISTIC_ID): str,
}
)


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Enedis as config entry."""
hass.data.setdefault(DOMAIN, {})

if (
entry.options.get("peak_cost") is not None
and entry.options.get("offpeak_cost") is not None
):
options = dict(entry.options).copy()
for k, rule in entry.options.get(CONF_RULES, {}).items():
rule[
CONF_RULE_START_TIME
] = f'{rule[CONF_RULE_START_TIME].replace("H", ":")}:00'
rule[
CONF_RULE_END_TIME
] = f'{rule[CONF_RULE_END_TIME].replace("H", ":")}:00'
options[CONF_RULES] = entry.options.get(CONF_RULES)
options.pop("peak_cost")
options.pop("offpeak_cost")
hass.config_entries.async_update_entry(entry=entry, options=options)

coordinator = EnedisDataUpdateCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
if coordinator.data is None:
return False

entry.async_on_unload(entry.add_update_listener(_async_update_listener))
hass.data[DOMAIN][entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)

async def async_reload_history(call) -> None:
await coordinator.async_load_datas_history(call)
async def async_reload_history(call: ServiceCall) -> None:
await async_service_load_datas_history(hass, coordinator.api, call)

async def async_clear(call: ServiceCall) -> None:
await async_service_datas_clear(hass, call)

hass.services.async_register(
DOMAIN, RELOAD_HISTORY, async_reload_history, schema=vol.Schema({})
DOMAIN, FETCH_SERVICE, async_reload_history, schema=HISTORY_SERVICE_SCHEMA
)
hass.services.async_register(
DOMAIN, CLEAR_SERVICE, async_clear, schema=CLEAR_SERVICE_SCHEMA
)

entry.async_on_unload(entry.add_update_listener(_async_update_listener))

return True

Expand Down
255 changes: 198 additions & 57 deletions custom_components/enedis/config_flow.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,63 @@
"""Config flow to configure integration."""
import logging
from typing import Any

import voluptuous as vol
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from enedisgatewaypy import EnedisByPDL, EnedisException
from homeassistant.config_entries import ConfigEntry, ConfigFlow, OptionsFlow
from homeassistant.const import CONF_TOKEN, CONF_SOURCE
from homeassistant.const import CONF_TOKEN
from homeassistant.core import callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant.helpers.selector import (
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
TimeSelector,
TimeSelectorConfig,
)

from .const import CONF_PDL, DOMAIN, CONF_DETAIL
from .enedisgateway import (
EnedisGateway,
EnedisGatewayException,
HP,
HC,
DEFAULT_HC_PRICE,
DEFAULT_HP_PRICE,
CONSUMPTION,
PRODUCTION,
from .const import (
CONF_CONSUMTPION,
CONF_PDL,
CONF_PRODUCTION,
CONF_RULE_DELETE,
CONF_RULE_END_TIME,
CONF_RULE_ID,
CONF_RULE_NAME,
CONF_RULE_NEW_ID,
CONF_RULE_PRICE,
CONF_RULE_START_TIME,
CONF_RULES,
CONSUMPTION_DAILY,
CONSUMPTION_DETAIL,
COST_CONSUMTPION,
COST_PRODUCTION,
DEFAULT_CC_PRICE,
DEFAULT_PC_PRICE,
DOMAIN,
PRODUCTION_DAILY,
PRODUCTION_DETAIL,
)

DATA_SCHEMA = vol.Schema({vol.Required(CONF_PDL): str, vol.Required(CONF_TOKEN): str})
PRODUCTION_CHOICE = [
SelectOptionDict(value=PRODUCTION_DAILY, label="Journalier"),
SelectOptionDict(value=PRODUCTION_DETAIL, label="Détaillé"),
]

CONSUMPTION_CHOICE = [
SelectOptionDict(value=CONSUMPTION_DAILY, label="Journalier"),
SelectOptionDict(value=CONSUMPTION_DETAIL, label="Détaillé"),
]

DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_PDL): str,
vol.Required(CONF_TOKEN): str,
}
)

_LOGGER = logging.getLogger(__name__)

Expand All @@ -39,22 +76,28 @@ def async_get_options_flow(config_entry):
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
options = {CONF_SOURCE: CONSUMPTION, HP: DEFAULT_HP_PRICE, CONF_DETAIL: False}
if user_input is not None:
self._async_abort_entries_match({CONF_PDL: user_input[CONF_PDL]})
api = EnedisByPDL(
token=user_input[CONF_TOKEN],
session=async_create_clientsession(self.hass),
timeout=30,
)
try:
await self.async_set_unique_id(user_input[CONF_PDL])
self._abort_if_unique_id_configured()
api = EnedisGateway(
pdl=user_input[CONF_PDL],
token=user_input[CONF_TOKEN],
session=async_create_clientsession(self.hass),
)
await api.async_get_identity()
except EnedisGatewayException:
await api.async_get_identity(user_input[CONF_PDL])
except EnedisException as error:
_LOGGER.error(error)
errors["base"] = "cannot_connect"
else:
return self.async_create_entry(
title=DOMAIN, data=user_input, options=options
title=f"Linky ({user_input[CONF_PDL]})",
data=user_input,
options={
CONF_CONSUMTPION: user_input.get(CONF_CONSUMTPION),
COST_CONSUMTPION: DEFAULT_CC_PRICE,
CONF_PRODUCTION: user_input.get(CONF_PRODUCTION),
COST_PRODUCTION: DEFAULT_PC_PRICE,
},
)

return self.async_show_form(
Expand All @@ -67,51 +110,149 @@ class EnedisOptionsFlowHandler(OptionsFlow):

def __init__(self, config_entry: ConfigEntry) -> None:
"""Initialize options flow."""
self.init_input = None
self.config_entry = config_entry
rules = config_entry.options.get(CONF_RULES, {})
self._rules: dict[str, Any] = rules.copy()
self._conf_rule_id: int | None = None

async def async_step_init(self, user_input=None):
"""Handle a flow initialized by the user."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle options flow."""
if user_input is not None:
if (
self.hass.data[DOMAIN][self.config_entry.entry_id]
.data.get("contracts", {})
.get("offpeak_hours")
is not None
):
self.init_input = user_input
return await self.async_step_offpeak()
return self.async_create_entry(title="", data=user_input)
if sel_rule := user_input.get(CONF_RULES):
return await self.async_step_rules(None, sel_rule)
return self._save_config(user_input)

return self._async_init_form()

@callback
def _save_config(self, data: dict[str, Any]) -> FlowResult:
"""Save the updated options."""
new_data = {k: v for k, v in data.items() if k not in [CONF_RULES]}
if self._rules:
new_data[CONF_RULES] = self._rules

return self.async_create_entry(title="", data=new_data)

@callback
def _async_init_form(self) -> FlowResult:
"""Handle a flow initialized by the user."""
rules_list = {
k: f"{v.get(CONF_RULE_NAME)} {v.get(CONF_RULE_START_TIME)}-{v.get(CONF_RULE_END_TIME)} {v.get(CONF_RULE_PRICE)}"
if v
else k
for k, v in self._rules.items()
}
rules = {CONF_RULE_NEW_ID: "Add new", **rules_list}
options = self.config_entry.options

options_schema = vol.Schema(
{
vol.Required(
CONF_SOURCE,
default=self.config_entry.options.get(CONF_SOURCE, CONSUMPTION),
): vol.In([CONSUMPTION, PRODUCTION]),
vol.Optional(
CONF_DETAIL,
default=self.config_entry.options.get(CONF_DETAIL),
): bool,
CONF_PRODUCTION,
description={"suggested_value": options.get(CONF_PRODUCTION)},
): SelectSelector(
SelectSelectorConfig(
options=PRODUCTION_CHOICE,
mode=SelectSelectorMode.DROPDOWN,
custom_value=True,
)
),
vol.Optional(
COST_PRODUCTION,
default=options.get(COST_PRODUCTION, DEFAULT_PC_PRICE),
): cv.positive_float,
vol.Optional(
CONF_CONSUMTPION,
description={"suggested_value": options.get(CONF_CONSUMTPION)},
): SelectSelector(
SelectSelectorConfig(
options=CONSUMPTION_CHOICE,
mode=SelectSelectorMode.DROPDOWN,
custom_value=True,
)
),
vol.Optional(
COST_CONSUMTPION,
default=options.get(COST_CONSUMTPION, DEFAULT_CC_PRICE),
): cv.positive_float,
vol.Optional(CONF_RULES): vol.In(rules),
}
)

return self.async_show_form(step_id="init", data_schema=options_schema)

async def async_step_offpeak(self, user_input=None):
"""Handle a flow offpeak."""
async def async_step_rules(
self, user_input: dict[str, Any] | None = None, rule_id: str | None = None
) -> FlowResult:
"""Handle options flow for apps list."""
if rule_id is not None:
self._conf_rule_id = rule_id if rule_id != CONF_RULE_NEW_ID else None
return self._async_rules_form(rule_id)

if user_input is not None:
self.init_input.update(user_input)
return self.async_create_entry(title="", data=self.init_input)
rule_id = user_input.get(CONF_RULE_ID, self._conf_rule_id)
if rule_id:
if user_input.get(CONF_RULE_DELETE, False):
self._rules.pop(rule_id)
else:
self._rules[rule_id] = {
CONF_RULE_NAME: user_input.get(CONF_RULE_NAME),
CONF_RULE_START_TIME: user_input.get(CONF_RULE_START_TIME),
CONF_RULE_END_TIME: user_input.get(CONF_RULE_END_TIME),
CONF_RULE_PRICE: float(
user_input.get(CONF_RULE_PRICE, DEFAULT_CC_PRICE)
),
}

offpeak_schema = vol.Schema(
{
vol.Optional(
HC, default=self.config_entry.options.get(HC, DEFAULT_HC_PRICE)
): cv.positive_float,
vol.Optional(
HP, default=self.config_entry.options.get(HP, DEFAULT_HP_PRICE)
): cv.positive_float,
}
return await self.async_step_init()

@callback
def _async_rules_form(self, rule_id: str) -> FlowResult:
"""Return configuration form for rules."""
rule_schema = {
vol.Optional(
CONF_RULE_NAME,
description={
"suggested_value": self._rules.get(rule_id, {}).get(CONF_RULE_NAME)
},
): str,
vol.Optional(
CONF_RULE_START_TIME,
description={
"suggested_value": self._rules.get(rule_id, {}).get(
CONF_RULE_START_TIME
)
},
): TimeSelector(TimeSelectorConfig()),
vol.Optional(
CONF_RULE_END_TIME,
description={
"suggested_value": self._rules.get(rule_id, {}).get(
CONF_RULE_END_TIME
)
},
): TimeSelector(TimeSelectorConfig()),
vol.Optional(
CONF_RULE_PRICE,
description={
"suggested_value": self._rules.get(rule_id, {}).get(CONF_RULE_PRICE)
},
): cv.positive_float,
}
if rule_id == CONF_RULE_NEW_ID:
id = str(len(self._rules.keys()) + 1)
data_schema = vol.Schema({vol.Required(CONF_RULE_ID): id, **rule_schema})
else:
data_schema = vol.Schema(
{**rule_schema, vol.Optional(CONF_RULE_DELETE, default=False): bool}
)

return self.async_show_form(
step_id="rules",
data_schema=data_schema,
description_placeholders={
"rule_id": f"`{rule_id}`" if rule_id != CONF_RULE_NEW_ID else "",
},
)
return self.async_show_form(step_id="offpeak", data_schema=offpeak_schema)
Loading

0 comments on commit 2773df2

Please sign in to comment.