Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
44db2f3
* improve eg4 handling
jaredmauch Jun 20, 2025
30902c8
s/type/trasport
jaredmauch Jun 20, 2025
8afe3f0
revert
jaredmauch Jun 20, 2025
d5632a5
attempt to fix issue with analyze_protocol = true
jaredmauch Jun 20, 2025
f9d5fc2
attempt to fix issue with analyze_protocol = true
jaredmauch Jun 20, 2025
a52c418
attempt to fix issue with analyze_protocol = true
jaredmauch Jun 20, 2025
0248882
attempt to fix issue with analyze_protocol = true
jaredmauch Jun 20, 2025
e8be5e9
Fix pymodbus 3.7+ compatibility issues
jaredmauch Jun 20, 2025
7f6e1ed
remove testing files from branch
jaredmauch Jun 20, 2025
fcad1f2
Fix UnboundLocalError and improve validation robustness
jaredmauch Jun 20, 2025
7650f0b
Fix analyze_protocol validation timing issue
jaredmauch Jun 20, 2025
9c69c2e
Fix analyze_protocol to use configured protocol register ranges
jaredmauch Jun 20, 2025
e36fa93
address connection issue
jaredmauch Jun 20, 2025
663e76d
address issue with analyze_protocol
jaredmauch Jun 20, 2025
55e2a68
address issue with analyze_protocol
jaredmauch Jun 20, 2025
635e3ba
address issue with analyze_protocol
jaredmauch Jun 20, 2025
e8d7c56
address issue with analyze_protocol
jaredmauch Jun 20, 2025
5374391
address issue with analyze_protocol
jaredmauch Jun 20, 2025
20e18b1
address issue with analyze_protocol
jaredmauch Jun 20, 2025
1970e23
address issue with analyze_protocol
jaredmauch Jun 20, 2025
d3b4166
address issue with analyze_protocol
jaredmauch Jun 20, 2025
bbf19e3
address issue with analyze_protocol
jaredmauch Jun 20, 2025
021736f
address issue with analyze_protocol baudrate
jaredmauch Jun 20, 2025
c0510f6
address issue with analyze_protocol baudrate
jaredmauch Jun 20, 2025
224d4ed
restore file accidentally deleted in 7f6e1ed
jaredmauch Jun 20, 2025
d6d14d1
sync over 4db83627f1ec637851e5acf0906003b0341b4344
jaredmauch Jun 20, 2025
64fb1ef
Merge branch 'main' of github.com:jaredmauch/PythonProtocolGateway
jaredmauch Jun 20, 2025
af51f7d
cleanup logging, place DEBUG level messages behind debug
jaredmauch Jun 20, 2025
df9c9c6
cleanup logging, place DEBUG level messages behind debug
jaredmauch Jun 20, 2025
a8f606a
influxdb floating point fixup
jaredmauch Jun 20, 2025
fb49a90
promote serial number from inverter to device
jaredmauch Jun 20, 2025
e7d78b0
promote serial number from inverter to device
jaredmauch Jun 20, 2025
d03a843
simplify device_serial_number
jaredmauch Jun 20, 2025
950fd4b
Merge branch 'v1.1.10-pr92' into main
HotNoob Jun 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 70 additions & 29 deletions classes/protocol_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ def process_row(row):
concatenate_registers.append(i)

if concatenate_registers:
r = range(len(concatenate_registers))
r = range(1) # Only create one entry for concatenated variables
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. i dont remember enough to know if this is ok... :P

else:
r = range(1)

