Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 135 additions & 131 deletions classes/protocol_settings.py

Large diffs are not rendered by default.

46 changes: 24 additions & 22 deletions classes/transports/canbus.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import re
import time
import can
import asyncio
import threading
import platform
import os
import platform
import re
import threading
import time
from collections import OrderedDict
from typing import TYPE_CHECKING

import can

from defs.common import strtoint

from ..protocol_settings import Registry_Type, protocol_settings, registry_map_entry
from .transport_base import transport_base
from ..protocol_settings import Registry_Type, registry_map_entry, protocol_settings
from defs.common import strtoint
from collections import OrderedDict

from typing import TYPE_CHECKING
if TYPE_CHECKING:
from configparser import SectionProxy

class canbus(transport_base):
''' canbus is a more passive protocol; todo to include active commands to trigger canbus responses '''

interface : str = 'socketcan'
interface : str = "socketcan"
''' bustype / interface for canbus device '''

port : str = ''
port : str = ""
''' 'can0' '''

baudrate : int = 500000
Expand Down Expand Up @@ -55,11 +55,11 @@ class canbus(transport_base):
linux : bool = True


def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None):
def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None):
super().__init__(settings, protocolSettings=protocolSettings)

#check if running on windows or linux
self.linux = platform.system() != 'Windows'
self.linux = platform.system() != "Windows"


self.port = settings.get(["port", "channel"], "")
Expand Down Expand Up @@ -107,6 +107,7 @@ def setup_socketcan(self):
print("socketcan setup not implemented for windows")
return

# ruff: noqa: S605, S607
self._log.info("restart and configure socketcan")
os.system("ip link set can0 down")
os.system("ip link set can0 type can restart-ms 100")
Expand All @@ -118,11 +119,12 @@ def is_socketcan_up(self) -> bool:
return True

try:
with open(f'/sys/class/net/{self.port}/operstate', 'r') as f:
with open(f"/sys/class/net/{self.port}/operstate", "r") as f:
state = f.read().strip()
return state == 'up'
except FileNotFoundError:
return False
else:
return state == "up"

def start_loop(self):
self.read_bus(self.bus)
Expand Down Expand Up @@ -182,30 +184,30 @@ def init_after_connect(self):

def read_serial_number(self) -> str:
''' not so simple in canbus'''
return ''
return ""
serial_number = str(self.read_variable("Serial Number", Registry_Type.HOLDING))
print("read SN: " +serial_number)
if serial_number:
return serial_number

sn2 = ""
sn3 = ""
fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5']
fields = ["Serial No 1", "Serial No 2", "Serial No 3", "Serial No 4", "Serial No 5"]
for field in fields:
self._log.info("Reading " + field)
registry_entry = self.protocolSettings.get_holding_registry_entry(field)
if registry_entry is not None:
self._log.info("Reading " + field + "("+str(registry_entry.register)+")")
data = self.read_modbus_registers(registry_entry.register, registry_type=Registry_Type.HOLDING)
if not hasattr(data, 'registers') or data.registers is None:
if not hasattr(data, "registers") or data.registers is None:
self._log.critical("Failed to get serial number register ("+field+") ; exiting")
exit()

serial_number = serial_number + str(data.registers[0])

data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big')
sn2 = sn2 + str(data_bytes.decode('utf-8'))
sn3 = str(data_bytes.decode('utf-8')) + sn3
data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder="big")
sn2 = sn2 + str(data_bytes.decode("utf-8"))
sn3 = str(data_bytes.decode("utf-8")) + sn3

time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest

