Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 68 additions & 106 deletions plugwise_usb/nodes/circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,10 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
self._log_no_energy_stats_update()
return None

# Always request last energy log records at initial startup
# Always request the most recent energy log records at initial startup, check if the current
# address is actually reported by the node even when all slots at that address are empty.
if not self._last_energy_log_requested:
self._last_energy_log_requested = await self.energy_log_update(
self._last_energy_log_requested, _ = await self.energy_log_update(
self._current_log_address, save_cache=False
)

Expand All @@ -416,9 +417,9 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
)
return None

# Try collecting energy-stats for _current_log_address
result = await self.energy_log_update(
self._current_log_address, save_cache=True
# Try collecting energy-stats from _current_log_address
result, slots_empty_cur = await self.energy_log_update(
self._current_log_address, save_cache=False
)
if not result:
_LOGGER.debug(
Expand All @@ -428,18 +429,20 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
)
return None

if self._current_log_address is not None:
# Retry with previous log address as Circle node pointer to self._current_log_address
# could be rolled over while the last log is at previous address/slot
prev_log_address, _ = calc_log_address(self._current_log_address, 1, -4)
result = await self.energy_log_update(prev_log_address, save_cache=True)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update from address %s failed",
self._mac_in_str,
prev_log_address,
)
return None
# Retry with previous log address as Circle node pointer to self._current_log_address
# could be rolled over while the last log is at previous address
prev_log_address, _ = calc_log_address(self._current_log_address, 1, -4)
result, slots_empty_prev = await self.energy_log_update(prev_log_address, save_cache=False)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update from address %s failed",
self._mac_in_str,
prev_log_address,
)
return None

if self._cache_enabled and (not slots_empty_cur or not slots_empty_prev):
await self.save_cache()

if (
missing_addresses := self._energy_counters.log_addresses_missing
Expand All @@ -453,7 +456,7 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return self._energy_counters.energy_statistics

if len(missing_addresses) == 1:
result = await self.energy_log_update(
result, _ = await self.energy_log_update(
missing_addresses[0], save_cache=True
)
if result:
Expand Down Expand Up @@ -515,9 +518,12 @@ async def _get_initial_energy_logs(self) -> None:
max_addresses_to_collect, ceil(datetime.now(tz=UTC).hour / factor) + 1
)
log_address = self._current_log_address
any_updates = False
while total_addresses > 0:
result = await self.energy_log_update(log_address, save_cache=False)
if not result:
result, slots_empty = await self.energy_log_update(
log_address, save_cache=False
)
if result and slots_empty:
# Stop initial log collection when an address contains no (None) or outdated data
# Outdated data can indicate a EnergyLog address rollover: from address 6014 to 0
_LOGGER.debug(
Expand All @@ -526,11 +532,12 @@ async def _get_initial_energy_logs(self) -> None:
)
break

any_updates |= (not slots_empty)
log_address, _ = calc_log_address(log_address, 1, -4)
total_addresses -= 1

if self._cache_enabled:
await self._energy_log_records_save_to_cache()
if self._cache_enabled and any_updates:
await self.save_cache()

async def get_missing_energy_logs(self) -> None:
"""Task to retrieve missing energy logs."""
Expand All @@ -554,8 +561,9 @@ async def get_missing_energy_logs(self) -> None:
create_task(self.energy_log_update(address, save_cache=False))
for address in missing_addresses
]
any_updates = False
for idx, task in enumerate(tasks):
result = await task
result, slots_empty = await task
# When an energy log collection task returns False, stop and cancel the remaining tasks
if not result:
to_cancel = tasks[idx + 1 :]
Expand All @@ -565,15 +573,25 @@ async def get_missing_energy_logs(self) -> None:
await gather(*to_cancel, return_exceptions=True)
break

if self._cache_enabled:
await self._energy_log_records_save_to_cache()
any_updates |= (not slots_empty)

if self._cache_enabled and any_updates:
await self.save_cache()

async def energy_log_update(
self, address: int | None, save_cache: bool = True
) -> bool:
"""Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
) -> tuple[bool, bool]:
"""Request energy logs from node and store them.

