Improve reading energy_logs from cache #314
Changes from all commits
```diff
@@ -5,7 +5,7 @@
 from asyncio import Task, create_task, gather
 from collections.abc import Awaitable, Callable
 from dataclasses import replace
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
 from functools import wraps
 import logging
 from math import ceil
```
```diff
@@ -80,6 +80,42 @@
 _LOGGER = logging.getLogger(__name__)
 
 
+def _collect_records(data: str) -> dict[int, dict[int, tuple[datetime, int]]]:
+    """Collect logs from a cache data string."""
+    logs: dict[int, dict[int, tuple[datetime, int]]] = {}
+    log_data = data.split("|")
+    for log_record in log_data:
+        log_fields = log_record.split(":")
+        if len(log_fields) == 4:
+            address = int(log_fields[0])
+            slot = int(log_fields[1])
+            pulses = int(log_fields[3])
+            # Parse zero-padded timestamp, fallback to manual split
+            try:
+                timestamp = datetime.strptime(
+                    log_fields[2], "%Y-%m-%d-%H-%M-%S"
+                ).replace(tzinfo=UTC)
+            except ValueError:
+                parts = log_fields[2].split("-")
+                if len(parts) != 6:
+                    continue
+                timestamp = datetime(
+                    year=int(parts[0]),
+                    month=int(parts[1]),
+                    day=int(parts[2]),
+                    hour=int(parts[3]),
+                    minute=int(parts[4]),
+                    second=int(parts[5]),
+                    tzinfo=UTC,
+                )
+            bucket = logs.setdefault(address, {})
+            # Keep the first occurrence (cache is newest-first), skip older duplicates
+            if slot not in bucket:
+                bucket[slot] = (timestamp, pulses)
+
+    return logs
+
+
 def raise_calibration_missing(func: FuncT) -> FuncT:
     """Validate energy calibration settings are available."""
```
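For context, the cache string joins records with `|` and fields with `:` (`address:slot:YYYY-MM-DD-HH-MM-SS:pulses`, newest first), matching the serializer later in this diff. A minimal usage sketch of the `_collect_records` helper added above, with invented addresses and pulse values:

```python
# Hypothetical usage of _collect_records from the hunk above; values invented.
from datetime import UTC, datetime

cache_data = (
    "120:4:2024-05-01-13-00-00:250"   # newest record first
    "|120:3:2024-05-01-12-00-00:180"
    "|120:3:2024-05-01-11-00-00:170"  # older duplicate of slot 3, skipped
)

logs = _collect_records(cache_data)
assert logs[120][4] == (datetime(2024, 5, 1, 13, tzinfo=UTC), 250)
assert logs[120][3] == (datetime(2024, 5, 1, 12, tzinfo=UTC), 180)  # first wins
```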
```diff
@@ -381,7 +417,9 @@ async def energy_update(self) -> EnergyStatistics | None:  # noqa: PLR0911 PLR09
             return None
 
         # Try collecting energy-stats for _current_log_address
-        result = await self.energy_log_update(self._current_log_address, save_cache=True)
+        result = await self.energy_log_update(
+            self._current_log_address, save_cache=True
+        )
         if not result:
             _LOGGER.debug(
                 "async_energy_update | %s | Log rollover | energy_log_update from address %s failed",
```
```diff
@@ -415,7 +453,9 @@ async def energy_update(self) -> EnergyStatistics | None:  # noqa: PLR0911 PLR09
             return self._energy_counters.energy_statistics
 
         if len(missing_addresses) == 1:
-            result = await self.energy_log_update(missing_addresses[0], save_cache=True)
+            result = await self.energy_log_update(
+                missing_addresses[0], save_cache=True
+            )
             if result:
                 await self.power_update()
                 _LOGGER.debug(
```
```diff
@@ -528,9 +568,10 @@ async def get_missing_energy_logs(self) -> None:
         if self._cache_enabled:
             await self._energy_log_records_save_to_cache()
 
-    async def energy_log_update(self, address: int | None, save_cache: bool = True) -> bool:
+    async def energy_log_update(
+        self, address: int | None, save_cache: bool = True
+    ) -> bool:
         """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
-        any_record_stored = False
         if address is None:
             return False
```
```diff
@@ -553,6 +594,7 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
         # Forward historical energy log information to energy counters
         # Each response message contains 4 log counters (slots) of the
         # energy pulses collected during the previous hour of given timestamp
+        cache_updated = False
         for _slot in range(4, 0, -1):
             log_timestamp, log_pulses = response.log_data[_slot]
             _LOGGER.debug(
```
```diff
@@ -567,34 +609,32 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
                 self._energy_counters.add_empty_log(response.log_address, _slot)
                 continue
 
-            await self._energy_log_record_update_state(
+            cache_updated = await self._energy_log_record_update_state(
                 response.log_address,
                 _slot,
                 log_timestamp.replace(tzinfo=UTC),
                 log_pulses,
                 import_only=True,
             )
-            any_record_stored = True
 
         self._energy_counters.update()
-        if any_record_stored and self._cache_enabled and save_cache:
+        if cache_updated and save_cache:
             _LOGGER.debug(
                 "Saving energy record update to cache for %s", self._mac_in_str
             )
             await self.save_cache()
 
-        return any_record_stored
+        return True
 
     def _check_timestamp_is_recent(
         self, address: int, slot: int, timestamp: datetime
     ) -> bool:
         """Check if a log record timestamp is within the last MAX_LOG_HOURS hours."""
         age_seconds = max(
-            0.0,
-            (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
+            0.0, (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
         )
         if age_seconds > MAX_LOG_HOURS * 3600:
-            _LOGGER.warning(
+            _LOGGER.info(
                 "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring...",
                 self._mac_in_str,
                 address,
```
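The recency guard above boils down to a simple age comparison. A standalone sketch of that logic, assuming an illustrative `MAX_LOG_HOURS` of 24 (the real constant lives elsewhere in the project):

```python
from datetime import UTC, datetime, timedelta

MAX_LOG_HOURS = 24  # illustrative stand-in for the project constant


def is_recent(timestamp: datetime) -> bool:
    """Mirror of the age check: True while the record is young enough."""
    age_seconds = max(
        0.0, (datetime.now(tz=UTC) - timestamp.replace(tzinfo=UTC)).total_seconds()
    )
    return age_seconds <= MAX_LOG_HOURS * 3600


print(is_recent(datetime.now(tz=UTC) - timedelta(hours=2)))   # True
print(is_recent(datetime.now(tz=UTC) - timedelta(hours=48)))  # False
```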
```diff
@@ -611,37 +651,29 @@ async def _energy_log_records_load_from_cache(self) -> bool:
                 "Failed to restore energy log records from cache for node %s", self.name
             )
             return False
-        restored_logs: dict[int, list[int]] = {}
         if cache_data == "":
             _LOGGER.debug("Cache-record is empty")
             return False
 
-        log_data = cache_data.split("|")
-        for log_record in log_data:
-            log_fields = log_record.split(":")
-            if len(log_fields) == 4:
-                timestamp_energy_log = log_fields[2].split("-")
-                if len(timestamp_energy_log) == 6:
-                    address = int(log_fields[0])
-                    slot = int(log_fields[1])
-                    self._energy_counters.add_pulse_log(
-                        address=address,
-                        slot=slot,
-                        timestamp=datetime(
-                            year=int(timestamp_energy_log[0]),
-                            month=int(timestamp_energy_log[1]),
-                            day=int(timestamp_energy_log[2]),
-                            hour=int(timestamp_energy_log[3]),
-                            minute=int(timestamp_energy_log[4]),
-                            second=int(timestamp_energy_log[5]),
-                            tzinfo=UTC,
-                        ),
-                        pulses=int(log_fields[3]),
-                        import_only=True,
-                    )
-                    if restored_logs.get(address) is None:
-                        restored_logs[address] = []
-                    restored_logs[address].append(slot)
+        collected_logs = _collect_records(cache_data)
+
+        # Cutoff timestamp for filtering
+        skip_before = datetime.now(tz=UTC) - timedelta(hours=MAX_LOG_HOURS)
+
+        # Iterate in reverse sorted order directly
+        for address in sorted(collected_logs, reverse=True):
+            for slot in sorted(collected_logs[address].keys(), reverse=True):
+                (timestamp, pulses) = collected_logs[address][slot]
+                # Keep only recent entries; prune older-or-equal than cutoff
+                if timestamp <= skip_before:
+                    continue
+                self._energy_counters.add_pulse_log(
+                    address=address,
+                    slot=slot,
+                    pulses=pulses,
+                    timestamp=timestamp,
+                    import_only=True,
+                )
 
         self._energy_counters.update()
```
```diff
@@ -670,19 +702,19 @@ async def _energy_log_records_save_to_cache(self) -> None:
         logs: dict[int, dict[int, PulseLogRecord]] = (
             self._energy_counters.get_pulse_logs()
         )
-        cached_logs = ""
-        for address in sorted(logs.keys(), reverse=True):
-            for slot in sorted(logs[address].keys(), reverse=True):
-                log = logs[address][slot]
-                if cached_logs != "":
-                    cached_logs += "|"
-                cached_logs += f"{address}:{slot}:{log.timestamp.year}"
-                cached_logs += f"-{log.timestamp.month}-{log.timestamp.day}"
-                cached_logs += f"-{log.timestamp.hour}-{log.timestamp.minute}"
-                cached_logs += f"-{log.timestamp.second}:{log.pulses}"
+        # Efficiently serialize newest-first (logs is already sorted)
+        records: list[str] = []
+        for address, record in logs.items():
+            for slot, log in record.items():
+                ts = log.timestamp
+                records.append(
+                    f"{address}:{slot}:{ts.strftime('%Y-%m-%d-%H-%M-%S')}:{log.pulses}"
+                )
+        cached_logs = "|".join(records)
         _LOGGER.debug("Saving energy logrecords to cache for %s", self._mac_in_str)
         self._set_cache(CACHE_ENERGY_COLLECTION, cached_logs)
+        # Persist new cache entries to disk immediately
         await self.save_cache(trigger_only=True)
 
     async def _energy_log_record_update_state(
         self,
```
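A note on the two timestamp formats: `strptime` with `%Y-%m-%d-%H-%M-%S` accepts both the zero-padded output of the new serializer and the unpadded fields the old serializer wrote, so the manual-split fallback in `_collect_records` only triggers for records `strptime` cannot parse at all. A standalone illustration with invented values:

```python
from datetime import UTC, datetime

fmt = "%Y-%m-%d-%H-%M-%S"

# New serializer output (zero-padded via strftime)
print(datetime.strptime("2024-05-01-09-00-00", fmt).replace(tzinfo=UTC))
# Legacy cache record (unpadded f-string fields) still parses
print(datetime.strptime("2024-5-1-9-0-0", fmt).replace(tzinfo=UTC))
# A malformed record raises ValueError and triggers the manual-split fallback
try:
    datetime.strptime("2024-05-01-09-00", fmt)
except ValueError:
    print("fallback path taken")
```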
```diff
@@ -699,21 +731,25 @@ async def _energy_log_record_update_state(
         if not self._cache_enabled:
             return False
 
-        log_cache_record = f"{address}:{slot}:{timestamp.year}"
-        log_cache_record += f"-{timestamp.month}-{timestamp.day}"
-        log_cache_record += f"-{timestamp.hour}-{timestamp.minute}"
-        log_cache_record += f"-{timestamp.second}:{pulses}"
+        log_cache_record = (
+            f"{address}:{slot}:{timestamp.strftime('%Y-%m-%d-%H-%M-%S')}:{pulses}"
+        )
         if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
-            if log_cache_record not in cached_logs:
+            entries = cached_logs.split("|") if cached_logs else []
+            if log_cache_record not in entries:
                 _LOGGER.debug(
                     "Adding logrecord (%s, %s) to cache of %s",
                     str(address),
                     str(slot),
                     self._mac_in_str,
                 )
-                self._set_cache(
-                    CACHE_ENERGY_COLLECTION, cached_logs + "|" + log_cache_record
-                )
+                new_cache = (
+                    f"{log_cache_record}|{cached_logs}"
+                    if cached_logs
+                    else log_cache_record
+                )
+                self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
                 await self.save_cache(trigger_only=True)
                 return True
```
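The switch from a raw substring test to matching against the `split("|")` entries matters because a substring check can match across record boundaries. An invented example:

```python
# Why membership is checked against split("|") entries, not the raw string.
cached_logs = "112:3:2024-05-01-13-00-00:250"
new_record = "12:3:2024-05-01-13-00-00:250"  # different address!

print(new_record in cached_logs)             # True  - false positive
print(new_record in cached_logs.split("|"))  # False - exact entry match
```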
Comment on lines 737 to 753

🛠️ Refactor suggestion

Prevent cache update races when multiple energy_log_update tasks run concurrently

`get_missing_energy_logs` starts multiple `energy_log_update` coroutines concurrently; each calls `_energy_log_record_update_state`, which reads, modifies, and writes the shared cache string. Without synchronization, updates can be lost. Apply this diff to serialize cache mutations with an asyncio lock:

```diff
-        if (cached_logs := self._get_cache(CACHE_ENERGY_COLLECTION)) is not None:
-            entries = cached_logs.split("|") if cached_logs else []
-            if log_cache_record not in entries:
+        async with self._cache_lock:
+            cached_logs = self._get_cache(CACHE_ENERGY_COLLECTION)
+            if cached_logs is not None:
+                entries = cached_logs.split("|") if cached_logs else []
+                if log_cache_record not in entries:
                     _LOGGER.debug(
                         "Adding logrecord (%s, %s) to cache of %s",
                         str(address),
                         str(slot),
                         self._mac_in_str,
                     )
-                new_cache = (
-                    f"{log_cache_record}|{cached_logs}"
-                    if cached_logs
-                    else log_cache_record
-                )
-                self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
-                await self.save_cache(trigger_only=True)
-                return True
-        _LOGGER.debug(
-            "Energy logrecord already present for %s, ignoring", self._mac_in_str
-        )
-        return False
+                    new_cache = (
+                        f"{log_cache_record}|{cached_logs}"
+                        if cached_logs
+                        else log_cache_record
+                    )
+                    self._set_cache(CACHE_ENERGY_COLLECTION, new_cache)
+                    await self.save_cache(trigger_only=True)
+                    return True
+                _LOGGER.debug(
+                    "Energy logrecord already present for %s, ignoring",
+                    self._mac_in_str,
+                )
+                return False
```

Additionally, define the lock once on the instance and import it:

```python
# imports (top of file)
from asyncio import Task, create_task, gather, Lock

# in PlugwiseCircle.__init__
self._cache_lock: Lock = Lock()
```

Optional: For consistency, also guard the “cache is empty” path (lines 763–770) with the same lock and consider persisting immediately there as well.
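As a standalone sketch (not project code) of the lost-update race this review describes, two tasks read-modify-write a shared string; without a lock the slower writer discards the faster one's record:

```python
import asyncio

cache = ""
lock = asyncio.Lock()


async def append_record(record: str, use_lock: bool) -> None:
    """Prepend a record to the shared cache string (read-modify-write)."""
    global cache
    if use_lock:
        async with lock:
            snapshot = cache
            await asyncio.sleep(0)  # yield, like an awaited save_cache() would
            cache = f"{record}|{snapshot}" if snapshot else record
    else:
        snapshot = cache
        await asyncio.sleep(0)  # both tasks read before either writes
        cache = f"{record}|{snapshot}" if snapshot else record


async def main() -> None:
    global cache
    await asyncio.gather(append_record("a", False), append_record("b", False))
    print(cache)  # "b" -- record "a" was lost
    cache = ""
    await asyncio.gather(append_record("a", True), append_record("b", True))
    print(cache)  # "b|a" -- both records survive


asyncio.run(main())
```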