Expand Down Expand Up @@ -257,7 +259,7 @@ def read_variable(self, variable_name : str, registry_type : Registry_Type, entr
''' read's variable from cache'''
##clean for convinecne
if variable_name:
variable_name = variable_name.strip().lower().replace(' ', '_')
variable_name = variable_name.strip().lower().replace(" ", "_")

registry_map = self.protocolSettings.get_registry_map(registry_type)

Expand Down
67 changes: 38 additions & 29 deletions classes/transports/modbus_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,20 @@
import os
import re
import time
from typing import TYPE_CHECKING

from pymodbus.exceptions import ModbusIOException

from .transport_base import transport_base
from ..protocol_settings import Data_Type, Registry_Type, registry_map_entry, protocol_settings
from defs.common import strtobool

from typing import TYPE_CHECKING
from ..protocol_settings import (
Data_Type,
Registry_Type,
protocol_settings,
registry_map_entry,
)
from .transport_base import transport_base

if TYPE_CHECKING:
from configparser import SectionProxy
try:
Expand All @@ -22,7 +29,7 @@ class modbus_base(transport_base):


#this is specifically static
clients : dict[str, 'BaseModbusClient'] = {}
clients : dict[str, "BaseModbusClient"] = {}
''' str is identifier, dict of clients when multiple transports use the same ports '''

#non-static here for reference, type hinting, python bs ect...
Expand All @@ -42,27 +49,27 @@ class modbus_base(transport_base):
send_holding_register : bool = True
send_input_register : bool = True

def __init__(self, settings : 'SectionProxy', protocolSettings : 'protocol_settings' = None):
def __init__(self, settings : "SectionProxy", protocolSettings : "protocol_settings" = None):
super().__init__(settings)

self.analyze_protocol_enabled = settings.getboolean('analyze_protocol', fallback=self.analyze_protocol_enabled)
self.analyze_protocol_save_load = settings.getboolean('analyze_protocol_save_load', fallback=self.analyze_protocol_save_load)
self.analyze_protocol_enabled = settings.getboolean("analyze_protocol", fallback=self.analyze_protocol_enabled)
self.analyze_protocol_save_load = settings.getboolean("analyze_protocol_save_load", fallback=self.analyze_protocol_save_load)


#get defaults from protocol settings
if 'send_input_register' in self.protocolSettings.settings:
self.send_input_register = strtobool(self.protocolSettings.settings['send_input_register'])
if "send_input_register" in self.protocolSettings.settings:
self.send_input_register = strtobool(self.protocolSettings.settings["send_input_register"])

if 'send_holding_register' in self.protocolSettings.settings:
self.send_holding_register = strtobool(self.protocolSettings.settings['send_holding_register'])
if "send_holding_register" in self.protocolSettings.settings:
self.send_holding_register = strtobool(self.protocolSettings.settings["send_holding_register"])

if 'batch_delay' in self.protocolSettings.settings:
self.modbus_delay = float(self.protocolSettings.settings['batch_delay'])
if "batch_delay" in self.protocolSettings.settings:
self.modbus_delay = float(self.protocolSettings.settings["batch_delay"])

#allow enable/disable of which registers to send
self.send_holding_register = settings.getboolean('send_holding_register', fallback=self.send_holding_register)
self.send_input_register = settings.getboolean('send_input_register', fallback=self.send_input_register)
self.modbus_delay = settings.getfloat(['batch_delay', 'modbus_delay'], fallback=self.modbus_delay)
self.send_holding_register = settings.getboolean("send_holding_register", fallback=self.send_holding_register)
self.send_input_register = settings.getboolean("send_input_register", fallback=self.send_input_register)
self.modbus_delay = settings.getfloat(["batch_delay", "modbus_delay"], fallback=self.modbus_delay)
self.modbus_delay_setting = self.modbus_delay


Expand Down Expand Up @@ -94,22 +101,22 @@ def read_serial_number(self) -> str:

sn2 = ""
sn3 = ""
fields = ['Serial No 1', 'Serial No 2', 'Serial No 3', 'Serial No 4', 'Serial No 5']
fields = ["Serial No 1", "Serial No 2", "Serial No 3", "Serial No 4", "Serial No 5"]
for field in fields:
self._log.info("Reading " + field)
registry_entry = self.protocolSettings.get_holding_registry_entry(field)
if registry_entry is not None:
self._log.info("Reading " + field + "("+str(registry_entry.register)+")")
data = self.read_modbus_registers(registry_entry.register, registry_type=Registry_Type.HOLDING)
if not hasattr(data, 'registers') or data.registers is None:
if not hasattr(data, "registers") or data.registers is None:
self._log.critical("Failed to get serial number register ("+field+") ; exiting")
exit()

serial_number = serial_number + str(data.registers[0])

data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder='big')
sn2 = sn2 + str(data_bytes.decode('utf-8'))
sn3 = str(data_bytes.decode('utf-8')) + sn3
data_bytes = data.registers[0].to_bytes((data.registers[0].bit_length() + 7) // 8, byteorder="big")
sn2 = sn2 + str(data_bytes.decode("utf-8"))
sn3 = str(data_bytes.decode("utf-8")) + sn3

time.sleep(self.modbus_delay*2) #sleep inbetween requests so modbus can rest

Expand Down Expand Up @@ -171,7 +178,7 @@ def read_data(self) -> dict[str, str]:

return info

def validate_protocol(self, protocolSettings : 'protocol_settings') -> float:
def validate_protocol(self, protocolSettings : "protocol_settings") -> float:
score_percent = self.validate_registry(Registry_Type.HOLDING)
return score_percent

Expand All @@ -197,13 +204,13 @@ def validate_registry(self, registry_type : Registry_Type = Registry_Type.INPUT)
self._log.info("validation score: " + str(score) + " of " + str(maxScore) + " : " + str(round(percent)) + "%")
return percent

def analyze_protocol(self, settings_dir : str = 'protocols'):
def analyze_protocol(self, settings_dir : str = "protocols"):
print("=== PROTOCOL ANALYZER ===")
protocol_names : list[str] = []
protocols : dict[str,protocol_settings] = {}

for file in glob.glob(settings_dir + "/*.json"):
file = file.lower().replace(settings_dir, '').replace('/', '').replace('\\', '').replace('\\', '').replace('.json', '')
file = file.lower().replace(settings_dir, "").replace("/", "").replace("\\", "").replace("\\", "").replace(".json", "")
print(file)
protocol_names.append(file)

Expand Down Expand Up @@ -276,7 +283,7 @@ def analyze_protocol(self, settings_dir : str = 'protocols'):
def evaluate_score(entry : registry_map_entry, val):
score = 0
if entry.data_type == Data_Type.ASCII:
if val and not re.match('[^a-zA-Z0-9_-]', val): #validate ascii
if val and not re.match("[^a-zA-Z0-9_-]", val): #validate ascii
mod = 1
if entry.concatenate:
mod = len(entry.concatenate_registers)
Expand Down Expand Up @@ -357,10 +364,12 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type
current_value = current_registers[entry.register]

if not self.protocolSettings.validate_registry_entry(entry, current_value):
raise ValueError(f"Invalid value in register '{current_value}'. Unsafe to write")
err = f"Invalid value in register '{current_value}'. Unsafe to write"
raise ValueError(err)

if not self.protocolSettings.validate_registry_entry(entry, value):
raise ValueError(f"Invalid new value, '{value}'. Unsafe to write")
err = f"Invalid new value, '{value}'. Unsafe to write"
raise ValueError(err)

#handle codes
if entry.variable_name+"_codes" in self.protocolSettings.codes:
Expand Down Expand Up @@ -410,7 +419,7 @@ def write_variable(self, entry : registry_map_entry, value : str, registry_type
def read_variable(self, variable_name : str, registry_type : Registry_Type, entry : registry_map_entry = None):
##clean for convinecne
if variable_name:
variable_name = variable_name.strip().lower().replace(' ', '_')
variable_name = variable_name.strip().lower().replace(" ", "_")

registry_map = self.protocolSettings.get_registry_map(registry_type)

Expand Down Expand Up @@ -476,7 +485,7 @@ def read_modbus_registers(self, ranges : list[tuple] = None, start : int = 0, en

if isinstance(register, bytes) or register.isError() or isError: #sometimes weird errors are handled incorrectly and response is a ascii error string
if isinstance(register, bytes):
self._log.error(register.decode('utf-8'))
self._log.error(register.decode("utf-8"))
else:
self._log.error(register.__str__)
self.modbus_delay += self.modbus_delay_increament #increase delay, error is likely due to modbus being busy
Expand Down
38 changes: 20 additions & 18 deletions classes/transports/modbus_rtu.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
from classes.protocol_settings import Registry_Type, protocol_settings

import inspect

from classes.protocol_settings import Registry_Type, protocol_settings

try:
from pymodbus.client.sync import ModbusSerialClient
except ImportError:
from pymodbus.client import ModbusSerialClient


from .modbus_base import modbus_base
from configparser import SectionProxy

from defs.common import find_usb_serial_port, get_usb_serial_port_info, strtoint

from .modbus_base import modbus_base


class modbus_rtu(modbus_base):
port : str = "/dev/ttyUSB0"
addresses : list[int] = []
baudrate : int = 9600
client : ModbusSerialClient

pymodbus_slave_arg = 'unit'
pymodbus_slave_arg = "unit"

def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings = None):
super().__init__(settings, protocolSettings=protocolSettings)
Expand All @@ -43,8 +45,8 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings
self.addresses = [address]

# pymodbus compatability; unit was renamed to address
if 'slave' in inspect.signature(ModbusSerialClient.read_holding_registers).parameters:
self.pymodbus_slave_arg = 'slave'
if "slave" in inspect.signature(ModbusSerialClient.read_holding_registers).parameters:
self.pymodbus_slave_arg = "slave"


# Get the signature of the __init__ method
Expand All @@ -56,29 +58,29 @@ def __init__(self, settings : SectionProxy, protocolSettings : protocol_settings
self.client = modbus_base.clients[client_str]
return

if 'method' in init_signature.parameters:
self.client = ModbusSerialClient(method='rtu', port=self.port,
if "method" in init_signature.parameters:
self.client = ModbusSerialClient(method="rtu", port=self.port,
baudrate=int(self.baudrate),
stopbits=1, parity='N', bytesize=8, timeout=2
stopbits=1, parity="N", bytesize=8, timeout=2
)
else:
self.client = ModbusSerialClient(
port=self.port,
baudrate=int(self.baudrate),
stopbits=1, parity='N', bytesize=8, timeout=2
stopbits=1, parity="N", bytesize=8, timeout=2
)

#add to clients
modbus_base.clients[client_str] = self.client

def read_registers(self, start, count=1, registry_type : Registry_Type = Registry_Type.INPUT, **kwargs):

if 'unit' not in kwargs:
kwargs = {'unit': int(self.addresses[0]), **kwargs}
if "unit" not in kwargs:
kwargs = {"unit": int(self.addresses[0]), **kwargs}

#compatability
if self.pymodbus_slave_arg != 'unit':
kwargs['slave'] = kwargs.pop('unit')
if self.pymodbus_slave_arg != "unit":
kwargs["slave"] = kwargs.pop("unit")

if registry_type == Registry_Type.INPUT:
return self.client.read_input_registers(address=start, count=count, **kwargs)
Expand All @@ -89,12 +91,12 @@ def write_register(self, register : int, value : int, **kwargs):
if not self.write_enabled:
return

if 'unit' not in kwargs:
kwargs = {'unit': self.addresses[0], **kwargs}
if "unit" not in kwargs:
kwargs = {"unit": self.addresses[0], **kwargs}

#compatability
if self.pymodbus_slave_arg != 'unit':
kwargs['slave'] = kwargs.pop('unit')
if self.pymodbus_slave_arg != "unit":
kwargs["slave"] = kwargs.pop("unit")

self.client.write_register(register, value, **kwargs) #function code 0x06 writes to holding register

Expand Down
Loading