33import re
44import time
55
6- from typing import Any , Optional , Union
6+ from typing import Any
77
88from pylontech import Pylontech
9-
109from qtoggleserver .core import ports as core_ports
1110from qtoggleserver .lib import polled
1211
@@ -15,24 +14,24 @@ class ImprovedPylontech(Pylontech):
1514 CHUNK_SIZE = 1024
1615
1716 def __init__ (self , * args , ** kwargs ) -> None :
18- self ._buffer : bytes = b''
17+ self ._buffer : bytes = b""
1918 super ().__init__ (* args , ** kwargs )
2019
2120 def read_line (self ) -> bytes :
22- parts = re .split (rb' [\r\n]' , self ._buffer , maxsplit = 1 )
21+ parts = re .split (rb" [\r\n]" , self ._buffer , maxsplit = 1 )
2322 if len (parts ) > 1 :
2423 line , self ._buffer = parts
25- return line .strip (b' \xff ' ) + b' \n '
24+ return line .strip (b" \xff " ) + b" \n "
2625
2726 data = self .s .read (self .CHUNK_SIZE )
28- parts = re .split (rb' [\r\n]' , data , maxsplit = 1 )
27+ parts = re .split (rb" [\r\n]" , data , maxsplit = 1 )
2928 if len (parts ) > 1 :
3029 line , rest = parts
3130 self ._buffer += rest
32- return line .strip (b' \xff ' ) + b' \n '
31+ return line .strip (b" \xff " ) + b" \n "
3332 else :
3433 self ._buffer += data
35- return b''
34+ return b""
3635
3736 def read_frame (self ) -> Any :
3837 raw_frame = self .read_line ()
@@ -43,7 +42,7 @@ def read_frame(self) -> Any:
4342
4443class Battery (polled .PolledPeripheral ):
4544 DEFAULT_POLL_INTERVAL = 60
46- DEFAULT_SERIAL_PORT = ' /dev/ttyUSB0'
45+ DEFAULT_SERIAL_PORT = " /dev/ttyUSB0"
4746 DEFAULT_SERIAL_BAUD = 115200
4847 READ_RETRY_COUNT = 5
4948 READ_RETRY_SLEEP = 3
@@ -66,14 +65,14 @@ def __init__(
6665 self ._dev_ids : list [int ] = dev_ids
6766 self ._statuses_by_dev_id : dict [int , dict [str , Any ]] = {}
6867
69- async def make_port_args (self ) -> list [Union [ dict [str , Any ], type [core_ports .BasePort ] ]]:
70- from .ports import SocPort , TemperaturePort , CurrentPort , VoltagePort , PowerPort , CyclesPort
68+ async def make_port_args (self ) -> list [dict [str , Any ] | type [core_ports .BasePort ]]:
69+ from .ports import CurrentPort , CyclesPort , PowerPort , SocPort , TemperaturePort , VoltagePort
7170
7271 status_port_drivers = [SocPort , TemperaturePort , CurrentPort , VoltagePort , PowerPort , CyclesPort ]
7372 port_args = [
7473 {
75- ' driver' : driver ,
76- 'id' : driver .get_property (),
74+ " driver" : driver ,
75+ "id" : driver .get_property (),
7776 }
7877 for driver in status_port_drivers
7978 ]
@@ -98,41 +97,41 @@ def _poll_dev(self, dev_id: int) -> dict:
9897 raise
9998 else :
10099 self .warning (
101- ' reading values for device %s failed (retry=%d/%d)' ,
100+ " reading values for device %s failed (retry=%d/%d)" ,
102101 dev_id ,
103102 count + 1 ,
104- self .READ_RETRY_COUNT - 1
103+ self .READ_RETRY_COUNT - 1 ,
105104 )
106105 time .sleep (self .READ_RETRY_SLEEP )
107106 finally :
108107 pylontech .s .close ()
109108
110109 return {
111- ' timestamp' : int (time .time ()),
112- ' soc' : values .StateOfCharge ,
113- ' temperature' : values .AverageBMSTemperature ,
114- ' current' : values .Current ,
115- ' voltage' : values .Voltage ,
116- ' power' : values .Power ,
117- ' cycles' : values .CycleNumber ,
110+ " timestamp" : int (time .time ()),
111+ " soc" : values .StateOfCharge ,
112+ " temperature" : values .AverageBMSTemperature ,
113+ " current" : values .Current ,
114+ " voltage" : values .Voltage ,
115+ " power" : values .Power ,
116+ " cycles" : values .CycleNumber ,
118117 }
119118
120- def get_aggregated_status (self ) -> Optional [ dict [str , Any ]] :
119+ def get_aggregated_status (self ) -> dict [str , Any ] | None :
121120 if not self ._statuses_by_dev_id :
122121 return None
123122
124123 statuses = list (self ._statuses_by_dev_id .values ())
125124 agg_status = dict (statuses [0 ])
126125
127126 for status in statuses [1 :]:
128- agg_status [' timestamp' ] = min (agg_status [' timestamp' ], status [' timestamp' ])
129- agg_status [' soc' ] = min (agg_status [' soc' ], status [' soc' ])
130- agg_status [' temperature' ] = max (agg_status [' temperature' ], status [' temperature' ])
131- agg_status [' current' ] += status [' current' ]
132- agg_status [' voltage' ] += status [' voltage' ]
133- agg_status [' power' ] += status [' power' ]
134- agg_status [' cycles' ] = max (agg_status [' cycles' ], status [' cycles' ])
135-
136- agg_status [' voltage' ] /= len (statuses )
127+ agg_status [" timestamp" ] = min (agg_status [" timestamp" ], status [" timestamp" ])
128+ agg_status [" soc" ] = min (agg_status [" soc" ], status [" soc" ])
129+ agg_status [" temperature" ] = max (agg_status [" temperature" ], status [" temperature" ])
130+ agg_status [" current" ] += status [" current" ]
131+ agg_status [" voltage" ] += status [" voltage" ]
132+ agg_status [" power" ] += status [" power" ]
133+ agg_status [" cycles" ] = max (agg_status [" cycles" ], status [" cycles" ])
134+
135+ agg_status [" voltage" ] /= len (statuses )
137136
138137 return agg_status
0 commit comments