Skip to content

Commit bdf8812

Browse files
committed
per serial port locking
1 parent 3036753 commit bdf8812

File tree

2 files changed

+118
-63
lines changed

2 files changed

+118
-63
lines changed

classes/transports/modbus_base.py

Lines changed: 85 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import re
55
import time
6+
import threading
67
from typing import TYPE_CHECKING
78
from dataclasses import dataclass
89
from datetime import datetime, timedelta
@@ -75,9 +76,15 @@ def get_remaining_disable_time(self) -> float:
7576
class modbus_base(transport_base):
7677

7778

78-
#this is specifically static
79+
#this is specifically static
7980
clients : dict[str, "BaseModbusClient"] = {}
8081
''' str is identifier, dict of clients when multiple transports use the same ports '''
82+
83+
# Thread safety for client access - port-specific locks
84+
_client_locks : dict[str, threading.Lock] = {}
85+
''' Port-specific locks for protecting client access '''
86+
_clients_lock : threading.Lock = threading.Lock()
87+
''' Lock for protecting client dictionary access '''
8188

8289
#non-static here for reference, type hinting, python bs ect...
8390
modbus_delay_increament : float = 0.05
@@ -88,6 +95,10 @@ class modbus_base(transport_base):
8895

8996
modbus_delay : float = 0.85
9097
'''time inbetween requests'''
98+
99+
# Instance-specific delay to prevent timing conflicts between transports
100+
instance_delay_offset : float = 0.0
101+
''' Additional delay offset for this specific transport instance '''
91102

92103
analyze_protocol_enabled : bool = False
93104
analyze_protocol_save_load : bool = False
@@ -101,9 +112,20 @@ class modbus_base(transport_base):
101112
enable_register_failure_tracking: bool = True
102113
max_failures_before_disable: int = 5
103114
disable_duration_hours: int = 12
115+
116+
# Instance-specific lock for this transport
117+
_transport_lock : threading.Lock = None
118+
''' Lock for protecting this transport's operations '''
119+
120+
# Port identifier for this transport
121+
_port_identifier : str = None
122+
''' Port identifier for this transport instance '''
104123

105124
def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None):
106125
super().__init__(settings)
126+
127+
# Initialize transport-specific lock
128+
self._transport_lock = threading.Lock()
107129

108130
self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled)
109131
self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load)
@@ -128,9 +150,24 @@ def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_setti
128150
self.send_input_register = settings.getboolean("send_input_register", fallback=self.send_input_register)
129151
self.modbus_delay = settings.getfloat(["batch_delay", "modbus_delay"], fallback=self.modbus_delay)
130152
self.modbus_delay_setting = self.modbus_delay
153+
154+
# Get instance-specific delay offset to prevent timing conflicts
155+
self.instance_delay_offset = settings.getfloat("instance_delay_offset", fallback=self.instance_delay_offset)
131156

132157
# Note: Connection and analyze_protocol will be called after subclass initialization is complete
133158

159+
def _get_port_lock(self) -> threading.Lock:
160+
"""Get or create a port-specific lock for this transport"""
161+
if self._port_identifier is None:
162+
# Default to transport name if no port identifier is set
163+
self._port_identifier = self.transport_name
164+
165+
with self._clients_lock:
166+
if self._port_identifier not in self._client_locks:
167+
self._client_locks[self._port_identifier] = threading.Lock()
168+
169+
return self._client_locks[self._port_identifier]
170+
134171
def _get_register_range_key(self, register_range: tuple[int, int], registry_type: Registry_Type) -> str:
135172
"""Generate a unique key for a register range"""
136173
return f"{registry_type.name}_{register_range[0]}_{register_range[1]}"
@@ -367,45 +404,47 @@ def write_data(self, data : dict[str, str], from_transport : transport_base) ->
367404
time.sleep(self.modbus_delay) #sleep inbetween requests so modbus can rest
368405

369406
def read_data(self) -> dict[str, str]:
370-
info = {}
371-
#modbus - only read input/holding registries
372-
for registry_type in (Registry_Type.INPUT, Registry_Type.HOLDING):
407+
# Use transport lock to prevent concurrent access to this transport
408+
with self._transport_lock:
409+
info = {}
410+
#modbus - only read input/holding registries
411+
for registry_type in (Registry_Type.INPUT, Registry_Type.HOLDING):
412+
413+
#enable / disable input/holding register
414+
if registry_type == Registry_Type.INPUT and not self.send_input_register:
415+
continue
373416

