This repository has been archived by the owner on Oct 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 117
/
Copy pathmiscale.py
67 lines (49 loc) · 1.76 KB
/
miscale.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import time
from interruptingcow import timeout
from exceptions import DeviceTimeoutError
from mqtt import MqttMessage
from workers.base import BaseWorker
REQUIREMENTS = ["bluepy"]
# Bluepy might need special settings
# sudo setcap 'cap_net_raw,cap_net_admin+eip' /usr/local/lib/python3.6/dist-packages/bluepy/bluepy-helper
class MiscaleWorker(BaseWorker):
SCAN_TIMEOUT = 5
def status_update(self):
return [
MqttMessage(
topic=self.format_topic("weight/kg"), payload=self._get_weight()
)
]
def _get_weight(self):
from bluepy import btle
scan_processor = ScanProcessor(self.mac)
scanner = btle.Scanner().withDelegate(scan_processor)
scanner.scan(self.SCAN_TIMEOUT, passive=True)
with timeout(
self.SCAN_TIMEOUT,
exception=DeviceTimeoutError(
"Retrieving the weight from {} device {} timed out after {} seconds".format(
repr(self), self.mac, self.SCAN_TIMEOUT
)
),
):
while scan_processor.weight is None:
time.sleep(1)
return scan_processor.weight
return -1
class ScanProcessor:
def __init__(self, mac):
self._mac = mac
self._weight = None
def handleDiscovery(self, dev, isNewDev, _):
if dev.addr == self.mac.lower() and isNewDev:
for (sdid, desc, data) in dev.getScanData():
if data.startswith("1d18") and sdid == 22:
measured = int((data[8:10] + data[6:8]), 16) * 0.01
self._weight = round(measured / 2, 2)
@property
def mac(self):
return self._mac
@property
def weight(self):
return self._weight