diff --git a/examples/bb8joystick/__init__.py b/examples/bb8joystick/__init__.py index 759ad2b..f09f7d9 100644 --- a/examples/bb8joystick/__init__.py +++ b/examples/bb8joystick/__init__.py @@ -1,95 +1,36 @@ -import asyncio import logging import sys import time -import spheropy -# noinspection PyProtectedMember -from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE +from examples.bb8joystick import joystick +from examples.bb8joystick.bb8 import BB8 +from examples.bb8joystick.joystick import Joystick +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING) -class BLEInterfaceGattool(spheropy.BleInterface): - def _find_adapter(self): - adapter = spheropy.pygatt.GATTToolBackend() - adapter.start() - adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT - - self._adapter = adapter - self._adapter_type = adapter_type - - return True - - -class _SpheroImproved(spheropy.Sphero): - async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False, - num_retry_attempts=1): - gattool = BLEInterfaceGattool(search_name) - return await super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts) - - async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None): - # port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394 - command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE, - command_id=0x22, - sequence_number=self._get_and_increment_command_sequence_number(), - data=[(sleeptime >> 8), (sleeptime & 0xff), 0], - wait_for_response=True, - reset_inactivity_timeout=reset_inactivity_timeout) - - return await self._send_command(command, response_timeout_in_seconds) - - -class BB8(object): - def __init__(self, name): - # marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/ - self._loop = asyncio.new_event_loop() - asyncio.set_event_loop(self._loop) - - print("Started to wake up BB-8...") - self._sphero = _SpheroImproved() - self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name)) - self._loop.run_until_complete(self._sphero.set_stabilization(True)) - self.stabilize() - self.color(0, 0xFF, 0) - print("BB-8 is ready for commands") - - def disconnect(self): - self._loop.run_until_complete(self._sphero.sleep(0)) - self._sphero.disconnect() - - def color(self, red, green, blue): - self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue)) - - def heading(self, heading): - self._loop.run_until_complete(self._sphero.set_heading(heading)) + bb8 = BB8("BB-CC13") + joystick = Joystick() - def roll(self, speed=1.0): - speed = int(255 * speed) - self._loop.run_until_complete(self._sphero.roll(speed, 0)) - def stop(self): - self._loop.run_until_complete(self._sphero.roll(0, 0)) + def set_bb_color(r, g, b): + print("Color", r, g, b) + bb8.color(r, g, b) - def stabilize(self): - self._loop.run_until_complete(self._sphero.self_level()) + def set_heading(angle): + a = int(angle) % 360 + if a < 0: + a = 360 - a + print("Angle", a) + bb8.heading(a) -if __name__ == "__main__": - logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING) - bb8 = BB8("BB-CC13") try: - # bb8.color(0xFF, 0x00, 0xFF) - bb8.color(0x00, 0x00, 0x00) - - bb8._loop.run_until_complete(bb8._sphero.set_back_led(254)) - time.sleep(3) - - for x in range(0, 359, 90): - print(x) - bb8.heading(x) - bb8.roll(0.25) - time.sleep(1) - bb8.stop() - bb8.stabilize() + # joystick.on_color_sensor(set_bb_color) + joystick.on_external_motor(set_heading) + print("All set up") + time.sleep(60) finally: + joystick.disconnect() bb8.disconnect() diff --git a/examples/bb8joystick/bb8.py b/examples/bb8joystick/bb8.py new file mode 100644 index 0000000..1625a36 --- /dev/null +++ b/examples/bb8joystick/bb8.py @@ -0,0 +1,93 @@ +import asyncio +import time + +import spheropy +# noinspection PyProtectedMember +from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE, _DEVICE_ID_SPHERO + + +class BLEInterfaceGattool(spheropy.BleInterface): + def _find_adapter(self): + adapter = spheropy.pygatt.GATTToolBackend() + adapter.start() + adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT + + self._adapter = adapter + self._adapter_type = adapter_type + + return True + + +class _SpheroImproved(spheropy.Sphero): + async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False, + num_retry_attempts=1): + gattool = BLEInterfaceGattool(search_name) + return await super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts) + + async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None): + # port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394 + command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE, + command_id=0x22, + sequence_number=self._get_and_increment_command_sequence_number(), + data=[(sleeptime >> 8), (sleeptime & 0xff), 0], + wait_for_response=False, + reset_inactivity_timeout=reset_inactivity_timeout) + + return await self._send_command(command, response_timeout_in_seconds) + + async def set_rotation_rate(self, rate, reset_inactivity_timeout=True, response_timeout_in_seconds=None): + # port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py + command = _ClientCommandPacket(device_id=_DEVICE_ID_SPHERO, + command_id=0x03, + sequence_number=self._get_and_increment_command_sequence_number(), + data=[rate], + wait_for_response=False, + reset_inactivity_timeout=reset_inactivity_timeout) + + return await self._send_command(command, response_timeout_in_seconds) + + +class BB8(object): + def __init__(self, name): + # marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/ + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + print("Started to wake up BB-8...") + self._sphero = _SpheroImproved() + self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name)) + # self._loop.run_until_complete(self._sphero.set_stabilization(True)) + # self._loop.run_until_complete(self._sphero.set_rotation_rate(1)) + self.color(0, 0xFF, 0) + # self.stabilize() + print("BB-8 is ready for commands") + + def disconnect(self): + self._loop.run_until_complete(self._sphero.sleep(0)) + self._sphero.disconnect() + + def color(self, red, green, blue): + self._wait_loop() + self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue, wait_for_response=False)) + + def heading(self, heading): + self._wait_loop() + self._loop.run_until_complete(self._sphero.roll(1, heading, spheropy.RollMode.IN_PLACE_ROTATE)) + # self._loop.run_until_complete(self._sphero.set_heading(heading)) + + def roll(self, speed=1.0): + self._wait_loop() + speed = int(255 * speed) + self._loop.run_until_complete(self._sphero.roll(speed, 0)) + + def stop(self): + self._wait_loop() + self._loop.run_until_complete(self._sphero.roll(0, 0)) + + def stabilize(self): + self._wait_loop() + self._loop.run_until_complete(self._sphero.self_level()) + + def _wait_loop(self): + while self._loop.is_running(): + time.sleep(0.001) diff --git a/examples/bb8joystick/joystick.py b/examples/bb8joystick/joystick.py new file mode 100644 index 0000000..7747f66 --- /dev/null +++ b/examples/bb8joystick/joystick.py @@ -0,0 +1,18 @@ +from pylgbst.hub import MoveHub +from pylgbst.peripherals import VisionSensor, EncodedMotor + + +class Joystick(object): + def __init__(self): + super(Joystick, self).__init__() + self._hub = MoveHub() + self._sensor = [] + + def disconnect(self): + self._hub.disconnect() + + def on_color_sensor(self, callback): + self._hub.vision_sensor.subscribe(callback, VisionSensor.COLOR_RGB, granularity=5) + + def on_external_motor(self, callback): + self._hub.motor_external.subscribe(callback, EncodedMotor.SENSOR_ANGLE, granularity=5) diff --git a/pylgbst/peripherals.py b/pylgbst/peripherals.py index cd2c2dd..834adda 100644 --- a/pylgbst/peripherals.py +++ b/pylgbst/peripherals.py @@ -420,9 +420,6 @@ def _decode_port_data(self, msg): if self._port_mode.mode == self.SENSOR_ANGLE: angle = unpack("