Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Discovered devices improvements #1047

Merged
merged 4 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ Added
* Added support for Python 3.11.
* Added better error message for Bluetooth not authorized on macOS.
* ``BleakDeviceNotFoundError`` which should be raised if a device can not be found by ``connect``, ``pair`` and ``unpair``
* Added ``rssi`` attribute to ``AdvertisementData``.
* Added ``BleakScanner.discovered_devices_and_advertisement_data`` property.
* Added ``return_adv`` argument to ``BleakScanner.discover`` method.

Changed
-------
Changed ``AdvertisementData`` to a named tuple.

Fixed
-----
Expand Down
64 changes: 55 additions & 9 deletions bleak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,18 @@
import os
import sys
import uuid
from typing import TYPE_CHECKING, Awaitable, Callable, List, Optional, Type, Union
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
overload,
)
from warnings import warn

import async_timeout
Expand Down Expand Up @@ -153,34 +164,69 @@ def set_scanning_filter(self, **kwargs):
)
self._backend.set_scanning_filter(**kwargs)

@overload
@classmethod
async def discover(
cls, timeout: float = 5.0, *, return_adv: Literal[False], **kwargs
) -> List[BLEDevice]:
...

@overload
@classmethod
async def discover(
cls, timeout: float = 5.0, *, return_adv: Literal[True], **kwargs
) -> Dict[str, Tuple[BLEDevice, AdvertisementData]]:
...

@classmethod
async def discover(cls, timeout=5.0, **kwargs) -> List[BLEDevice]:
async def discover(cls, timeout=5.0, *, return_adv=False, **kwargs):
"""
Scan continuously for ``timeout`` seconds and return discovered devices.

Args:
timeout:
Time, in seconds, to scan for.
return_adv:
If ``True``, the return value will include advertising data.
**kwargs:
Additional arguments will be passed to the :class:`BleakScanner`
constructor.

Returns:

The value of :attr:`discovered_devices_and_advertisement_data` if
``return_adv`` is ``True``, otherwise the value of :attr:`discovered_devices`.
"""
async with cls(**kwargs) as scanner:
await asyncio.sleep(timeout)
devices = scanner.discovered_devices
return devices

if return_adv:
return scanner.discovered_devices_and_advertisement_data

return scanner.discovered_devices

@property
def discovered_devices(self) -> List[BLEDevice]:
"""Gets the devices registered by the BleakScanner.
"""
Gets list of the devices that the scanner has discovered during the scanning.

Returns:
A list of the devices that the scanner has discovered during the scanning.
If you also need advertisement data, use :attr:`discovered_devices_and_advertisement_data` instead.
"""
return [d for d, _ in self._backend.seen_devices.values()]

@property
def discovered_devices_and_advertisement_data(
self,
) -> Dict[str, Tuple[BLEDevice, AdvertisementData]]:
"""
Gets a map of device address to tuples of devices and the most recently
received advertisement data for that device.

The address keys are useful to compare the discovered devices to a set
of known devices. If you don't need to do that, consider using
``discovered_devices_and_advertisement_data.values()`` to just get the
values instead.
"""
return self._backend.discovered_devices
return self._backend.seen_devices

async def get_discovered_devices(self) -> List[BLEDevice]:
"""Gets the devices registered by the BleakScanner.
Expand Down
48 changes: 12 additions & 36 deletions bleak/backends/bluezdbus/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from typing import Literal, TypedDict