374-
#enable / disable input/holding register
375-
if registry_type == Registry_Type.INPUT and not self.send_input_register:
376-
continue
417+
if registry_type == Registry_Type.HOLDING and not self.send_holding_register:
418+
continue
377419

378-
if registry_type == Registry_Type.HOLDING and not self.send_holding_register:
379-
continue
420+
#calculate ranges dynamically -- for variable read timing
421+
ranges = self.protocolSettings.calculate_registry_ranges(self.protocolSettings.registry_map[registry_type],
422+
self.protocolSettings.registry_map_size[registry_type],
423+
timestamp=self.last_read_time)
380424

381-
#calculate ranges dynamically -- for variable read timing
382-
ranges = self.protocolSettings.calculate_registry_ranges(self.protocolSettings.registry_map[registry_type],
383-
self.protocolSettings.registry_map_size[registry_type],
384-
timestamp=self.last_read_time)
425+
registry = self.read_modbus_registers(ranges=ranges, registry_type=registry_type)
426+
new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(registry_type))
385427

386-
registry = self.read_modbus_registers(ranges=ranges, registry_type=registry_type)
387-
new_info = self.protocolSettings.process_registery(registry, self.protocolSettings.get_registry_map(registry_type))
428+
if False:
429+
new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()}
388430

389-
if False:
390-
new_info = {self.__input_register_prefix + key: value for key, value in new_info.items()}
431+
info.update(new_info)
391432

392-
info.update(new_info)
433+
if not info:
434+
self._log.info("Register is Empty; transport busy?")
393435

394-
if not info:
395-
self._log.info("Register is Empty; transport busy?")
436+
# Log disabled ranges status periodically (every 10 minutes)
437+
if self.enable_register_failure_tracking and hasattr(self, '_last_disabled_status_log') and time.time() - self._last_disabled_status_log > 600:
438+
disabled_ranges = self._get_disabled_ranges_info()
439+
if disabled_ranges:
440+
self._log.info(f"Currently disabled register ranges: {len(disabled_ranges)}")
441+
for range_info in disabled_ranges:
442+
self._log.info(f" - {range_info}")
443+
self._last_disabled_status_log = time.time()
444+
elif not hasattr(self, '_last_disabled_status_log'):
445+
self._last_disabled_status_log = time.time()
396446

397-
# Log disabled ranges status periodically (every 10 minutes)
398-
if self.enable_register_failure_tracking and hasattr(self, '_last_disabled_status_log') and time.time() - self._last_disabled_status_log > 600:
399-
disabled_ranges = self._get_disabled_ranges_info()
400-
if disabled_ranges:
401-
self._log.info(f"Currently disabled register ranges: {len(disabled_ranges)}")
402-
for range_info in disabled_ranges:
403-
self._log.info(f" - {range_info}")
404-
self._last_disabled_status_log = time.time()
405-
elif not hasattr(self, '_last_disabled_status_log'):
406-
self._last_disabled_status_log = time.time()
407-
408-
return info
447+
return info
409448

410449
def validate_protocol(self, protocolSettings : "protocol_settings") -> float:
411450
score_percent = self.validate_registry(Registry_Type.HOLDING)
@@ -762,19 +801,24 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en
762801
continue
763802

764803
self._log.info("get registers ("+str(index)+"): " +str(registry_type)+ " - " + str(range[0]) + " to " + str(range[0]+range[1]-1) + " ("+str(range[1])+")")
765-
time.sleep(self.modbus_delay) #sleep for 1ms to give bus a rest #manual recommends 1s between commands
804+
# Sleep with instance-specific offset to prevent timing conflicts between transports
805+
total_delay = self.modbus_delay + self.instance_delay_offset
806+
time.sleep(total_delay) #sleep to give bus a rest #manual recommends 1s between commands
766807

767808
isError = False
768809
register = None # Initialize register variable
769-
try:
770-
register = self.read_registers(range[0], range[1], registry_type=registry_type)
771-
772-
except ModbusIOException as e:
773-
self._log.error("ModbusIOException: " + str(e))
774-
# In pymodbus 3.7+, ModbusIOException doesn't have error_code attribute
775-
# Treat all ModbusIOException as retryable errors
776-
isError = True
810+
811+
# Use port-specific lock to prevent concurrent access to the same port
812+
port_lock = self._get_port_lock()
813+
with port_lock:
814+
try:
815+
register = self.read_registers(range[0], range[1], registry_type=registry_type)
777816

