33import os
44import re
55import time
6+ from typing import TYPE_CHECKING
7+
68from pymodbus .exceptions import ModbusIOException
79
8- from .transport_base import transport_base
9- from ..protocol_settings import Data_Type , Registry_Type , registry_map_entry , protocol_settings
1010from defs .common import strtobool
1111
12- from typing import TYPE_CHECKING
12+ from ..protocol_settings import (
13+ Data_Type ,
14+ Registry_Type ,
15+ protocol_settings ,
16+ registry_map_entry ,
17+ )
18+ from .transport_base import transport_base
19+
1320if TYPE_CHECKING :
1421 from configparser import SectionProxy
1522 try :
@@ -22,7 +29,7 @@ class modbus_base(transport_base):
2229
2330
2431 #this is specifically static
25- clients : dict [str , ' BaseModbusClient' ] = {}
32+ clients : dict [str , " BaseModbusClient" ] = {}
2633 ''' str is identifier, dict of clients when multiple transports use the same ports '''
2734
2835 #non-static here for reference, type hinting, python bs ect...
@@ -42,27 +49,27 @@ class modbus_base(transport_base):
4249 send_holding_register : bool = True
4350 send_input_register : bool = True
4451
45- def __init__ (self , settings : ' SectionProxy' , protocolSettings : ' protocol_settings' = None ):
52+ def __init__ (self , settings : " SectionProxy" , protocolSettings : " protocol_settings" = None ):
4653 super ().__init__ (settings )
4754
48- self .analyze_protocol_enabled = settings .getboolean (' analyze_protocol' , fallback = self .analyze_protocol_enabled )
49- self .analyze_protocol_save_load = settings .getboolean (' analyze_protocol_save_load' , fallback = self .analyze_protocol_save_load )
55+ self .analyze_protocol_enabled = settings .getboolean (" analyze_protocol" , fallback = self .analyze_protocol_enabled )
56+ self .analyze_protocol_save_load = settings .getboolean (" analyze_protocol_save_load" , fallback = self .analyze_protocol_save_load )
5057
5158
5259 #get defaults from protocol settings
53- if ' send_input_register' in self .protocolSettings .settings :
54- self .send_input_register = strtobool (self .protocolSettings .settings [' send_input_register' ])
60+ if " send_input_register" in self .protocolSettings .settings :
61+ self .send_input_register = strtobool (self .protocolSettings .settings [" send_input_register" ])
5562
56- if ' send_holding_register' in self .protocolSettings .settings :
57- self .send_holding_register = strtobool (self .protocolSettings .settings [' send_holding_register' ])
63+ if " send_holding_register" in self .protocolSettings .settings :
64+ self .send_holding_register = strtobool (self .protocolSettings .settings [" send_holding_register" ])
5865
59- if ' batch_delay' in self .protocolSettings .settings :
60- self .modbus_delay = float (self .protocolSettings .settings [' batch_delay' ])
66+ if " batch_delay" in self .protocolSettings .settings :
67+ self .modbus_delay = float (self .protocolSettings .settings [" batch_delay" ])
6168
6269 #allow enable/disable of which registers to send
63- self .send_holding_register = settings .getboolean (' send_holding_register' , fallback = self .send_holding_register )
64- self .send_input_register = settings .getboolean (' send_input_register' , fallback = self .send_input_register )
65- self .modbus_delay = settings .getfloat ([' batch_delay' , ' modbus_delay' ], fallback = self .modbus_delay )
70+ self .send_holding_register = settings .getboolean (" send_holding_register" , fallback = self .send_holding_register )
71+ self .send_input_register = settings .getboolean (" send_input_register" , fallback = self .send_input_register )
72+ self .modbus_delay = settings .getfloat ([" batch_delay" , " modbus_delay" ], fallback = self .modbus_delay )
6673 self .modbus_delay_setting = self .modbus_delay
6774
6875
@@ -94,22 +101,22 @@ def read_serial_number(self) -> str:
94101
95102 sn2 = ""
96103 sn3 = ""
97- fields = [' Serial No 1' , ' Serial No 2' , ' Serial No 3' , ' Serial No 4' , ' Serial No 5' ]
104+ fields = [" Serial No 1" , " Serial No 2" , " Serial No 3" , " Serial No 4" , " Serial No 5" ]
98105 for field in fields :
99106 self ._log .info ("Reading " + field )
100107 registry_entry = self .protocolSettings .get_holding_registry_entry (field )
101108 if registry_entry is not None :
102109 self ._log .info ("Reading " + field + "(" + str (registry_entry .register )+ ")" )
103110 data = self .read_modbus_registers (registry_entry .register , registry_type = Registry_Type .HOLDING )
104- if not hasattr (data , ' registers' ) or data .registers is None :
111+ if not hasattr (data , " registers" ) or data .registers is None :
105112 self ._log .critical ("Failed to get serial number register (" + field + ") ; exiting" )
106113 exit ()
107114
108115 serial_number = serial_number + str (data .registers [0 ])
109116
110- data_bytes = data .registers [0 ].to_bytes ((data .registers [0 ].bit_length () + 7 ) // 8 , byteorder = ' big' )
111- sn2 = sn2 + str (data_bytes .decode (' utf-8' ))
112- sn3 = str (data_bytes .decode (' utf-8' )) + sn3
117+ data_bytes = data .registers [0 ].to_bytes ((data .registers [0 ].bit_length () + 7 ) // 8 , byteorder = " big" )
118+ sn2 = sn2 + str (data_bytes .decode (" utf-8" ))
119+ sn3 = str (data_bytes .decode (" utf-8" )) + sn3
113120
114121 time .sleep (self .modbus_delay * 2 ) #sleep inbetween requests so modbus can rest
115122
@@ -171,7 +178,7 @@ def read_data(self) -> dict[str, str]:
171178
172179 return info
173180
174- def validate_protocol (self , protocolSettings : ' protocol_settings' ) -> float :
181+ def validate_protocol (self , protocolSettings : " protocol_settings" ) -> float :
175182 score_percent = self .validate_registry (Registry_Type .HOLDING )
176183 return score_percent
177184
@@ -197,13 +204,13 @@ def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT)
197204 self ._log .info ("validation score: " + str (score ) + " of " + str (maxScore ) + " : " + str (round (percent )) + "%" )
198205 return percent
199206
200- def analyze_protocol (self , settings_dir : str = ' protocols' ):
207+ def analyze_protocol (self , settings_dir : str = " protocols" ):
201208 print ("=== PROTOCOL ANALYZER ===" )
202209 protocol_names : list [str ] = []
203210 protocols : dict [str ,protocol_settings ] = {}
204211
205212 for file in glob .glob (settings_dir + "/*.json" ):
206- file = file .lower ().replace (settings_dir , '' ).replace ('/' , '' ).replace (' \\ ' , '' ).replace (' \\ ' , '' ).replace (' .json' , '' )
213+ file = file .lower ().replace (settings_dir , "" ).replace ("/" , "" ).replace (" \\ " , "" ).replace (" \\ " , "" ).replace (" .json" , "" )
207214 print (file )
208215 protocol_names .append (file )
209216
@@ -276,7 +283,7 @@ def analyze_protocol(self, settings_dir : str = 'protocols'):
276283 def evaluate_score (entry : registry_map_entry , val ):
277284 score = 0
278285 if entry .data_type == Data_Type .ASCII :
279- if val and not re .match (' [^a-zA-Z0-9_-]' , val ): #validate ascii
286+ if val and not re .match (" [^a-zA-Z0-9_-]" , val ): #validate ascii
280287 mod = 1
281288 if entry .concatenate :
282289 mod = len (entry .concatenate_registers )
@@ -357,10 +364,12 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type
357364 current_value = current_registers [entry .register ]
358365
359366 if not self .protocolSettings .validate_registry_entry (entry , current_value ):
360- raise ValueError (f"Invalid value in register '{ current_value } '. Unsafe to write" )
367+ err = f"Invalid value in register '{ current_value } '. Unsafe to write"
368+ raise ValueError (err )
361369
362370 if not self .protocolSettings .validate_registry_entry (entry , value ):
363- raise ValueError (f"Invalid new value, '{ value } '. Unsafe to write" )
371+ err = f"Invalid new value, '{ value } '. Unsafe to write"
372+ raise ValueError (err )
364373
365374 #handle codes
366375 if entry .variable_name + "_codes" in self .protocolSettings .codes :
@@ -410,7 +419,7 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type
410419 def read_variable (self , variable_name : str , registry_type : Registry_Type , entry : registry_map_entry = None ):
411420 ##clean for convinecne
412421 if variable_name :
413- variable_name = variable_name .strip ().lower ().replace (' ' , '_' )
422+ variable_name = variable_name .strip ().lower ().replace (" " , "_" )
414423
415424 registry_map = self .protocolSettings .get_registry_map (registry_type )
416425
@@ -476,7 +485,7 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en
476485
477486 if isinstance (register , bytes ) or register .isError () or isError : #sometimes weird errors are handled incorrectly and response is a ascii error string
478487 if isinstance (register , bytes ):
479- self ._log .error (register .decode (' utf-8' ))
488+ self ._log .error (register .decode (" utf-8" ))
480489 else :
481490 self ._log .error (register .__str__ )
482491 self .modbus_delay += self .modbus_delay_increament #increase delay, error is likely due to modbus being busy
0 commit comments