Expand Down Expand Up @@ -1111,41 +1111,82 @@ def process_registery(self, registry : Union[dict[int, int], dict[int, bytes]] ,

concatenate_registry : dict = {}
info = {}

# First pass: process all non-concatenated entries
for entry in map:

if entry.register not in registry:
continue
value = ""

if isinstance(registry[entry.register], bytes):
value = self.process_register_bytes(registry, entry)
else:
value = self.process_register_ushort(registry, entry)

#if item.unit:
# value = str(value) + item.unit

if not entry.concatenate:
value = ""
if isinstance(registry[entry.register], bytes):
value = self.process_register_bytes(registry, entry)
else:
value = self.process_register_ushort(registry, entry)
info[entry.variable_name] = value

# Second pass: process concatenated entries
for entry in map:
if entry.register not in registry:
continue

if entry.concatenate:
concatenate_registry[entry.register] = value

all_exist = True
for key in entry.concatenate_registers:
if key not in concatenate_registry:
all_exist = False
break
if all_exist:
#if all(key in concatenate_registry for key in item.concatenate_registers):
concatenated_value = ""
for key in entry.concatenate_registers:
concatenated_value = concatenated_value + str(concatenate_registry[key])
del concatenate_registry[key]

#replace null characters with spaces and trim
# For concatenated entries, we need to process each register in the concatenate_registers list
concatenated_value = ""
all_registers_exist = True

# For ASCII concatenated variables, extract 8-bit characters from 16-bit registers
if entry.data_type == Data_Type.ASCII:
for reg in entry.concatenate_registers:
if reg not in registry:
all_registers_exist = False
break

reg_value = registry[reg]
# Extract high byte (bits 8-15) and low byte (bits 0-7)
high_byte = (reg_value >> 8) & 0xFF
low_byte = reg_value & 0xFF

# Convert each byte to ASCII character (low byte first, then high byte)
low_char = chr(low_byte)
high_char = chr(high_byte)
concatenated_value += low_char + high_char
else:
for reg in entry.concatenate_registers:
if reg not in registry:
all_registers_exist = False
break

# Create a temporary entry for this register to process it
temp_entry = registry_map_entry(
registry_type=entry.registry_type,
register=reg,
register_bit=0,
register_byte=0,
variable_name=f"temp_{reg}",
documented_name=f"temp_{reg}",
unit="",
unit_mod=1.0,
concatenate=False,
concatenate_registers=[],
values=[],
data_type=entry.data_type,
data_type_size=entry.data_type_size
)

if isinstance(registry[reg], bytes):
value = self.process_register_bytes(registry, temp_entry)
else:
value = self.process_register_ushort(registry, temp_entry)

concatenated_value += str(value)

if all_registers_exist:
# Replace null characters with spaces and trim for ASCII
if entry.data_type == Data_Type.ASCII:
concatenated_value = concatenated_value.replace("\x00", " ").strip()

info[entry.variable_name] = concatenated_value
else:
info[entry.variable_name] = value

return info

Expand Down
163 changes: 163 additions & 0 deletions classes/transports/influxdb_out.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import sys
from configparser import SectionProxy
from typing import TextIO
import time

from defs.common import strtobool

from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry
from .transport_base import transport_base


class influxdb_out(transport_base):
    """InfluxDB v1 output transport that batches data points and writes them
    to an InfluxDB server.

    Points are buffered per instance and flushed when either ``batch_size``
    points have accumulated or ``batch_timeout`` seconds have elapsed since
    the last successful flush.
    """
    # Defaults; each may be overridden by the config section in __init__.
    host: str = "localhost"
    port: int = 8086
    database: str = "solar"
    username: str = ""
    password: str = ""
    measurement: str = "device_data"
    include_timestamp: bool = True
    include_device_info: bool = True
    batch_size: int = 100
    batch_timeout: float = 10.0

    # InfluxDBClient instance; created in connect().
    client = None

    def __init__(self, settings: SectionProxy):
        self.host = settings.get("host", fallback=self.host)
        self.port = settings.getint("port", fallback=self.port)
        self.database = settings.get("database", fallback=self.database)
        self.username = settings.get("username", fallback=self.username)
        self.password = settings.get("password", fallback=self.password)
        self.measurement = settings.get("measurement", fallback=self.measurement)
        self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp))
        self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))
        self.batch_size = settings.getint("batch_size", fallback=self.batch_size)
        self.batch_timeout = settings.getfloat("batch_timeout", fallback=self.batch_timeout)

        # Batch state must be per-instance. Previously these were class
        # attributes, so every influxdb_out instance shared ONE list
        # (mutable class-attribute bug), and last_batch_time = 0 made the
        # first timeout check (now - 0 >= batch_timeout) always true,
        # flushing the very first point immediately.
        self.batch_points: list = []
        self.last_batch_time: float = time.time()

        self.write_enabled = True  # InfluxDB output is always write-enabled
        super().__init__(settings)

    def connect(self):
        """Initialize the InfluxDB client connection.

        Imports the optional ``influxdb`` package lazily, verifies the server
        with a ping, and creates the target database if it does not exist.
        Sets ``self.connected`` accordingly; never raises.
        """
        self._log.info("influxdb_out connect")

        try:
            from influxdb import InfluxDBClient

            # Empty username/password are passed as None so the client
            # skips authentication entirely.
            self.client = InfluxDBClient(
                host=self.host,
                port=self.port,
                username=self.username if self.username else None,
                password=self.password if self.password else None,
                database=self.database
            )

            # Test connection
            self.client.ping()

            # Create database if it doesn't exist
            databases = self.client.get_list_database()
            if not any(db['name'] == self.database for db in databases):
                self._log.info(f"Creating database: {self.database}")
                self.client.create_database(self.database)

            self.connected = True
            self._log.info(f"Connected to InfluxDB at {self.host}:{self.port}")

        except ImportError:
            self._log.error("InfluxDB client not installed. Please install with: pip install influxdb")
            self.connected = False
        except Exception as e:
            self._log.error(f"Failed to connect to InfluxDB: {e}")
            self.connected = False

    def write_data(self, data: dict[str, str], from_transport: transport_base):
        """Buffer one measurement point built from ``data``; flush the batch
        when it is full or the batch timeout has elapsed.

        Values that parse as numbers are stored as int/float fields;
        everything else is stored as a string field.
        """
        if not self.write_enabled or not self.connected:
            return

        self._log.info(f"write data from [{from_transport.transport_name}] to influxdb_out transport")
        self._log.info(data)

        # Prepare tags for InfluxDB
        tags = {}

        # Add device information as tags if enabled
        if self.include_device_info:
            tags.update({
                "device_identifier": from_transport.device_identifier,
                "device_name": from_transport.device_name,
                "device_manufacturer": from_transport.device_manufacturer,
                "device_model": from_transport.device_model,
                "device_serial_number": from_transport.device_serial_number,
                "transport": from_transport.transport_name
            })

        # Prepare fields (the actual data values)
        fields = {}
        for key, value in data.items():
            # Try to convert to numeric values for InfluxDB
            try:
                float_val = float(value)
                # NOTE(review): storing whole numbers as int and others as
                # float means a field's type can vary between writes, which
                # InfluxDB v1 rejects within a shard — confirm this matches
                # the server's schema expectations.
                if float_val.is_integer():
                    fields[key] = int(float_val)
                else:
                    fields[key] = float_val
            except (ValueError, TypeError):
                # If conversion fails, store as string
                fields[key] = str(value)

        # Create InfluxDB point
        point = {
            "measurement": self.measurement,
            "tags": tags,
            "fields": fields
        }

        # Add timestamp if enabled
        if self.include_timestamp:
            point["time"] = int(time.time() * 1e9)  # Convert to nanoseconds

        # Add to batch
        self.batch_points.append(point)

        # Flush when the batch is full or the timeout has elapsed.
        current_time = time.time()
        if (len(self.batch_points) >= self.batch_size or
            (current_time - self.last_batch_time) >= self.batch_timeout):
            self._flush_batch()

    def _flush_batch(self):
        """Write all buffered points to InfluxDB.

        On success the buffer is cleared; on failure the points are kept
        (for a later retry) and the transport is marked disconnected.
        """
        if not self.batch_points:
            return

        try:
            self.client.write_points(self.batch_points)
            self._log.info(f"Wrote {len(self.batch_points)} points to InfluxDB")
            self.batch_points = []
            self.last_batch_time = time.time()
        except Exception as e:
            self._log.error(f"Failed to write batch to InfluxDB: {e}")
            self.connected = False

    def init_bridge(self, from_transport: transport_base):
        """Initialize bridge - not needed for InfluxDB output"""
        pass

    def __del__(self):
        """Cleanup on destruction - flush any remaining points"""
        if self.batch_points:
            self._flush_batch()
        if self.client:
            try:
                self.client.close()
            except Exception:
                pass