First bool: True when processing succeeded (records stored in memory, possibly all-empty);
False only on transport or address errors.
Second bool: slots_empty — True when all four slots at the address are empty or outdated;
False when at least one recent, non-empty record was stored.
"""
result = False
slots_empty = True
if address is None:
return False
return result, slots_empty

_LOGGER.debug(
"Requesting EnergyLogs from node %s address %s",
Expand All @@ -586,8 +604,9 @@ async def energy_log_update(
"Retrieving EnergyLogs data from node %s failed",
self._mac_in_str,
)
return False
return result, slots_empty

result = True
_LOGGER.debug("EnergyLogs from node %s, address=%s:", self._mac_in_str, address)
await self._available_update_state(True, response.timestamp)

Expand All @@ -600,31 +619,26 @@ async def energy_log_update(
_LOGGER.debug(
"In slot=%s: pulses=%s, timestamp=%s", _slot, log_pulses, log_timestamp
)
if (
log_timestamp is None
or log_pulses is None
# Don't store an old log record; store an empty record instead
or not self._check_timestamp_is_recent(address, _slot, log_timestamp)
):
self._energy_counters.add_empty_log(response.log_address, _slot)
continue

cache_updated = await self._energy_log_record_update_state(
response.log_address,
_slot,
log_timestamp.replace(tzinfo=UTC),
log_pulses,
import_only=True,
)
address = response.log_address
log_timestamp = log_timestamp.replace(tzinfo=UTC)
if log_timestamp is None or log_pulses is None:
self._energy_counters.add_empty_log(address, _slot)
elif self._check_timestamp_is_recent(address, _slot, log_timestamp):
self._energy_counters.add_pulse_log(
address, _slot, log_timestamp, log_pulses, import_only=True,
)
cache_updated = True

self._energy_counters.update()
if cache_updated and save_cache:
_LOGGER.debug(
"Saving energy record update to cache for %s", self._mac_in_str
)
await self.save_cache()
if cache_updated:
slots_empty = False
await self._energy_log_records_save_to_cache()
if save_cache:
_LOGGER.debug("Saving energy cache for %s", self._mac_in_str)
await self.save_cache()
return result, slots_empty

return True
return result, slots_empty

def _check_timestamp_is_recent(
self, address: int, slot: int, timestamp: datetime
Expand Down Expand Up @@ -695,7 +709,7 @@ async def _energy_log_records_load_from_cache(self) -> bool:
return True

async def _energy_log_records_save_to_cache(self) -> None:
"""Save currently collected energy logs to cached file."""
"""Update the in-memory energy log cache string (no file I/O)."""
if not self._cache_enabled:
return

Expand All @@ -711,60 +725,8 @@ async def _energy_log_records_save_to_cache(self) -> None:
f"{address}:{slot}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{log.pulses}"
)
cached_logs = "|".join(records)
_LOGGER.debug("Saving energy logrecords to cache for %s", self._mac_in_str)
_LOGGER.debug("Updating in-memory energy log records for %s", self._mac_in_str)
self._set_cache(CACHE_ENERGY_COLLECTION, cached_logs)
# Persist new cache entries to disk immediately
await self.save_cache(trigger_only=True)

async def _energy_log_record_update_state(
self,
address: int,
slot: int,
timestamp: datetime,
pulses: int,
import_only: bool = False,
) -> bool:
"""Process new energy log record. Returns true if record is new or changed."""
self._energy_counters.add_pulse_log(
address, slot, timestamp, pulses, import_only=import_only
)
if not self._cache_enabled:
return False

log_cache_record = (
f"{address}:{slot}:{timestamp.strftime('%Y-%m-%d-%H-%M-%S')}:{pulses}"
)
if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
entries = cached_logs.split("|") if cached_logs else []
if log_cache_record not in entries:
_LOGGER.debug(
"Adding logrecord (%s, %s) to cache of %s",
str(address),
str(slot),
self._mac_in_str,
)
new_cache = (
f"{log_cache_record}|{cached_logs}"
if cached_logs
else log_cache_record
)
self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
await self.save_cache(trigger_only=True)
return True

_LOGGER.debug(
"Energy logrecord already present for %s, ignoring", self._mac_in_str
)
return False

_LOGGER.debug(
"Cache is empty, adding new logrecord (%s, %s) for %s",
str(address),
str(slot),
self._mac_in_str,
)
self._set_cache(CACHE_ENERGY_COLLECTION, log_cache_record)
return True

@raise_not_loaded
async def set_relay(self, state: bool) -> bool:
Expand Down Expand Up @@ -1187,7 +1149,7 @@ async def _relay_init_update_state(self, state: bool) -> None:
NodeFeature.RELAY_INIT, self._relay_config
)
_LOGGER.debug(
"Saving relay_init state update to cachefor %s", self._mac_in_str
"Saving relay_init state update to cache for %s", self._mac_in_str
)
await self.save_cache()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "plugwise_usb"
version = "0.44.13"
version = "0.44.14a2"
license = "MIT"
keywords = ["home", "automation", "plugwise", "module", "usb"]
classifiers = [
Expand Down
Loading