generated from cepdnaclk/eYY-XXX-project-template
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement the driver software with serial and bluethooth agents
- Loading branch information
1 parent
ed3a683
commit 3e45b39
Showing
5 changed files
with
157 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
import asyncio | ||
|
||
from src.SerialReader import * | ||
from src.BluetoothReader import * | ||
|
||
if __name__ == "__main__": | ||
#agent = ESPSerialReader() | ||
#async def bluetooth(): | ||
#agent = ESPBluetoothReader() | ||
#await agent.connect() | ||
#while True: | ||
#data = await agent.read() | ||
#if data: | ||
#print(data) | ||
|
||
#asyncio.run(bluetooth()) | ||
agent |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# Author : Dasun Theekshana | ||
# Date : 19/09/2023 | ||
# File : SerialReader.py | ||
|
||
import asyncio | ||
from bleak import BleakScanner, BleakClient | ||
|
||
service_uuid = '4fafc201-1fb5-459e-8fcc-c5c9c331914b' # Service UUID | ||
characteristic_uuid = 'beb5483e-36e1-4688-b7f5-ea07361b26a8' # Characteristic UUID | ||
|
||
class ESPBluetoothReader: | ||
def __init__(self): | ||
self.macaddrs = None | ||
self.name = None | ||
self.devices = None | ||
self.client = None | ||
|
||
async def scan(self): | ||
self.devices = await BleakScanner.discover(return_adv=True) | ||
for device,data in self.devices.items(): | ||
print(f"Name: {data[1].local_name} MAC: {device} RSSI: {data[1].rssi}") | ||
|
||
async def isavalable(self): | ||
await self.scan() | ||
if self.devices: | ||
devicename = input("Enter the Devices Name: ") | ||
for device,data in self.devices.items(): | ||
name = data[1].local_name | ||
if devicename == name: | ||
self.macaddrs = device | ||
self.name = name | ||
return True | ||
print("Device not Found") | ||
return False | ||
|
||
async def connect(self): | ||
if await self.isavalable(): | ||
self.client = BleakClient(self.macaddrs) | ||
try: | ||
await self.client.connect() | ||
print(f"Connected to {self.name}") | ||
return True | ||
|
||
except Exception as e: | ||
print(f"Error Connecting: {str(e)}") | ||
return False | ||
|
||
async def disconnect(self): | ||
try: | ||
await self.client.disconnect() | ||
print(f"{self.name} disonnected") | ||
self.name = None | ||
self.macaddrs = None | ||
self.client = None | ||
return True | ||
except Exception as e: | ||
print(f"Error Disonnecting: {str(e)}") | ||
return False | ||
|
||
async def read(self): | ||
try: | ||
data = await self.client.read_gatt_char(characteristic_uuid) | ||
if data: | ||
return data.decode('utf-8') | ||
return False | ||
except Exception as e: | ||
print(f"Error Reading: {str(e)}") | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
# Author : Dasun Theekshana | ||
# Date : 19/09/2023 | ||
# File : SerialReader.py | ||
|
||
import serial | ||
import serial.tools.list_ports | ||
|
||
class ESPSerialReader: | ||
def __init__(self): | ||
self.port = None | ||
self.ser = None | ||
|
||
def scan(self): | ||
""" | ||
Scan the device ports to find ESP COM port | ||
return -> COMx | ||
""" | ||
esp_port =None | ||
|
||
# List available serial ports | ||
available_ports = list(serial.tools.list_ports.comports()) | ||
|
||
# Iterate through Port to Find the ESP Board | ||
for port in available_ports: | ||
port_name = port.device | ||
port_discription = port.description | ||
try: | ||
print(f"Try to connect to {port_name}") | ||
|
||
# Open a connection with port | ||
ser = serial.Serial(port_name, 9600, timeout=1) | ||
|
||
# If port discription match with the ESP discription find the board | ||
if "CP210x USB to UART Bridge" in port_discription: | ||
print(f"ESP Board: {port_name}") | ||
esp_port = port_name | ||
break | ||
|
||
# Close the connection | ||
ser.close() | ||
|
||
except Exception as e: | ||
print(f"Failed to connect to {port_name}: {str(e)}") | ||
|
||
return esp_port | ||
|
||
def isavalable(self): | ||
self.port = self.scan() | ||
if self.port: | ||
return True | ||
return False | ||
|
||
def connect(self): | ||
try: | ||
if self.isavalable(): | ||
self.ser = serial.Serial(self.port, 9600) | ||
print(f"Key Board Connected via : {self.port}") | ||
return True | ||
return False | ||
except Exception as e: | ||
print(f"Error Connecting: {str(e)}") | ||
return False | ||
|
||
def read(self): | ||
try: | ||
data = self.ser.read().decode('utf-8') | ||
if data: | ||
return data | ||
return False | ||
except Exception as e: | ||
print(f"Error Disonnecting: {str(e)}") | ||
return False |
Binary file not shown.
Binary file not shown.