Merged
Commits
32 commits
78d973a
Add new energy_log_record at the front of the cache
bouwew Aug 9, 2025
8daa87c
Remove double reverse sorting
bouwew Aug 9, 2025
3f6126d
Improve _energy_log_records_load_from_cache()
bouwew Aug 12, 2025
375551a
Implement CRAI suggestions
bouwew Aug 13, 2025
771f8eb
Ruffed
bouwew Aug 13, 2025
d668c17
Update CHANGELOG
bouwew Aug 13, 2025
d931a50
Bump to v0.44.11a2 for testing
bouwew Aug 13, 2025
35e041a
Implement more CRAI suggestions
bouwew Aug 13, 2025
136f9ef
Bump to a3
bouwew Aug 13, 2025
c9e7606
Fix wrong ident
bouwew Aug 13, 2025
1a99b5f
Bump to a4
bouwew Aug 13, 2025
b6f4e81
energy_log_update(): revert some changes
bouwew Aug 14, 2025
02944f5
Fix typo
bouwew Aug 14, 2025
8c7128b
Bump to a5
bouwew Aug 14, 2025
160bf90
Break out _collect_records() function
bouwew Aug 14, 2025
058b0d2
Remove noqa
bouwew Aug 14, 2025
4e1bc54
Improve _energy_log_record_update_state()
bouwew Aug 14, 2025
19ca623
Improve var-name
bouwew Aug 14, 2025
9e0a3be
Optimized code by ChatGPT
bouwew Aug 14, 2025
6ad8523
Optimize energy_log_update() further
bouwew Aug 14, 2025
8d7daa7
More optimization as suggested by @dirixmjm
bouwew Aug 14, 2025
c4e0e74
fixup: improve-energy-caching Python code reformatted using Ruff
Aug 14, 2025
7f9255d
CRAI nitpick
bouwew Aug 14, 2025
c5d044a
Fix inverted filtering as suggested by CRAI
bouwew Aug 14, 2025
5b5b3a6
Bump to a6
bouwew Aug 14, 2025
bbeedb2
Follow CRAI suggestion fully
bouwew Aug 14, 2025
c1dbf8c
Bump to a7
bouwew Aug 14, 2025
4950318
Revert back to sorted slots
bouwew Aug 14, 2025
1225a1a
Set init for cache_updated bool
bouwew Aug 14, 2025
98f5368
Update CHANGELOG for v0.44.11 release
bouwew Aug 14, 2025
6bec531
Bump to a8
bouwew Aug 14, 2025
fa0f93f
Change logger to info
bouwew Aug 14, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,8 @@
# Changelog

## Ongoing
## v0.44.11 - 2025-08-14

- Improve reading from energy-logs cache via PR [314](https://github.com/plugwise/python-plugwise-usb/pull/314)
- Improve energy-collection via PR [311](https://github.com/plugwise/python-plugwise-usb/pull/311)

## v0.44.10 - 2025-08-11
150 changes: 93 additions & 57 deletions plugwise_usb/nodes/circle.py
@@ -5,7 +5,7 @@
from asyncio import Task, create_task, gather
from collections.abc import Awaitable, Callable
from dataclasses import replace
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from functools import wraps
import logging
from math import ceil
@@ -80,6 +80,42 @@
_LOGGER = logging.getLogger(__name__)


def _collect_records(data: str) -> dict[int, dict[int, tuple[datetime, int]]]:
"""Collect logs from a cache data string."""
logs: dict[int, dict[int, tuple[datetime, int]]] = {}
log_data = data.split("|")
for log_record in log_data:
log_fields = log_record.split(":")
if len(log_fields) == 4:
address = int(log_fields[0])
slot = int(log_fields[1])
pulses = int(log_fields[3])
# Parse zero-padded timestamp, fallback to manual split
try:
timestamp = datetime.strptime(
log_fields[2], "%Y-%m-%d-%H-%M-%S"
).replace(tzinfo=UTC)
except ValueError:
parts = log_fields[2].split("-")
if len(parts) != 6:
continue
timestamp = datetime(
year=int(parts[0]),
month=int(parts[1]),
day=int(parts[2]),
hour=int(parts[3]),
minute=int(parts[4]),
second=int(parts[5]),
tzinfo=UTC,
)
bucket = logs.setdefault(address, {})
# Keep the first occurrence (cache is newest-first), skip older duplicates
if slot not in bucket:
bucket[slot] = (timestamp, pulses)

return logs
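
For illustration, a minimal usage sketch of the parser above, with invented sample values. The cache format, inferred from the code, is "address:slot:timestamp:pulses" fields joined by "|", stored newest-first; only the first (newest) occurrence of an (address, slot) pair is kept.

from datetime import UTC, datetime

# Hypothetical cache string: two slots at address 40, newest-first,
# plus an older duplicate of slot 2 that _collect_records() skips.
sample = (
    "40:2:2025-08-14-10-00-00:125"
    "|40:1:2025-08-14-09-00-00:98"
    "|40:2:2025-08-13-10-00-00:77"
)
logs = _collect_records(sample)
assert logs[40][2] == (datetime(2025, 8, 14, 10, 0, tzinfo=UTC), 125)
assert logs[40][1] == (datetime(2025, 8, 14, 9, 0, tzinfo=UTC), 98)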


def raise_calibration_missing(func: FuncT) -> FuncT:
"""Validate energy calibration settings are available."""

@@ -381,7 +417,9 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return None

# Try collecting energy-stats for _current_log_address
result = await self.energy_log_update(self._current_log_address, save_cache=True)
result = await self.energy_log_update(
self._current_log_address, save_cache=True
)
if not result:
_LOGGER.debug(
"async_energy_update | %s | Log rollover | energy_log_update from address %s failed",
@@ -415,7 +453,9 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
return self._energy_counters.energy_statistics

if len(missing_addresses) == 1:
result = await self.energy_log_update(missing_addresses[0], save_cache=True)
result = await self.energy_log_update(
missing_addresses[0], save_cache=True
)
if result:
await self.power_update()
_LOGGER.debug(
@@ -528,9 +568,10 @@ async def get_missing_energy_logs(self) -> None:
if self._cache_enabled:
await self._energy_log_records_save_to_cache()

async def energy_log_update(self, address: int | None, save_cache: bool = True) -> bool:
async def energy_log_update(
self, address: int | None, save_cache: bool = True
) -> bool:
"""Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
any_record_stored = False
if address is None:
return False

@@ -553,6 +594,7 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
# Forward historical energy log information to energy counters
# Each response message contains 4 log counters (slots) of the
# energy pulses collected during the previous hour of given timestamp
cache_updated = False
for _slot in range(4, 0, -1):
log_timestamp, log_pulses = response.log_data[_slot]
_LOGGER.debug(
@@ -567,34 +609,32 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
self._energy_counters.add_empty_log(response.log_address, _slot)
continue

await self._energy_log_record_update_state(
cache_updated = await self._energy_log_record_update_state(
response.log_address,
_slot,
log_timestamp.replace(tzinfo=UTC),
log_pulses,
import_only=True,
)
any_record_stored = True

self._energy_counters.update()
if any_record_stored and self._cache_enabled and save_cache:
if cache_updated and save_cache:
_LOGGER.debug(
"Saving energy record update to cache for %s", self._mac_in_str
)
await self.save_cache()

return any_record_stored
return True

def _check_timestamp_is_recent(
self, address: int, slot: int, timestamp: datetime
) -> bool:
"""Check if a log record timestamp is within the last MAX_LOG_HOURS hours."""
age_seconds = max(
0.0,
(datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
0.0, (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
)
if age_seconds > MAX_LOG_HOURS * 3600:
_LOGGER.warning(
_LOGGER.info(
"EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...",
self._mac_in_str,
address,
@@ -611,37 +651,29 @@ async def _energy_log_records_load_from_cache(self) -> bool:
"Failed to restore energy log records from cache for node %s", self.name
)
return False
restored_logs: dict[int, list[int]] = {}
if cache_data == "":
_LOGGER.debug("Cache-record is empty")
return False

log_data = cache_data.split("|")
for log_record in log_data:
log_fields = log_record.split(":")
if len(log_fields) == 4:
timestamp_energy_log = log_fields[2].split("-")
if len(timestamp_energy_log) == 6:
address = int(log_fields[0])
slot = int(log_fields[1])
self._energy_counters.add_pulse_log(
address=address,
slot=slot,
timestamp=datetime(
year=int(timestamp_energy_log[0]),
month=int(timestamp_energy_log[1]),
day=int(timestamp_energy_log[2]),
hour=int(timestamp_energy_log[3]),
minute=int(timestamp_energy_log[4]),
second=int(timestamp_energy_log[5]),
tzinfo=UTC,
),
pulses=int(log_fields[3]),
import_only=True,
)
if restored_logs.get(address) is None:
restored_logs[address] = []
restored_logs[address].append(slot)
collected_logs = _collect_records(cache_data)

# Cutoff timestamp for filtering
skip_before = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS)

# Iterate in reverse sorted order directly
for address in sorted(collected_logs, reverse=True):
for slot in sorted(collected_logs[address].keys(), reverse=True):
(timestamp, pulses) = collected_logs[address][slot]
# Keep only recent entries; prune older-or-equal than cutoff
if timestamp <= skip_before:
continue
self._energy_counters.add_pulse_log(
address=address,
slot=slot,
pulses=pulses,
timestamp=timestamp,
import_only=True,
)

self._energy_counters.update()
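
For illustration, the pruning above keeps only entries strictly newer than the cutoff; a minimal sketch (the MAX_LOG_HOURS value below is assumed for demonstration, the real one comes from the package constants):

from datetime import UTC, datetime, timedelta

MAX_LOG_HOURS = 192  # assumed value, for illustration only

skip_before = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS)
fresh = datetime.now(tz=UTC) - timedelta(hours=1)
stale = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS + 1)
assert fresh > skip_before    # kept
assert stale <= skip_before   # pruned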

@@ -670,19 +702,19 @@ async def _energy_log_records_save_to_cache(self) -> None:
logs: dict[int, dict[int, PulseLogRecord]] = (
self._energy_counters.get_pulse_logs()
)
cached_logs = ""
for address in sorted(logs.keys(), reverse=True):
for slot in sorted(logs[address].keys(), reverse=True):
log = logs[address][slot]
if cached_logs != "":
cached_logs += "|"
cached_logs += f"{address}:{slot}:{log.timestamp.year}"
cached_logs += f"-{log.timestamp.month}-{log.timestamp.day}"
cached_logs += f"-{log.timestamp.hour}-{log.timestamp.minute}"
cached_logs += f"-{log.timestamp.second}:{log.pulses}"

# Efficiently serialize newest-first (logs is already sorted)
records: list[str] = []
for address, record in logs.items():
for slot, log in record.items():
ts = log.timestamp
records.append(
f"{address}:{slot}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{log.pulses}"
)
cached_logs = "|".join(records)
_LOGGER.debug("Saving energy logrecords to cache for %s", self._mac_in_str)
self._set_cache(CACHE_ENERGY_COLLECTION, cached_logs)
# Persist new cache entries to disk immediately
await self.save_cache(trigger_only=True)
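
The serializer above must stay in sync with _collect_records(), which parses the same zero-padded "%Y-%m-%d-%H-%M-%S" timestamp format; a minimal round-trip sketch with invented values:

from datetime import UTC, datetime

ts = datetime(2025, 8, 14, 9, 0, tzinfo=UTC)
record = f"{40}:{1}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{98}"
assert _collect_records(record) == {40: {1: (ts, 98)}}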

async def _energy_log_record_update_state(
self,
@@ -699,21 +731,25 @@ async def _energy_log_record_update_state(
if not self._cache_enabled:
return False

log_cache_record = f"{address}:{slot}:{timestamp.year}"
log_cache_record += f"-{timestamp.month}-{timestamp.day}"
log_cache_record += f"-{timestamp.hour}-{timestamp.minute}"
log_cache_record += f"-{timestamp.second}:{pulses}"
log_cache_record = (
f"{address}:{slot}:{timestamp.strftime('%Y-%m-%d-%H-%M-%S')}:{pulses}"
)
if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
if log_cache_record not in cached_logs:
entries = cached_logs.split("|") if cached_logs else []
if log_cache_record not in entries:
_LOGGER.debug(
"Adding logrecord (%s, %s) to cache of %s",
str(address),
str(slot),
self._mac_in_str,
)
self._set_cache(
CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record
new_cache = (
f"{log_cache_record}|{cached_logs}"
if cached_logs
else log_cache_record
)
self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
await self.save_cache(trigger_only=True)
return True
Comment on lines 737 to 753
🛠️ Refactor suggestion

Prevent cache update races when multiple energy_log_update tasks run concurrently

get_missing_energy_logs starts multiple energy_log_update coroutines concurrently; each calls _energy_log_record_update_state which reads-modifies-writes the shared cache string. Without synchronization, updates can be lost.

Apply this diff to serialize cache mutations with an asyncio lock:

-        if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
-            entries = cached_logs.split("|") if cached_logs else []
-            if log_cache_record not in entries:
-                _LOGGER.debug(
-                    "Adding logrecord (%s, %s) to cache of %s",
-                    str(address),
-                    str(slot),
-                    self._mac_in_str,
-                )
-                new_cache = (
-                    f"{log_cache_record}|{cached_logs}"
-                    if cached_logs
-                    else log_cache_record
-                )
-                self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
-                await self.save_cache(trigger_only=True)
-                return True
-            _LOGGER.debug(
-                "Energy logrecord already present for %s, ignoring", self._mac_in_str
-            )
-            return False
+        async with self._cache_lock:
+            cached_logs = self._get_cache(CACHE_ENERGY_COLLECTION)
+            if cached_logs is not None:
+                entries = cached_logs.split("|") if cached_logs else []
+                if log_cache_record not in entries:
+                    _LOGGER.debug(
+                        "Adding logrecord (%s, %s) to cache of %s",
+                        str(address),
+                        str(slot),
+                        self._mac_in_str,
+                    )
+                    new_cache = (
+                        f"{log_cache_record}|{cached_logs}"
+                        if cached_logs
+                        else log_cache_record
+                    )
+                    self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
+                    await self.save_cache(trigger_only=True)
+                    return True
+                _LOGGER.debug(
+                    "Energy logrecord already present for %s, ignoring",
+                    self._mac_in_str,
+                )
+                return False

Additionally, define the lock once on the instance and import it:

# imports (top of file)
from asyncio import Task, create_task, gather, Lock

# in PlugwiseCircle.__init__
self._cache_lock: Lock = Lock()

Optional: For consistency, also guard the “cache is empty” path (lines 763–770) with the same lock and consider persisting immediately there as well.

🤖 Prompt for AI Agents
In plugwise_usb/nodes/circle.py around lines 740 to 756, the cache
read-modify-write sequence can race when multiple energy_log_update coroutines
run concurrently; add an asyncio.Lock on the instance, import Lock from asyncio,
initialize self._cache_lock = Lock() in PlugwiseCircle.__init__, and wrap the
entire cache mutation block (the branch that builds new_cache, calls _set_cache
and save_cache) in an "async with self._cache_lock" to serialize mutations; also
apply the same lock to the other "cache is empty" path around lines 763–770 and
consider calling save_cache immediately there so no update is lost.
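
For context, a self-contained sketch of the locking pattern the suggestion describes, using placeholder names (CacheDemo, add_record) rather than the library's real API; the await inside the critical section stands in for save_cache(), the suspension point where a lost update could otherwise occur:

from asyncio import Lock, gather, run, sleep

class CacheDemo:
    """Toy stand-in for the node cache, serializing read-modify-write."""

    def __init__(self) -> None:
        self._cache = ""
        self._cache_lock = Lock()

    async def add_record(self, record: str) -> bool:
        async with self._cache_lock:
            entries = self._cache.split("|") if self._cache else []
            if record in entries:
                return False
            # Stand-in for "await self.save_cache(trigger_only=True)";
            # yielding here is what would let another task interleave
            # if the lock were absent.
            await sleep(0)
            self._cache = f"{record}|{self._cache}" if self._cache else record
            return True

async def main() -> None:
    demo = CacheDemo()
    results = await gather(
        *(demo.add_record(f"40:{slot}:ts:0") for slot in range(4, 0, -1))
    )
    print(demo._cache, results)  # all four records survive

run(main())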


_LOGGER.debug(
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "plugwise_usb"
version = "0.44.10"
version = "0.44.11a8"
license = "MIT"
keywords = ["home", "automation", "plugwise", "module", "usb"]
classifiers = [