from ...exc import BleakError
from ..device import BLEDevice
from ..scanner import AdvertisementData, AdvertisementDataCallback, BaseBleakScanner
from .advertisement_monitor import OrPatternLike
from .defs import Device1
from .manager import get_global_bluez_manager
from .utils import bdaddr_from_device_path

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -89,9 +89,6 @@ def __init__(
# kwarg "device" is for backwards compatibility
self._adapter: Optional[str] = kwargs.get("adapter", kwargs.get("device"))

# map of d-bus object path to d-bus object properties
self._devices: Dict[str, Device1] = {}

# callback from manager for stopping scanning if it has been started
self._stop: Optional[Callable[[], Coroutine]] = None

Expand Down Expand Up @@ -137,7 +134,7 @@ async def start(self):
else:
adapter_path = manager.get_default_adapter()

self._devices.clear()
self.seen_devices = {}

if self._scanning_mode == "passive":
self._stop = await manager.passive_scan(
Expand Down Expand Up @@ -192,25 +189,6 @@ def set_scanning_filter(self, **kwargs):
else:
logger.warning("Filter '%s' is not currently supported." % k)

@property
def discovered_devices(self) -> List[BLEDevice]:
discovered_devices = []

for path, props in self._devices.items():
uuids = props.get("UUIDs", [])
manufacturer_data = props.get("ManufacturerData", {})
discovered_devices.append(
BLEDevice(
props["Address"],
props["Alias"],
{"path": path, "props": props},
props.get("RSSI", 0),
uuids=uuids,
manufacturer_data=manufacturer_data,
)
)
return discovered_devices

# Helper methods

def _handle_advertising_data(self, path: str, props: Device1) -> None:
Expand All @@ -222,11 +200,6 @@ def _handle_advertising_data(self, path: str, props: Device1) -> None:
props: The D-Bus object properties of the device.
"""

self._devices[path] = props

if self._callback is None:
return

# Get all the information wanted to pack in the advertisement data
_local_name = props.get("Name")
_manufacturer_data = {
Expand All @@ -244,29 +217,32 @@ def _handle_advertising_data(self, path: str, props: Device1) -> None:
manufacturer_data=_manufacturer_data,
service_data=_service_data,
service_uuids=_service_uuids,
platform_data=props,
tx_power=tx_power,
rssi=props.get("RSSI", -127),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

platform_data=(path, props),
)

device = BLEDevice(
device = self.create_or_update_device(
props["Address"],
props["Alias"],
{"path": path, "props": props},
props.get("RSSI", 0),
uuids=_service_uuids,
manufacturer_data=_manufacturer_data,
advertisement_data,
)

if self._callback is None:
return

self._callback(device, advertisement_data)

def _handle_device_removed(self, device_path: str) -> None:
"""
Handles a device being removed from BlueZ.
"""
try:
del self._devices[device_path]
bdaddr = bdaddr_from_device_path(device_path)
del self.seen_devices[bdaddr]
except KeyError:
# The device will not have been added to self._devices if no
# The device will not have been added to self.seen_devices if no
# advertising data was received, so this is expected to happen
# occasionally.
pass
13 changes: 13 additions & 0 deletions bleak/backends/bluezdbus/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,16 @@ def extract_service_handle_from_path(path):
return int(path[-4:], 16)
except Exception as e:
raise BleakError(f"Could not parse service handle from path: {path}") from e


def bdaddr_from_device_path(device_path: str) -> str:
"""
Scrape the Bluetooth address from a D-Bus device path.

Args:
device_path: The D-Bus object path of the device.

Returns:
A Bluetooth address as a string.
"""
return ":".join(device_path[-17:].split("_"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be faster to grab it and use .replace

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this isn't used in any critical paths.

7 changes: 0 additions & 7 deletions bleak/backends/corebluetooth/CentralManagerDelegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ def init(self) -> Optional["CentralManagerDelegate"]:
self.event_loop = asyncio.get_running_loop()
self._connect_futures: Dict[NSUUID, asyncio.Future] = {}

self.last_rssi: Dict[str, int] = {}

self.callbacks: Dict[
int, Callable[[CBPeripheral, Dict[str, Any], int], None]
] = {}
Expand Down Expand Up @@ -108,9 +106,6 @@ def __del__(self):

@objc.python_method
async def start_scan(self, service_uuids) -> None:
# remove old
self.last_rssi.clear()

service_uuids = (
NSArray.alloc().initWithArray_(
list(map(CBUUID.UUIDWithString_, service_uuids))
Expand Down Expand Up @@ -254,8 +249,6 @@ def did_discover_peripheral(

uuid_string = peripheral.identifier().UUIDString()

self.last_rssi[uuid_string] = RSSI

for callback in self.callbacks.values():
if callback:
callback(peripheral, advertisementData, RSSI)
Expand Down
9 changes: 5 additions & 4 deletions bleak/backends/corebluetooth/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs):
self._central_manager_delegate: Optional[CentralManagerDelegate] = None

if isinstance(address_or_ble_device, BLEDevice):
self._peripheral = address_or_ble_device.details
self._central_manager_delegate = address_or_ble_device.metadata["delegate"]
(
self._peripheral,
self._central_manager_delegate,
) = address_or_ble_device.details

self._services: Optional[NSArray] = None

Expand All @@ -77,8 +79,7 @@ async def connect(self, **kwargs) -> bool:
)

if device:
self._peripheral = device.details
self._central_manager_delegate = device.metadata["delegate"]
self._peripheral, self._central_manager_delegate = device.details
else:
raise BleakDeviceNotFoundError(
self.address, f"Device with address {self.address} was not found"
Expand Down
Loading