-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathBLEManager.py
71 lines (60 loc) · 3.29 KB
/
BLEManager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import asyncio
import logging
import sys
from bleak import BleakClient, BleakScanner, BLEDevice
# Maximum time, in seconds, to wait for the Bluetooth scan to complete.
# Used as the scan timeout by BLEManager.discover().
DISCOVERY_TIMEOUT = 5
class BLEManager:
    """Discover, connect to, and exchange data with a single BLE device.

    The target device is identified by its MAC address or by its advertised
    name (alias). After connecting, the manager subscribes to one notification
    characteristic and remembers the handle of one write characteristic.

    Attributes:
        mac_address: MAC address of the target device (may be None to match
            by alias only).
        device_alias: Advertised name of the target device.
        data_callback: Awaited with the raw payload of every notification.
        connect_fail_callback: Called with ``sys.exc_info()`` when an
            exception is raised while connecting or enumerating services.
        write_char_handle: Handle of the write characteristic, or None until
            it has been found by ``connect``.
        device: The matching scanned device, or None until ``discover``
            finds one.
        client: The BleakClient for ``device``, or None until ``connect``
            creates it.
        discovered_devices: Result of the most recent scan.
    """

    def __init__(
        self,
        mac_address,
        alias,
        on_data,
        on_connect_fail,
        write_service_uuid,
        notify_char_uuid,
        write_char_uuid,
    ):
        """Store the target identity, the callbacks, and the GATT UUIDs."""
        self.mac_address = mac_address
        self.device_alias = alias
        self.data_callback = on_data
        self.connect_fail_callback = on_connect_fail
        self.write_service_uuid = write_service_uuid
        self.notify_char_uuid = notify_char_uuid
        self.write_char_uuid = write_char_uuid
        self.write_char_handle = None
        self.device: BLEDevice = None
        self.client: BleakClient = None
        self.discovered_devices = []

    @staticmethod
    def _same_uuid(left, right):
        """Return True when two UUID values are equal, ignoring letter case."""
        if left is None or right is None:
            return False
        return str(left).lower() == str(right).lower()

    def _is_target(self, dev):
        """Return True when *dev* matches the configured MAC address or alias."""
        if dev.address is None:
            return False
        # A missing MAC address means "match by alias only" instead of crashing.
        if self.mac_address is not None and dev.address.upper() == self.mac_address.upper():
            return True
        return bool(dev.name) and dev.name.strip() == self.device_alias

    async def discover(self, timeout=None):
        """Scan for BLE devices and remember the one matching MAC or alias.

        Args:
            timeout: Scan duration in seconds. Defaults to the module-level
                ``DISCOVERY_TIMEOUT`` when None.

        The scan result is stored in ``self.discovered_devices``. When several
        scanned devices match, the last one in the result is kept in
        ``self.device``; when none match, ``self.device`` is left unchanged.
        """
        if timeout is None:
            timeout = DISCOVERY_TIMEOUT
        logging.info("Starting discovery...")
        self.discovered_devices = await BleakScanner.discover(timeout=timeout)
        logging.info("Devices found: %s", len(self.discovered_devices))
        for dev in self.discovered_devices:
            if self._is_target(dev):
                logging.info(f"Found matching device {dev.name} => {dev.address}")
                self.device = dev

    async def connect(self):
        """Connect to the discovered device and set up its characteristics.

        Subscribes ``notification_callback`` to every characteristic whose
        UUID equals ``notify_char_uuid`` and stores the handle of the
        characteristic matching both ``write_char_uuid`` and
        ``write_service_uuid``. UUIDs are compared case-insensitively.

        Returns None in all cases. If no device has been discovered, or the
        client reports it is not connected, an error is logged and the method
        returns. Any exception raised while connecting or enumerating
        services is logged with its traceback and reported through
        ``connect_fail_callback(sys.exc_info())``.
        """
        if not self.device:
            logging.error("No device connected!")
            return None
        self.client = BleakClient(self.device)
        try:
            await self.client.connect()
            logging.info(f"Client connection: {self.client.is_connected}")
            if not self.client.is_connected:
                logging.error("Unable to connect")
                return None
            for service in self.client.services:
                for characteristic in service.characteristics:
                    if self._same_uuid(characteristic.uuid, self.notify_char_uuid):
                        await self.client.start_notify(characteristic, self.notification_callback)
                        logging.info(f"subscribed to notification {characteristic.uuid}")
                    if self._same_uuid(characteristic.uuid, self.write_char_uuid) and self._same_uuid(
                        service.uuid, self.write_service_uuid
                    ):
                        self.write_char_handle = characteristic.handle
                        logging.info(f"found write characteristic {characteristic.uuid}, service {service.uuid}")
        except Exception:
            # exc_info=True keeps the traceback in the log; the callback still
            # receives the same sys.exc_info() tuple as before.
            logging.error("Error connecting to device", exc_info=True)
            self.connect_fail_callback(sys.exc_info())
        return None

    async def notification_callback(self, characteristic, data: bytearray):
        """Forward a notification payload to ``data_callback`` (awaited)."""
        logging.info("notification_callback")
        await self.data_callback(data)

    async def characteristic_write_value(self, data):
        """Write *data* to the write characteristic without response.

        Args:
            data: Any value accepted by ``bytearray(...)``.

        Failures are logged at ERROR level and never raised. After a
        successful write the coroutine sleeps for 0.5 seconds before
        returning.
        """
        if self.client is None or self.write_char_handle is None:
            # Explicit guard instead of relying on an AttributeError being swallowed below.
            logging.error("characteristic_write_value failed: not connected or write characteristic not found")
            return
        try:
            logging.info(f'writing to {self.write_char_uuid} {data}')
            await self.client.write_gatt_char(self.write_char_handle, bytearray(data), response=False)
            logging.info('characteristic_write_value succeeded')
            await asyncio.sleep(0.5)
        except Exception as e:
            logging.error(f'characteristic_write_value failed {e}')

    async def disconnect(self):
        """Disconnect the client if one exists and is currently connected."""
        if self.client and self.client.is_connected:
            logging.info(f"Exit: Disconnecting device: {self.device.name} {self.device.address}")
            await self.client.disconnect()