Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added BLE terminal #52

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/read_firmware_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ def characteristic_value_updated(self, characteristic, value):
device = AnyDevice(manager=manager, mac_address=args.mac_address)
device.connect()


manager.run()
Binary file added gatt/__init__.pyc
Binary file not shown.
Binary file added gatt/errors.pyc
Binary file not shown.
Binary file added gatt/gatt.pyc
Binary file not shown.
5 changes: 4 additions & 1 deletion gatt/gatt_linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def run(self):
This call blocks until you call `stop()` to stop the main loop.
"""


if self._main_loop:
return

Expand All @@ -86,7 +87,7 @@ def disconnect_signals():
self._properties_changed_signal.remove()
self._interface_added_signal.remove()

self._main_loop = GObject.MainLoop()
self._main_loop = GObject.MainLoop.new(None, False) # modification to launch run in a thread
try:
self._main_loop.run()
disconnect_signals()
Expand Down Expand Up @@ -605,6 +606,7 @@ def write_value(self, value, offset=0):
:param value: array of bytes to be written
:param offset: offset from where to start writing the bytes (defaults to 0)
"""
print("Sent : ", value)
bytes = [dbus.Byte(b) for b in value]

try:
Expand Down Expand Up @@ -640,6 +642,7 @@ def enable_notifications(self, enabled=True):
Each time when the device notifies a new value, `characteristic_value_updated()` of the related
device will be called.
"""
print("enable_notifications called")
try:
if enabled:
self._object.StartNotify(
Expand Down
Binary file added gatt/gatt_linux.pyc
Binary file not shown.
107 changes: 103 additions & 4 deletions gattctl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
#!/usr/bin/env python3

from argparse import ArgumentParser
from threading import Thread
import keyboard
from termios import tcflush, TCIFLUSH
import gatt
import time
import sys

device_manager = None

Expand All @@ -11,13 +16,28 @@ class AnyDeviceManager(gatt.DeviceManager):
An implementation of ``gatt.DeviceManager`` that discovers any GATT device
and prints all discovered devices.
"""
def __init__(self, adapter_name):
super().__init__(adapter_name)
self.nb_device_connected = 0
self.discover_filter = 'True'

def device_discovered(self, device):
print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias()))
if self.discover_filter == 'True':
print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias()))
elif self.discover_filter in device.alias() or self.discover_filter in device.mac_address:
print("[%s] Discovered, alias = %s" % (device.mac_address, device.alias()))

def make_device(self, mac_address):
return AnyDevice(mac_address=mac_address, manager=self)

def run(self):
print("Running Manager")
super().run()

def quit(self):
print("Stopping Manager")
super().stop()


class AnyDevice(gatt.Device):
"""
Expand All @@ -28,20 +48,26 @@ class AnyDevice(gatt.Device):
def __init__(self, mac_address, manager, auto_reconnect=False):
super().__init__(mac_address=mac_address, manager=manager)
self.auto_reconnect = auto_reconnect
self.message = ''
self.transmition = ""
self.reception = ""

def connect(self):
print("Connecting...")
super().connect()

def connect_succeeded(self):
super().connect_succeeded()
device_manager.nb_device_connected += 1
print("[%s] Connected" % (self.mac_address))

def connect_failed(self, error):
super().connect_failed(error)
print("[%s] Connection failed: %s" % (self.mac_address, str(error)))

def disconnect_succeeded(self):
if device_manager.nb_device_connected:
device_manager.nb_device_connected -= 1
super().disconnect_succeeded()

print("[%s] Disconnected" % (self.mac_address))
Expand All @@ -55,7 +81,48 @@ def services_resolved(self):
for service in self.services:
print("[%s] Service [%s]" % (self.mac_address, service.uuid))
for characteristic in service.characteristics:
if characteristic.uuid == '6e400003-b5a3-f393-e0a9-e50e24dcca9e':
characteristic.enable_notifications()

print("[%s] Characteristic [%s]" % (self.mac_address, characteristic.uuid))
print("[%s] Resolved services" % (self.mac_address))

device_service = next(
s for s in self.services
if s.uuid == '6e400001-b5a3-f393-e0a9-e50e24dcca9e')

self.reception = next(
c for c in device_service.characteristics
if c.uuid == '6e400003-b5a3-f393-e0a9-e50e24dcca9e')

self.transmition = next(
c for c in device_service.characteristics
if c.uuid == '6e400002-b5a3-f393-e0a9-e50e24dcca9e')

if self.message:
bar = bytearray(self.message.encode())
self.transmition.write_value(bar)

time.sleep(2)
self.reception.read_value()

def characteristic_value_updated(self, characteristic, value):
print("RX:", (value.decode("utf-8")).replace('\n', ''))

def characteristic_read_value_failed(self, characteristic, error):
print("RX fail : ", error)

def characteristic_enable_notifications_succeeded(self, characteristic):
print("enable notification succeeded for characteristic ", characteristic.uuid)

def characteristic_enable_notifications_failed(self, characteristic, error):
print("enable notification succeeded for characteristic ", characteristic.uuid, ", error, ", error)

def characteristic_write_value_succeeded(self, characteristic):
print("write value succeeded for characteristic ", characteristic.uuid)

def characteristic_write_value_failed(self, characteristic, error):
print("write value fail for characteristic ", characteristic.uuid, ", error, ", error)


def main():
Expand All @@ -64,6 +131,11 @@ def main():
'--adapter',
default='hci0',
help="Name of Bluetooth adapter, defaults to 'hci0'")
arg_parser.add_argument(
'--message',
metavar='string',
type=str,
help="String send by ble, usable with --connect only. All 'n' of the message will be remplace by '\ n' ")
arg_commands_group = arg_parser.add_mutually_exclusive_group(required=True)
arg_commands_group.add_argument(
'--power-on',
Expand All @@ -81,6 +153,11 @@ def main():
'--discover',
action='store_true',
help="Lists all nearby GATT devices")
arg_commands_group.add_argument(
'--discover-filter',
metavar='filter',
type=str,
help="Lists all nearby GATT devices containing filter in their MAC address or alias")
arg_commands_group.add_argument(
'--connect',
metavar='address',
Expand Down Expand Up @@ -114,8 +191,14 @@ def main():
return
if args.discover:
device_manager.start_discovery()
if args.discover_filter:
print("discover filter : ", args.discover_filter)
device_manager.discover_filter = args.discover_filter
device_manager.start_discovery()
elif args.connect:
device = AnyDevice(mac_address=args.connect, manager=device_manager)
if args.message:
device.message = (args.message.replace('%n','\n'))
device.connect()
elif args.auto:
device = AnyDevice(mac_address=args.auto, manager=device_manager, auto_reconnect=True)
Expand All @@ -125,12 +208,28 @@ def main():
device.disconnect()
return

thread = Thread(target=device_manager.run)
thread.start()

print("Terminate with Ctrl+C")
print("Started Thread.")

try:
device_manager.run()
while True:
if device_manager.nb_device_connected:
# If ESCAPE key is pressed you can start to write the message, ENTER to send it to the device
if keyboard.is_pressed(chr(27)):
tcflush(sys.stdin, TCIFLUSH)
message = bytearray(input(">> ").replace('%n','\n').encode())
device.transmition.write_value(message)

pass
except KeyboardInterrupt:
pass

print('KeyboardInterrupt!')
if device_manager.nb_device_connected:
device.disconnect()
device_manager.quit()
thread.join()

if __name__ == '__main__':
main()