Skip to content

Commit

Permalink
BleakScanner: Add async iterator scanning capability
Browse files Browse the repository at this point in the history
Enable using `BleakScanner()` as an async iterator (async for),
combining detection_callback and filtering functionality.
BleakScanner().devices() method has the same functionality with option
to provide timeout.
  • Loading branch information
bojanpotocnik committed Jul 7, 2023
1 parent 2abe80d commit 7e5b4a1
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 13 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0
Changed
-------
- Improved error messages when failing to get services in WinRT backend.
- Made ``BleakScanner`` an async iterator, added ``BleakScanner.devices()`` async iterator method.

Fixed
-----
- Fix handling all access denied errors when enumerating characteristics on Windows. Fixes #1291.
- Added support for 32bit UUIDs. Fixes #1314
- Added support for 32bit UUIDs. Fixes #1314

`0.20.2`_ (2023-04-19)
======================
Expand Down
69 changes: 58 additions & 11 deletions bleak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Type,
Union,
overload,
AsyncIterator,
)
from warnings import warn

Expand Down Expand Up @@ -148,12 +149,12 @@ def __init__(
**kwargs,
)

async def __aenter__(self):
await self._backend.start()
async def __aenter__(self) -> "BleakScanner":
await self.start()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self._backend.stop()
await self.stop()

def register_detection_callback(
self, callback: Optional[AdvertisementDataCallback]
Expand Down Expand Up @@ -184,6 +185,7 @@ async def start(self):
async def stop(self):
"""Stop scanning for devices"""
await self._backend.stop()
self._backend.new_devices = None

def set_scanning_filter(self, **kwargs):
"""
Expand All @@ -204,6 +206,55 @@ def set_scanning_filter(self, **kwargs):
)
self._backend.set_scanning_filter(**kwargs)

def __aiter__(self) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]:
self._backend.new_devices = asyncio.Queue()
return self

async def __anext__(self) -> Tuple[BLEDevice, AdvertisementData]:
return await self._backend.new_devices.get()

@overload
def devices(
self, timeout: Optional[float] = None, return_adv: Literal[False] = False
) -> AsyncIterator[BLEDevice]:
...

@overload
def devices(
self, timeout: Optional[float] = None, return_adv: Literal[True] = True
) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]:
...

async def devices(self, timeout: Optional[float] = None, return_adv: bool = False):
"""
Continuously scan for devices and yield them.
Args:
timeout:
The maximum duration, in seconds, to scan for each individual device.
This is not a total scan duration. Each invocation of ``next()`` will
wait for up to ``timeout`` seconds for a new device to be discovered
before stopping the iteration.
return_adv:
If ``True``, the return value will also include advertising data.
Returns:
An async iterator that yields instances of :class:`BLEDevice` if ``return_adv``
parameter is ``False`` or tuple of (:class:`BLEDevice`, :class:`AdvertisementData`)
if the parameter is ``True``.
"""
self._backend.new_devices = asyncio.Queue()
while True:
try:
bd, ad = await asyncio.wait_for(
self._backend.new_devices.get(), timeout=timeout
)
yield (bd, ad) if return_adv else bd
except asyncio.TimeoutError:
break
finally:
self._backend.new_devices = None

@overload
@classmethod
async def discover(
Expand Down Expand Up @@ -360,16 +411,12 @@ async def find_device_by_filter(
the timeout.
"""
found_device_queue: asyncio.Queue[BLEDevice] = asyncio.Queue()

def apply_filter(d: BLEDevice, ad: AdvertisementData):
if filterfunc(d, ad):
found_device_queue.put_nowait(d)

async with cls(detection_callback=apply_filter, **kwargs):
async with cls(**kwargs) as scanner:
try:
async with async_timeout(timeout):
return await found_device_queue.get()
async for d, ad in scanner:
if filterfunc(d, ad):
return d
except asyncio.TimeoutError:
return None

Expand Down
8 changes: 7 additions & 1 deletion bleak/backends/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ def __init__(
)

self.seen_devices = {}
self.new_devices: Optional[
asyncio.Queue[Tuple[BLEDevice, AdvertisementData]]
] = None

def register_detection_callback(
self, callback: Optional[AdvertisementDataCallback]
Expand Down Expand Up @@ -215,7 +218,10 @@ def create_or_update_device(
**metadata,
)

self.seen_devices[address] = (device, adv)
dev = (device, adv)
self.seen_devices[address] = dev
if self.new_devices:
self.new_devices.put_nowait(dev)

return device

Expand Down
35 changes: 35 additions & 0 deletions examples/scan_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Scan/Discovery Async Iterator
--------------
Example showing how to scan for BLE devices using async iterator instead of callback function
Created on 2023-07-27 by bojanpotocnik <info@bojanpotocnik.com>
"""
import asyncio

from bleak import BleakScanner


async def main():
x = 5
print(f"Scanning for {x} devices...")
async with BleakScanner() as scanner:
async for bd, ad in scanner:
print(f"{x}. {bd!r} with {ad!r}")
x -= 1
if x == 0:
break

x = 6
print(f"\nScanning for a device with name longer than {x} characters...")
async with BleakScanner() as scanner:
async for bd, ad in scanner.devices(timeout=10, return_adv=True):
print(f"Found{' it' if len(bd.name) > x else ''} {bd!r} with {ad!r}")
if len(bd.name or "") > x or len(ad.local_name or "") > x:
break


if __name__ == "__main__":
asyncio.run(main())

0 comments on commit 7e5b4a1

Please sign in to comment.