817+
except ModbusIOException as e:
818+
self._log.error("ModbusIOException: " + str(e))
819+
# In pymodbus 3.7+, ModbusIOException doesn't have error_code attribute
820+
# Treat all ModbusIOException as retryable errors
821+
isError = True
778822

779823
if register is None or isinstance(register, bytes) or (hasattr(register, 'isError') and register.isError()) or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string
780824
if register is None:

classes/transports/modbus_rtu.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings
3535
raise ValueError("Port is not valid / not found")
3636

3737
print("Serial Port : " + self.port + " = ", get_usb_serial_port_info(self.port)) #print for config convience
38+
39+
# Set port identifier for port-specific locking
40+
self._port_identifier = self.port
3841

3942
if "baud" in self.protocolSettings.settings:
4043
self.baudrate = strtoint(self.protocolSettings.settings["baud"])
@@ -54,26 +57,28 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings
5457

5558
client_str = self.port+"("+str(self.baudrate)+")"
5659

57-
if client_str in modbus_base.clients:
58-
self.client = modbus_base.clients[client_str]
59-
return
60+
# Use client lock to prevent concurrent access to the client dictionary
61+
with self._clients_lock:
62+
if client_str in modbus_base.clients:
63+
self.client = modbus_base.clients[client_str]
64+
return
6065

6166
self._log.debug(f"Creating new client with baud rate: {self.baudrate}")
6267

63-
if "method" in init_signature.parameters:
64-
self.client = ModbusSerialClient(method="rtu", port=self.port,
65-
baudrate=int(self.baudrate),
66-
stopbits=1, parity="N", bytesize=8, timeout=2
67-
)
68-
else:
69-
self.client = ModbusSerialClient(
70-
port=self.port,
71-
baudrate=int(self.baudrate),
72-
stopbits=1, parity="N", bytesize=8, timeout=2
73-
)
74-
75-
#add to clients
76-
modbus_base.clients[client_str] = self.client
68+
if "method" in init_signature.parameters:
69+
self.client = ModbusSerialClient(method="rtu", port=self.port,
70+
baudrate=int(self.baudrate),
71+
stopbits=1, parity="N", bytesize=8, timeout=2
72+
)
73+
else:
74+
self.client = ModbusSerialClient(
75+
port=self.port,
76+
baudrate=int(self.baudrate),
77+
stopbits=1, parity="N", bytesize=8, timeout=2
78+
)
79+
80+
#add to clients
81+
modbus_base.clients[client_str] = self.client
7782

7883
def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs):
7984

@@ -84,10 +89,13 @@ def read_registers(self, start, count=1, registry_type : Registry_Type = Registr
8489
if self.pymodbus_slave_arg != "unit":
8590
kwargs["slave"] = kwargs.pop("unit")
8691

87-
if registry_type == Registry_Type.INPUT:
88-
return self.client.read_input_registers(address=start, count=count, **kwargs)
89-
elif registry_type == Registry_Type.HOLDING:
90-
return self.client.read_holding_registers(address=start, count=count, **kwargs)
92+
# Use port-specific lock to prevent concurrent access to the same port
93+
port_lock = self._get_port_lock()
94+
with port_lock:
95+
if registry_type == Registry_Type.INPUT:
96+
return self.client.read_input_registers(address=start, count=count, **kwargs)
97+
elif registry_type == Registry_Type.HOLDING:
98+
return self.client.read_holding_registers(address=start, count=count, **kwargs)
9199

92100
def write_register(self, register : int, value : int, **kwargs):
93101
if not self.write_enabled:
@@ -100,7 +108,10 @@ def write_register(self, register : int, value : int, **kwargs):
100108
if self.pymodbus_slave_arg != "unit":
101109
kwargs["slave"] = kwargs.pop("unit")
102110

103-
self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register
111+
# Use port-specific lock to prevent concurrent access to the same port
112+
port_lock = self._get_port_lock()
113+
with port_lock:
114+
self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register
104115

105116
def connect(self):
106117
self.connected = self.client.connect()

0 commit comments

Comments
 (0)