forked from kbialek/deye-inverter-mqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
deye_sensor.py
196 lines (159 loc) · 6.89 KB
/
deye_sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from abc import abstractmethod
class Sensor():
"""
Models solar inverter sensor.
This is an abstract class. Method 'read_value' must be provided by the extending subclass.
"""
def __init__(self, name: str, mqtt_topic_suffix='', unit='', print_format='{:s}', groups=[]):
self.name = name
self.mqtt_topic_suffix = mqtt_topic_suffix
self.unit = unit
self.print_format = print_format
assert len(groups) > 0, f'Sensor {name} must belong to at least one group'
self.groups = groups
@abstractmethod
def read_value(self, registers: dict[int, int]):
"""
Reads sensor value from Modbus registers
"""
pass
def format_value(self, value):
"""
Formats sensor value using configured format string
"""
return self.print_format.format(value)
def in_any_group(self, active_groups: set[str]) -> bool:
"""
Checks if this sensor is included in at least one of the given active_groups.
Sensor matches any group when its groups set is empty (default behavior)
"""
return not self.groups or len(active_groups.intersection(self.groups)) > 0
@abstractmethod
def get_registers(self) -> list[int]:
"""Returns the list of Modbus registers read by this sensor
"""
class SingleRegisterSensor(Sensor):
"""
Solar inverter sensor with value stored as 32-bit integer in a single Modbus register.
"""
def __init__(
self, name: str, reg_address: int, factor: float, offset: float = 0,
signed=False, mqtt_topic_suffix='', unit='', print_format='{:0.1f}', groups=[]):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
self.signed = signed
def read_value(self, registers: dict[int, int]):
if self.reg_address in registers:
reg_value = registers[self.reg_address]
return int.from_bytes(reg_value, 'big', signed=self.signed) * self.factor + self.offset
else:
return None
@abstractmethod
def get_registers(self) -> list[int]:
return [self.reg_address]
class DoubleRegisterSensor(Sensor):
"""
Solar inverter sensor with value stored as 64-bit integer in two Modbus registers.
"""
def __init__(
self, name: str, reg_address: int, factor: float, offset: float = 0,
signed=False, mqtt_topic_suffix='', unit='', print_format='{:0.1f}', groups=[]):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
self.signed = signed
def read_value(self, registers: dict[int, int]):
low_word_reg_address = self.reg_address
high_word_reg_address = self.reg_address + 1
if low_word_reg_address in registers and high_word_reg_address in registers:
low_word = registers[low_word_reg_address]
high_word = registers[high_word_reg_address]
return int.from_bytes(high_word + low_word, 'big', signed=self.signed) * self.factor + self.offset
else:
return None
@abstractmethod
def get_registers(self) -> list[int]:
return [self.reg_address, self.reg_address+1]
class ComputedPowerSensor(Sensor):
"""
Electric Power sensor with value computed as multiplication of values read by voltage and current sensors.
"""
def __init__(
self, name: str, voltage_sensor: Sensor, current_sensor: Sensor, mqtt_topic_suffix='',
unit='', print_format='{:0.1f}', groups=[]):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.voltage_sensor = voltage_sensor
self.current_sensor = current_sensor
def read_value(self, registers: dict[int, int]):
voltage = self.voltage_sensor.read_value(registers)
current = self.current_sensor.read_value(registers)
if voltage is not None and current is not None:
return voltage * current
else:
return None
@abstractmethod
def get_registers(self) -> list[int]:
return []
class ComputedSumSensor(Sensor):
"""
Computes a sum of values read by given list of sensors.
"""
def __init__(
self, name: str, sensors: list[Sensor], mqtt_topic_suffix='', unit='',
print_format='{:0.1f}', groups=[]):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.sensors = sensors
def read_value(self, registers: dict[int, int]):
result = 0
sensor_values = [s.read_value(registers) for s in self.sensors]
for value in sensor_values:
if value is None:
return None
result += value
return result
@abstractmethod
def get_registers(self) -> list[int]:
return []
class SensorRegisterRange:
"""
Declares a Modbus register range that must be read to provide values for sensors within a metrics group
"""
def __init__(self, group: str, first_reg_address: int, last_reg_address: int):
self.group = group
self.first_reg_address = first_reg_address
self.last_reg_address = last_reg_address
def in_any_group(self, active_groups: set[str]) -> bool:
"""
Checks if this range is included in at least one of the given active_groups.
"""
return self.group in active_groups
def is_same_range(self, other: 'SensorRegisterRange') -> bool:
"""Checks if the other range has this same first and last reg address.
Args:
other (SensorRegisterRange): to check against
Returns:
bool: True when both ranges define this same registers addresses, False otherwise
"""
return self.first_reg_address == other.first_reg_address and self.last_reg_address == other.last_reg_address
def __str__(self):
return 'metrics group: {}, range: {:04x}-{:04x}'.format(
self.group, self.first_reg_address, self.last_reg_address)