55from asyncio import Task , create_task , gather
66from collections .abc import Awaitable , Callable
77from dataclasses import replace
8- from datetime import UTC , datetime
8+ from datetime import UTC , datetime , timedelta
99from functools import wraps
1010import logging
1111from math import ceil
8080_LOGGER = logging .getLogger (__name__ )
8181
8282
83+ def _collect_records (data : str ) -> dict [int , dict [int , tuple [datetime , int ]]]:
84+ """Collect logs from a cache data string."""
85+ logs : dict [int , dict [int , tuple [datetime , int ]]] = {}
86+ log_data = data .split ("|" )
87+ for log_record in log_data :
88+ log_fields = log_record .split (":" )
89+ if len (log_fields ) == 4 :
90+ address = int (log_fields [0 ])
91+ slot = int (log_fields [1 ])
92+ pulses = int (log_fields [3 ])
93+ # Parse zero-padded timestamp, fallback to manual split
94+ try :
95+ timestamp = datetime .strptime (
96+ log_fields [2 ], "%Y-%m-%d-%H-%M-%S"
97+ ).replace (tzinfo = UTC )
98+ except ValueError :
99+ parts = log_fields [2 ].split ("-" )
100+ if len (parts ) != 6 :
101+ continue
102+ timestamp = datetime (
103+ year = int (parts [0 ]),
104+ month = int (parts [1 ]),
105+ day = int (parts [2 ]),
106+ hour = int (parts [3 ]),
107+ minute = int (parts [4 ]),
108+ second = int (parts [5 ]),
109+ tzinfo = UTC ,
110+ )
111+ bucket = logs .setdefault (address , {})
112+ # Keep the first occurrence (cache is newest-first), skip older duplicates
113+ if slot not in bucket :
114+ bucket [slot ] = (timestamp , pulses )
115+
116+ return logs
117+
118+
83119def raise_calibration_missing (func : FuncT ) -> FuncT :
84120 """Validate energy calibration settings are available."""
85121
@@ -381,7 +417,9 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
381417 return None
382418
383419 # Try collecting energy-stats for _current_log_address
384- result = await self .energy_log_update (self ._current_log_address , save_cache = True )
420+ result = await self .energy_log_update (
421+ self ._current_log_address , save_cache = True
422+ )
385423 if not result :
386424 _LOGGER .debug (
387425 "async_energy_update | %s | Log rollover | energy_log_update from address %s failed" ,
@@ -415,7 +453,9 @@ async def energy_update(self) -> EnergyStatistics | None: # noqa: PLR0911 PLR09
415453 return self ._energy_counters .energy_statistics
416454
417455 if len (missing_addresses ) == 1 :
418- result = await self .energy_log_update (missing_addresses [0 ], save_cache = True )
456+ result = await self .energy_log_update (
457+ missing_addresses [0 ], save_cache = True
458+ )
419459 if result :
420460 await self .power_update ()
421461 _LOGGER .debug (
@@ -528,9 +568,10 @@ async def get_missing_energy_logs(self) -> None:
528568 if self ._cache_enabled :
529569 await self ._energy_log_records_save_to_cache ()
530570
531- async def energy_log_update (self , address : int | None , save_cache : bool = True ) -> bool :
571+ async def energy_log_update (
572+ self , address : int | None , save_cache : bool = True
573+ ) -> bool :
532574 """Request energy logs and return True only when at least one recent, non-empty record was stored; otherwise return False."""
533- any_record_stored = False
534575 if address is None :
535576 return False
536577
@@ -553,6 +594,7 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
553594 # Forward historical energy log information to energy counters
554595 # Each response message contains 4 log counters (slots) of the
555596 # energy pulses collected during the previous hour of given timestamp
597+ cache_updated = False
556598 for _slot in range (4 , 0 , - 1 ):
557599 log_timestamp , log_pulses = response .log_data [_slot ]
558600 _LOGGER .debug (
@@ -567,34 +609,32 @@ async def energy_log_update(self, address: int | None, save_cache: bool = True)
567609 self ._energy_counters .add_empty_log (response .log_address , _slot )
568610 continue
569611
570- await self ._energy_log_record_update_state (
612+ cache_updated = await self ._energy_log_record_update_state (
571613 response .log_address ,
572614 _slot ,
573615 log_timestamp .replace (tzinfo = UTC ),
574616 log_pulses ,
575617 import_only = True ,
576618 )
577- any_record_stored = True
578619
579620 self ._energy_counters .update ()
580- if any_record_stored and self . _cache_enabled and save_cache :
621+ if cache_updated and save_cache :
581622 _LOGGER .debug (
582623 "Saving energy record update to cache for %s" , self ._mac_in_str
583624 )
584625 await self .save_cache ()
585626
586- return any_record_stored
627+ return True
587628
588629 def _check_timestamp_is_recent (
589630 self , address : int , slot : int , timestamp : datetime
590631 ) -> bool :
591632 """Check if a log record timestamp is within the last MAX_LOG_HOURS hours."""
592633 age_seconds = max (
593- 0.0 ,
594- (datetime .now (tz = UTC ) - timestamp .replace (tzinfo = UTC )).total_seconds ()
634+ 0.0 , (datetime .now (tz = UTC ) - timestamp .replace (tzinfo = UTC )).total_seconds ()
595635 )
596636 if age_seconds > MAX_LOG_HOURS * 3600 :
597- _LOGGER .warning (
637+ _LOGGER .info (
598638 "EnergyLog from Node %s | address %s | slot %s | timestamp %s is outdated, ignoring..." ,
599639 self ._mac_in_str ,
600640 address ,
@@ -611,37 +651,29 @@ async def _energy_log_records_load_from_cache(self) -> bool:
611651 "Failed to restore energy log records from cache for node %s" , self .name
612652 )
613653 return False
614- restored_logs : dict [int , list [int ]] = {}
615654 if cache_data == "" :
616655 _LOGGER .debug ("Cache-record is empty" )
617656 return False
618657
619- log_data = cache_data .split ("|" )
620- for log_record in log_data :
621- log_fields = log_record .split (":" )
622- if len (log_fields ) == 4 :
623- timestamp_energy_log = log_fields [2 ].split ("-" )
624- if len (timestamp_energy_log ) == 6 :
625- address = int (log_fields [0 ])
626- slot = int (log_fields [1 ])
627- self ._energy_counters .add_pulse_log (
628- address = address ,
629- slot = slot ,
630- timestamp = datetime (
631- year = int (timestamp_energy_log [0 ]),
632- month = int (timestamp_energy_log [1 ]),
633- day = int (timestamp_energy_log [2 ]),
634- hour = int (timestamp_energy_log [3 ]),
635- minute = int (timestamp_energy_log [4 ]),
636- second = int (timestamp_energy_log [5 ]),
637- tzinfo = UTC ,
638- ),
639- pulses = int (log_fields [3 ]),
640- import_only = True ,
641- )
642- if restored_logs .get (address ) is None :
643- restored_logs [address ] = []
644- restored_logs [address ].append (slot )
658+ collected_logs = _collect_records (cache_data )
659+
660+ # Cutoff timestamp for filtering
661+ skip_before = datetime .now (tz = UTC ) - timedelta (hours = MAX_LOG_HOURS )
662+
663+ # Iterate in reverse sorted order directly
664+ for address in sorted (collected_logs , reverse = True ):
665+ for slot in sorted (collected_logs [address ].keys (), reverse = True ):
666+ (timestamp , pulses ) = collected_logs [address ][slot ]
667+ # Keep only recent entries; prune older-or-equal than cutoff
668+ if timestamp <= skip_before :
669+ continue
670+ self ._energy_counters .add_pulse_log (
671+ address = address ,
672+ slot = slot ,
673+ pulses = pulses ,
674+ timestamp = timestamp ,
675+ import_only = True ,
676+ )
645677
646678 self ._energy_counters .update ()
647679
@@ -670,19 +702,19 @@ async def _energy_log_records_save_to_cache(self) -> None:
670702 logs : dict [int , dict [int , PulseLogRecord ]] = (
671703 self ._energy_counters .get_pulse_logs ()
672704 )
673- cached_logs = ""
674- for address in sorted (logs .keys (), reverse = True ):
675- for slot in sorted (logs [address ].keys (), reverse = True ):
676- log = logs [address ][slot ]
677- if cached_logs != "" :
678- cached_logs += "|"
679- cached_logs += f"{ address } :{ slot } :{ log .timestamp .year } "
680- cached_logs += f"-{ log .timestamp .month } -{ log .timestamp .day } "
681- cached_logs += f"-{ log .timestamp .hour } -{ log .timestamp .minute } "
682- cached_logs += f"-{ log .timestamp .second } :{ log .pulses } "
683-
705+ # Efficiently serialize newest-first (logs is already sorted)
706+ records : list [str ] = []
707+ for address , record in logs .items ():
708+ for slot , log in record .items ():
709+ ts = log .timestamp
710+ records .append (
711+ f"{ address } :{ slot } :{ ts .strftime ('%Y-%m-%d-%H-%M-%S' )} :{ log .pulses } "
712+ )
713+ cached_logs = "|" .join (records )
684714 _LOGGER .debug ("Saving energy logrecords to cache for %s" , self ._mac_in_str )
685715 self ._set_cache (CACHE_ENERGY_COLLECTION , cached_logs )
716+ # Persist new cache entries to disk immediately
717+ await self .save_cache (trigger_only = True )
686718
687719 async def _energy_log_record_update_state (
688720 self ,
@@ -699,21 +731,25 @@ async def _energy_log_record_update_state(
699731 if not self ._cache_enabled :
700732 return False
701733
702- log_cache_record = f"{ address } :{ slot } :{ timestamp .year } "
703- log_cache_record += f"-{ timestamp .month } -{ timestamp .day } "
704- log_cache_record += f"-{ timestamp .hour } -{ timestamp .minute } "
705- log_cache_record += f"-{ timestamp .second } :{ pulses } "
734+ log_cache_record = (
735+ f"{ address } :{ slot } :{ timestamp .strftime ('%Y-%m-%d-%H-%M-%S' )} :{ pulses } "
736+ )
706737 if (cached_logs := self ._get_cache (CACHE_ENERGY_COLLECTION )) is not None :
707- if log_cache_record not in cached_logs :
738+ entries = cached_logs .split ("|" ) if cached_logs else []
739+ if log_cache_record not in entries :
708740 _LOGGER .debug (
709741 "Adding logrecord (%s, %s) to cache of %s" ,
710742 str (address ),
711743 str (slot ),
712744 self ._mac_in_str ,
713745 )
714- self ._set_cache (
715- CACHE_ENERGY_COLLECTION , cached_logs + "|" + log_cache_record
746+ new_cache = (
747+ f"{ log_cache_record } |{ cached_logs } "
748+ if cached_logs
749+ else log_cache_record
716750 )
751+ self ._set_cache (CACHE_ENERGY_COLLECTION , new_cache )
752+ await self .save_cache (trigger_only = True )
717753 return True
718754
719755 _LOGGER .debug (
0 commit comments