109 changes: 109 additions & 0 deletions classes/transports/json_out.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import json
import sys
from configparser import SectionProxy
from typing import TextIO

from defs.common import strtobool

from ..protocol_settings import Registry_Type, WriteMode, registry_map_entry
from .transport_base import transport_base


class json_out(transport_base):
    """JSON output transport that writes each data snapshot as one JSON
    document to a file or to stdout.

    Configuration keys: output_file ("stdout" or a path), pretty_print,
    append_mode, include_timestamp, include_device_info.
    """
    output_file: str = "stdout"
    pretty_print: bool = True
    append_mode: bool = False
    include_timestamp: bool = True
    include_device_info: bool = True

    # Open handle to the destination; sys.stdout when output_file == "stdout".
    file_handle: TextIO = None

    def __init__(self, settings: SectionProxy):
        self.output_file = settings.get("output_file", fallback=self.output_file)
        self.pretty_print = strtobool(settings.get("pretty_print", fallback=self.pretty_print))
        self.append_mode = strtobool(settings.get("append_mode", fallback=self.append_mode))
        self.include_timestamp = strtobool(settings.get("include_timestamp", fallback=self.include_timestamp))
        self.include_device_info = strtobool(settings.get("include_device_info", fallback=self.include_device_info))

        self.write_enabled = True  # JSON output is always write-enabled
        super().__init__(settings)

    def connect(self):
        """Open the output destination and set ``self.connected``.

        For "stdout" no file is opened; otherwise the file is opened in
        write or append mode depending on ``append_mode``. Never raises.
        """
        self._log.info("json_out connect")

        if self.output_file.lower() == "stdout":
            self.file_handle = sys.stdout
            self.connected = True
            return

        try:
            mode = "a" if self.append_mode else "w"
            self.file_handle = open(self.output_file, mode, encoding='utf-8')
            self.connected = True
        except Exception as e:
            self._log.error(f"Failed to open output file {self.output_file}: {e}")
            self.connected = False

    def write_data(self, data: dict[str, str], from_transport: transport_base):
        """Serialize ``data`` (plus optional device info and timestamp) as a
        JSON document and write it to the configured destination."""
        if not self.write_enabled or not self.connected:
            return

        self._log.info(f"write data from [{from_transport.transport_name}] to json_out transport")
        self._log.info(data)

        # Prepare the JSON output structure
        output_data = {}

        # Add device information if enabled
        if self.include_device_info:
            output_data["device"] = {
                "identifier": from_transport.device_identifier,
                "name": from_transport.device_name,
                "manufacturer": from_transport.device_manufacturer,
                "model": from_transport.device_model,
                "serial_number": from_transport.device_serial_number,
                "transport": from_transport.transport_name
            }

        # Add timestamp if enabled
        if self.include_timestamp:
            import time
            output_data["timestamp"] = time.time()

        # Add the actual data
        output_data["data"] = data

        # Convert to JSON
        if self.pretty_print:
            json_string = json.dumps(output_data, indent=2, ensure_ascii=False)
        else:
            json_string = json.dumps(output_data, ensure_ascii=False)

        # Write to file
        try:
            if self.output_file.lower() != "stdout":
                # For files, add a newline and flush so readers see each
                # document as soon as it is produced.
                self.file_handle.write(json_string + "\n")
                self.file_handle.flush()
            else:
                # For stdout, just print
                print(json_string)
        except Exception as e:
            self._log.error(f"Failed to write to output: {e}")
            self.connected = False

    def init_bridge(self, from_transport: transport_base):
        """Initialize bridge - not needed for JSON output"""
        pass

    def __del__(self):
        """Cleanup file handle on destruction"""
        if self.file_handle and self.output_file.lower() != "stdout":
            try:
                self.file_handle.close()
            # A bare ``except:`` here would also swallow SystemExit /
            # KeyboardInterrupt during interpreter shutdown.
            except Exception:
                pass
Loading