Skip to content

Commit

Permalink
BleakScanner: Add async iterator scanning capability
Browse files Browse the repository at this point in the history
Add `observe()` async iterator method to the `BleakScanner` which yields
results of the ongoing scan.
  • Loading branch information
bojanpotocnik committed Jul 19, 2023
1 parent 6d603aa commit f1c8af1
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to `Semantic Versioning <https://semver.org/spec/v2.0.0
Changed
-------
- Improved error messages when failing to get services in WinRT backend.
- Added ``observe()`` async iterator method to ``BleakScanner``.
- Added ``scan_iterator.py`` example.

Fixed
-----
Expand Down
40 changes: 32 additions & 8 deletions bleak/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Type,
Union,
overload,
AsyncIterator,
)
from warnings import warn

Expand Down Expand Up @@ -204,6 +205,33 @@ def set_scanning_filter(self, **kwargs):
)
self._backend.set_scanning_filter(**kwargs)

async def observe(self) -> AsyncIterator[Tuple[BLEDevice, AdvertisementData]]:
"""
Yields devices and their advertising data as they are discovered.
.. note::
Ensure that scanning is started before calling this method.
Returns:
An async iterator that yields tuples (:class:`BLEDevice`, :class:`AdvertisementData`).
"""
# noinspection PyProtectedMember
if self._backend._callback:
raise BleakError(
"Cannot use async iterator methods when detection_callback is used"
)

devices = asyncio.Queue()

self._backend.register_detection_callback(
lambda bd, ad: devices.put_nowait((bd, ad))
)
try:
while True:
yield await devices.get()
finally:
self._backend.register_detection_callback(None)

@overload
@classmethod
async def discover(
Expand Down Expand Up @@ -360,16 +388,12 @@ async def find_device_by_filter(
the timeout.
"""
found_device_queue: asyncio.Queue[BLEDevice] = asyncio.Queue()

def apply_filter(d: BLEDevice, ad: AdvertisementData):
if filterfunc(d, ad):
found_device_queue.put_nowait(d)

async with cls(detection_callback=apply_filter, **kwargs):
async with cls(**kwargs) as scanner:
try:
async with async_timeout(timeout):
return await found_device_queue.get()
async for bd, ad in scanner.observe():
if filterfunc(bd, ad):
return bd
except asyncio.TimeoutError:
return None

Expand Down
6 changes: 6 additions & 0 deletions docs/api/scanner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ For event-driven programming, you can provide a ``detection_callback`` callback
to the :class:`BleakScanner` constructor. This will be called back each time
and advertisement is received.

Alternatively, you can utilize the asynchronous iterator to iterate over
advertisements as they are received. The method below returns an async iterator
that yields the same tuples as otherwise provided to ``detection_callback``.

.. automethod:: bleak.BleakScanner.observe

Otherwise, you can use one of the properties below after scanning has stopped.

.. autoproperty:: bleak.BleakScanner.discovered_devices
Expand Down
103 changes: 103 additions & 0 deletions examples/scan_iterator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""
Scan/Discovery Async Iterator
--------------
Example showing how to scan for BLE devices using async iterator instead of callback function
Created on 2023-07-07 by bojanpotocnik <info@bojanpotocnik.com>
"""
import asyncio
import contextlib
from typing import AsyncIterator, Tuple

from bleak import BleakScanner, BLEDevice, AdvertisementData


async def normal_usage():
n = 5
print(f"Scanning for {n} devices...")
async with BleakScanner() as scanner:
async for bd, ad in scanner.observe():
print(f"{n}. {bd!r} with {ad!r}")
n -= 1
if n == 0:
break

n = 6
print(f"\nScanning for a device with name longer than {n} characters...")
async with BleakScanner() as scanner:
async for bd, ad in scanner.observe():
found = len(bd.name or "") > n or len(ad.local_name or "") > n
print(f"Found{' it' if found else ''} {bd!r} with {ad!r}")
if found:
break


async def deterministic_cleanup():
# If you want to use async iterator again immediately after it is done,
# ensure that the previous one is closed and resources released.
# Cleanup of asynchronous generators is a tricky case since they can
# be cleaned up by multiple cases at once (unrolling as the generator
# finishes, cancellation, or closing as the generator is garbage
# collected), the order of which is implementation dependent.
# The proper way to address this is not to rely on implicit cleanup,
# but await aclose() method or use contextlib.aclosing().

# Helper method to avoid code duplication, could also be used in `normal_usage()`,
# but that would make the `normal_usage()` example less clear.
async def iterate_devices(
iterator: AsyncIterator[Tuple[BLEDevice, AdvertisementData]], n: int
):
print(f"Scanning for {n} devices...")
async for bd, ad in iterator:
print(f"{n}. {bd!r} with {ad!r}")
n -= 1
if n == 0:
break

async with BleakScanner() as scanner:
print("\nUsing the same scanner async iterator immediately after it is done...")

if hasattr(contextlib, "aclosing"): # New in version 3.10
print("Using contextlib.aclosing() to ensure cleanup...")

# You can try removing any context managers below, e.g. change
# async with contextlib.aclosing(scanner.observe()) as observer:
# await iterate_devices(observer, 2)
# to
# await iterate_devices(scanner.observe(), 2)
# to see how the cleanup is affected. Most likely `finally:` block in `observe()`
# will not reset the callback in time and BleakError will be raised on next call.

async with contextlib.aclosing(scanner.observe()) as observer:
await iterate_devices(observer, 2)

async with contextlib.aclosing(scanner.observe()) as observer:
await iterate_devices(observer, 3)
await iterate_devices(observer, 4)

else:
print("Using AsyncIterator.aclose() to ensure cleanup...")

observer = scanner.observe()
await iterate_devices(observer, 2)
await observer.aclose() # Try commenting this

observer = scanner.observe()
await iterate_devices(observer, 3)
await observer.aclose() # or this

await iterate_devices(scanner.observe(), 6)
# This one can be cleaned implicitly, as we do not care about reusing the same scanner anymore


async def main():
await normal_usage()

# Note about deterministic cleanup when using async
await deterministic_cleanup()


if __name__ == "__main__":
asyncio.run(main())

0 comments on commit f1c8af1

Please sign in to comment.