Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dependencies = [
"bluetooth-adapters ~= 0.16",
"cachetools ~= 5.3",
"dbus-fast ~= 2.15",
"pybricks ~= 3.6.0",
"pybricks @ git+https://github.com/fkleon/pybricks-api@ble-type-fixes",
]

[dependency-groups]
Expand All @@ -31,6 +31,7 @@ dev = [
"mypy ~= 1.16.0",
"pytest ~= 8.3",
"pytest-asyncio ~= 1.0.0",
"pytest-mock ~= 3.14.1",
"python-dbusmock ~= 0.34.3",
"ruff ~= 0.11.8",
"types-cachetools ~= 5.3",
Expand Down
22 changes: 10 additions & 12 deletions src/pb_ble/vhub.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sys
from contextlib import AsyncExitStack
from typing import ClassVar, Optional, Sequence, Tuple, Union, cast
from typing import ClassVar, Sequence, cast

from dbus_fast.aio import MessageBus, ProxyObject
from dbus_fast.constants import BusType
Expand Down Expand Up @@ -30,6 +30,13 @@ class VirtualBLE(_common.BLE, AsyncExitStack):

_adv: PybricksBroadcastAdvertisement
"""The current data broadcast."""
_device_version: str
"""The version string configured for this VirtualBLE device."""

_broadcaster: BlueZBroadcaster
"""The broadcaster to use."""
_observer: BlueZPybricksObserver
"""The observer to use."""

def __init__(
self,
Expand Down Expand Up @@ -73,20 +80,11 @@ async def broadcast(self, *data: PybricksBroadcastValue | None) -> None: # type
await self._broadcaster.broadcast(self._adv)
self._adv.message = cast(PybricksBroadcastData, tuple(data))

def observe(
self, channel: int
) -> Optional[Tuple[Union[bool, int, float, str, bytes], ...]]:
def observe(self, channel: int) -> PybricksBroadcastData | None:
advertisement = self._observer.observe(channel)

if advertisement is not None:
if isinstance(advertisement.data, tuple):
return advertisement.data
else:
# TODO: Pybricks does expose single-value broadcasts
# in a single-object tuple. However, that doesn't match
# the type signature of the observe() method. To adhere
# to the type signature, we only return wrapped values.
return (advertisement.data,)
return advertisement.data
else:
return None

Expand Down
6 changes: 3 additions & 3 deletions tests/test_broadcaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def test_stop_broadcaster(self, message_bus, adapter):
# THEN the advertisements are removed
assert len(broadcaster.advertisements) == 0

async def test_broadcast(self, broadcaster):
async def test_broadcast(self, broadcaster: BlueZBroadcaster):
# GIVEN a broadcast
adv = BroadcastAdvertisement(
broadcaster.name,
Expand All @@ -69,7 +69,7 @@ async def test_broadcast(self, broadcaster):
assert adv.path in broadcaster.advertisements

@pytest.mark.skip_on_bluez_mock("Does not implement release timeout")
async def test_broadcast_release(self, broadcaster):
async def test_broadcast_release(self, broadcaster: BlueZBroadcaster):
# GIVEN a broadcast
semaphore = Semaphore(1)
adv = BroadcastAdvertisement(
Expand Down Expand Up @@ -97,7 +97,7 @@ async def test_broadcast_release(self, broadcaster):

# TODO: test that it's unexported from the bus

async def test_broadcast_twice(self, broadcaster):
async def test_broadcast_twice(self, broadcaster: BlueZBroadcaster):
# GIVEN a broadcast
adv = BroadcastAdvertisement(broadcaster.name)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_create_observer(self, message_bus):
assert observer.rssi_threshold == -100
assert observer.device_pattern == "Pybricks"

async def test_observe(self, adapter, observer):
async def test_observe(self, adapter, observer: BlueZPybricksObserver):
# WHEN a channel is observed
data = observer.observe(0)

Expand Down Expand Up @@ -64,7 +64,7 @@ async def test_create_observer(self, message_bus, adapter):
assert observer.rssi_threshold is None
assert observer.device_pattern == "Name"

async def test_observe(self, adapter, observer):
async def test_observe(self, adapter, observer: BlueZPybricksObserver):
# WHEN a channel is observed
data = observer.observe(0)

Expand Down
18 changes: 18 additions & 0 deletions tests/test_vhub.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import pytest
import pytest_asyncio
from pytest_mock import MockerFixture

from pb_ble import get_virtual_ble
from pb_ble.bluezdbus.observer import ObservedAdvertisement


@pytest_asyncio.fixture(autouse=True)
Expand All @@ -24,6 +26,22 @@ async def test_observe_none(self):
assert data is None
assert not ble._broadcaster.is_broadcasting()

async def test_observe_single(self, mocker: MockerFixture):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
observe_mock = mocker.patch.object(ble._observer, "observe")
observe_mock.return_value = ObservedAdvertisement(data=b"val", rssi=0)
data = ble.observe(2)
assert data == b"val"

async def test_observe_multiple(self, mocker: MockerFixture):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
observe_mock = mocker.patch.object(ble._observer, "observe")
observe_mock.return_value = ObservedAdvertisement(
data=(b"val", 0, True, 1.0, "str"), rssi=0
)
data = ble.observe(2)
assert data == (b"val", 0, True, 1.0, "str")

async def test_broadcast_single(self):
ble = await get_virtual_ble(broadcast_channel=1, observe_channels=[2])
await ble.broadcast(42)
Expand Down
2 changes: 1 addition & 1 deletion tools/pybricks_virtual_ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ async def observe(vble: _common.BLE, observe_channel: int, interval: float = 1.0
data = vble.observe(observe_channel)
rssi = vble.signal_strength(observe_channel)
if data:
print(f"Observation: '{data}' [{rssi} dBm]")
print(f"Observation: '{data!r}' [{rssi} dBm]")


async def broadcast(vble: _common.BLE, interval: float = 10.0):
Expand Down