-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path__init__.py
292 lines (243 loc) · 9.89 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# coding=utf-8
"""
Copyright (C) 2022 Daniele Borgo
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
from __future__ import absolute_import
import subprocess
from copy import deepcopy
from shutil import which
import gpiozero
import octoprint.plugin
from flask import jsonify
__plugin_pythoncompat__ = ">=3,<4" # only python 3
class GPIOStatusPlugin(
octoprint.plugin.SettingsPlugin,
octoprint.plugin.AssetPlugin,
octoprint.plugin.TemplatePlugin,
octoprint.plugin.StartupPlugin,
octoprint.plugin.ShutdownPlugin,
octoprint.plugin.SimpleApiPlugin
):
def __init__(self):
super().__init__()
self.__hardware_data = None
self.__physical_pins = None
self.__max_bcm_pin = None
self.__raw_funcs = None
def on_startup(self, host, port):
info = gpiozero.pi_info()
self.__compute_hardware_data(info)
self.__prepare_physical_pins(info)
self.__max_bcm_pin = GPIOStatusPlugin.__get_max_bcm_pin_val(self.__physical_pins["pins"])
self._logger.info("Plugin ready")
def get_api_commands(self):
return dict(
gpio_status=[]
)
def on_api_command(self, command, data):
if command == "gpio_status":
self._logger.info("Refresh request received")
return jsonify(self.__get_status(
hw_required=("hw" not in data) or data["hw"],
funcs_required=("funcs" not in data) or data["funcs"]
))
return None
def __get_status(self, hw_required=True, funcs_required=True):
commands = {
"raspi_config": which("raspi-config") is not None,
"raspi_gpio": which("raspi-gpio") is not None
}
if not (commands["raspi_config"] and commands["raspi_gpio"]):
return {
"commands": commands
}
self.__prepare_raw_funcs_if_necessary()
status = self.__get_physical_pins_copied()
self.__inject_bcm_data(status["pins"], funcs_required)
formatted_status = {
"commands": commands,
"services": GPIOStatusPlugin.__get_services_status(),
"status": status
}
if hw_required:
formatted_status["hardware"] = self.__hardware_data
return formatted_status
def __compute_hardware_data(self, info):
# This data cannot change during execution
self.__hardware_data = {
"revision": info.revision,
"model": info.model,
"pcb_revision": info.pcb_revision,
"released": info.released,
"soc": info.soc,
"manufacturer": info.manufacturer,
"memory": info.memory,
"storage": info.storage,
"usb": info.usb,
"usb3": info.usb3,
"ethernet": info.ethernet,
"eth_speed": info.eth_speed,
"wifi": info.wifi,
"bluetooth": info.bluetooth,
"csi": info.csi,
"dsi": info.dsi
}
@staticmethod
def __get_services_status():
return {
"camera": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_camera"),
"ssh": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_ssh"),
"spi": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_spi"),
"i2c": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_i2c"),
"serial": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_serial"),
"serial_hw": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_serial_hw"),
"one_wire": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_onewire"),
"remote_gpio": GPIOStatusPlugin.__get_service_status("raspi-config nonint get_rgpio")
}
@staticmethod
def __get_service_status(command):
return GPIOStatusPlugin.__execute_command(command) == "0"
@staticmethod
def __execute_command(command):
return subprocess.run(command.split(), capture_output=True).stdout.decode("utf-8").strip()
def __prepare_physical_pins(self, info: gpiozero.PiBoardInfo):
pinout = info.headers["J8" if "J8" in info.headers else "P1"]
self.__physical_pins = {
"rows": pinout.rows,
"columns": pinout.columns,
"pins": sorted([
{
"physical_name": pinout.pins[key].number,
"name": pinout.pins[key].function
}
for key in pinout.pins
], key=lambda pin: pin["physical_name"])
}
def __get_physical_pins_copied(self):
return deepcopy(self.__physical_pins)
def __inject_bcm_data(self, physical_pins, funcs_required=True):
bcm_pins_status = self.__get_bcm_pins_status(funcs_required)
for pin in physical_pins:
if pin["name"] in bcm_pins_status:
pin["is_bcm"] = True
pin.update(bcm_pins_status[pin["name"]])
else:
pin["is_bcm"] = False
@staticmethod
def __get_max_bcm_pin_val(pins):
return max([int(pin["name"][4:]) for pin in pins if pin["name"].startswith("GPIO")])
def __get_bcm_pins_status(self, funcs_required=True):
bcm_pins_status = self.__get_raw_status()
formatted = {}
for i in range(len(bcm_pins_status)):
status = bcm_pins_status[i]
funcs = self.__raw_funcs[i]
index = f"GPIO{status[0]}"
formatted[index] = {
"current_value": int(status[1]), # level
"pull": funcs[1]
}
formatted[index]["current_func"] = funcs[2 + int(status[2])] if status[2].isdigit() else status[2]
if funcs_required:
formatted[index]["funcs"] = funcs[2:]
return formatted
def __get_raw_status(self):
return [
[
# GPIO pin number
config[1].replace(":", ""),
# pin level
config[2][config[2].find("=") + 1:],
# fsel not needed
# pin function
config[4][config[4].find("=") + 1:]
]
for config in [
config.split()
for config in GPIOStatusPlugin.__execute_command(
f"raspi-gpio get {0}-{self.__max_bcm_pin}"
).split("\n")
]
]
def __prepare_raw_funcs_if_necessary(self):
if self.__raw_funcs is None:
self.__raw_funcs = [
config.split(", ")
for config in GPIOStatusPlugin.__execute_command(
f"raspi-gpio funcs {0}-{self.__max_bcm_pin}"
).split("\n")[1:]
]
def get_settings_defaults(self):
return dict(
reload_on_check_change=False,
load_on_startup=True,
compact_view=True,
hide_special_pins=False,
order_by_name=False,
hide_physical=False,
show_notes=False,
hide_images=False,
pins_notes_json="{}"
)
def on_settings_save(self, data):
if self.__is_data_correct(data):
octoprint.plugin.SettingsPlugin.on_settings_save(self, data)
return
# This happens in case of a JS bug
self._logger.info("Saving rejected: constraint not followed")
def __is_data_correct(self, data):
return not (self.__pick_newest("compact_view", data) and
(
self.__pick_newest("hide_special_pins", data) or
self.__pick_newest("order_by_name", data) or
self.__pick_newest("show_notes", data)
))
def __pick_newest(self, name, data):
return data[name] if name in data else self._settings.get_boolean([name])
def get_template_configs(self):
return [
dict(type="tab", custom_bindings=True),
dict(type="settings", custom_bindings=True)
]
def get_assets(self):
# Define your plugin's asset files to automatically include in the
# core UI here.
return {
"js": ["js/gpiostatus.js"],
"css": ["css/gpiostatus.css"]
# "less": ["less/gpiostatus.less"]
}
def get_update_information(self):
# Define the configuration for your plugin to use with the Software Update
# Plugin here. See https://docs.octoprint.org/en/master/bundledplugins/softwareupdate.html
# for details.
return {
"gpiostatus": {
"displayName": "GPIO Status Plugin",
"displayVersion": self._plugin_version,
# version check: github repository
"type": "github_release",
"user": "danieleborgo",
"repo": "OctoPrint-GPIOStatus",
"current": self._plugin_version,
# update method: pip
"pip": "https://github.com/danieleborgo/OctoPrint-GPIOStatus/archive/{target_version}.zip",
}
}
def __plugin_load__():
global __plugin_implementation__
__plugin_implementation__ = GPIOStatusPlugin()
global __plugin_hooks__
__plugin_hooks__ = {
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information
}