Skip to content

Commit

Permalink
Working code for BB8 joystick
Browse files Browse the repository at this point in the history
  • Loading branch information
undera committed Aug 4, 2019
1 parent 9f347c2 commit 6da6797
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 83 deletions.
101 changes: 21 additions & 80 deletions examples/bb8joystick/__init__.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,36 @@
import asyncio
import logging
import sys
import time

import spheropy
# noinspection PyProtectedMember
from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE
from examples.bb8joystick import joystick
from examples.bb8joystick.bb8 import BB8
from examples.bb8joystick.joystick import Joystick

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING)

class BLEInterfaceGattool(spheropy.BleInterface):
def _find_adapter(self):
adapter = spheropy.pygatt.GATTToolBackend()
adapter.start()
adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT

self._adapter = adapter
self._adapter_type = adapter_type

return True


class _SpheroImproved(spheropy.Sphero):
async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False,
num_retry_attempts=1):
gattool = BLEInterfaceGattool(search_name)
return await super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)

async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394
command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE,
command_id=0x22,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[(sleeptime >> 8), (sleeptime & 0xff), 0],
wait_for_response=True,
reset_inactivity_timeout=reset_inactivity_timeout)

return await self._send_command(command, response_timeout_in_seconds)


class BB8(object):
def __init__(self, name):
# marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

print("Started to wake up BB-8...")
self._sphero = _SpheroImproved()
self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name))
self._loop.run_until_complete(self._sphero.set_stabilization(True))
self.stabilize()
self.color(0, 0xFF, 0)
print("BB-8 is ready for commands")

def disconnect(self):
self._loop.run_until_complete(self._sphero.sleep(0))
self._sphero.disconnect()

def color(self, red, green, blue):
self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue))

def heading(self, heading):
self._loop.run_until_complete(self._sphero.set_heading(heading))
bb8 = BB8("BB-CC13")
joystick = Joystick()

def roll(self, speed=1.0):
speed = int(255 * speed)
self._loop.run_until_complete(self._sphero.roll(speed, 0))

def stop(self):
self._loop.run_until_complete(self._sphero.roll(0, 0))
def set_bb_color(r, g, b):
print("Color", r, g, b)
bb8.color(r, g, b)

def stabilize(self):
self._loop.run_until_complete(self._sphero.self_level())

def set_heading(angle):
a = int(angle) % 360
if a < 0:
a = 360 - a
print("Angle", a)
bb8.heading(a)

if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG if 'pydevd' in sys.modules else logging.WARNING)

bb8 = BB8("BB-CC13")
try:
# bb8.color(0xFF, 0x00, 0xFF)
bb8.color(0x00, 0x00, 0x00)

bb8._loop.run_until_complete(bb8._sphero.set_back_led(254))
time.sleep(3)

for x in range(0, 359, 90):
print(x)
bb8.heading(x)
bb8.roll(0.25)
time.sleep(1)
bb8.stop()
bb8.stabilize()
# joystick.on_color_sensor(set_bb_color)
joystick.on_external_motor(set_heading)
print("All set up")
time.sleep(60)
finally:
joystick.disconnect()
bb8.disconnect()
93 changes: 93 additions & 0 deletions examples/bb8joystick/bb8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import asyncio
import time

import spheropy
# noinspection PyProtectedMember
from spheropy.spheropy import _ClientCommandPacket, _DEVICE_ID_CORE, _DEVICE_ID_SPHERO


class BLEInterfaceGattool(spheropy.BleInterface):
def _find_adapter(self):
adapter = spheropy.pygatt.GATTToolBackend()
adapter.start()
adapter_type = spheropy.BleInterface.BleAdapterType.PYGATT

self._adapter = adapter
self._adapter_type = adapter_type

return True


class _SpheroImproved(spheropy.Sphero):
async def connect(self, search_name=None, address=None, port=None, bluetooth_interface=None, use_ble=False,
num_retry_attempts=1):
gattool = BLEInterfaceGattool(search_name)
return await super().connect(search_name, address, port, gattool, use_ble, num_retry_attempts)

async def sleep(self, sleeptime, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py#L394
command = _ClientCommandPacket(device_id=_DEVICE_ID_CORE,
command_id=0x22,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[(sleeptime >> 8), (sleeptime & 0xff), 0],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)

return await self._send_command(command, response_timeout_in_seconds)

async def set_rotation_rate(self, rate, reset_inactivity_timeout=True, response_timeout_in_seconds=None):
# port from https://github.com/jchadwhite/SpheroBB8-python/blob/master/BB8_driver.py
command = _ClientCommandPacket(device_id=_DEVICE_ID_SPHERO,
command_id=0x03,
sequence_number=self._get_and_increment_command_sequence_number(),
data=[rate],
wait_for_response=False,
reset_inactivity_timeout=reset_inactivity_timeout)

return await self._send_command(command, response_timeout_in_seconds)


class BB8(object):
def __init__(self, name):
# marry sync with async https://www.aeracode.org/2018/02/19/python-async-simplified/
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)

print("Started to wake up BB-8...")
self._sphero = _SpheroImproved()
self._loop.run_until_complete(self._sphero.connect(num_retry_attempts=3, use_ble=True, search_name=name))
# self._loop.run_until_complete(self._sphero.set_stabilization(True))
# self._loop.run_until_complete(self._sphero.set_rotation_rate(1))
self.color(0, 0xFF, 0)
# self.stabilize()
print("BB-8 is ready for commands")

def disconnect(self):
self._loop.run_until_complete(self._sphero.sleep(0))
self._sphero.disconnect()

def color(self, red, green, blue):
self._wait_loop()
self._loop.run_until_complete(self._sphero.set_rgb_led(red, green, blue, wait_for_response=False))

def heading(self, heading):
self._wait_loop()
self._loop.run_until_complete(self._sphero.roll(1, heading, spheropy.RollMode.IN_PLACE_ROTATE))
# self._loop.run_until_complete(self._sphero.set_heading(heading))

def roll(self, speed=1.0):
self._wait_loop()
speed = int(255 * speed)
self._loop.run_until_complete(self._sphero.roll(speed, 0))

def stop(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.roll(0, 0))

def stabilize(self):
self._wait_loop()
self._loop.run_until_complete(self._sphero.self_level())

def _wait_loop(self):
while self._loop.is_running():
time.sleep(0.001)
18 changes: 18 additions & 0 deletions examples/bb8joystick/joystick.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pylgbst.hub import MoveHub
from pylgbst.peripherals import VisionSensor, EncodedMotor


class Joystick(object):
def __init__(self):
super(Joystick, self).__init__()
self._hub = MoveHub()
self._sensor = []

def disconnect(self):
self._hub.disconnect()

def on_color_sensor(self, callback):
self._hub.vision_sensor.subscribe(callback, VisionSensor.COLOR_RGB, granularity=5)

def on_external_motor(self, callback):
self._hub.motor_external.subscribe(callback, EncodedMotor.SENSOR_ANGLE, granularity=5)
3 changes: 0 additions & 3 deletions pylgbst/peripherals.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,6 @@ def _decode_port_data(self, msg):
if self._port_mode.mode == self.SENSOR_ANGLE:
angle = unpack("<l", data[0:4])[0]
return (angle,)
elif self._port_mode.mode == self.SENSOR_SOMETHING1:
smth = usbyte(data, 0)
return (smth,)
elif self._port_mode.mode == self.SENSOR_SPEED:
speed = unpack("<b", data[0])[0]
return (speed,)
Expand Down

0 comments on commit 6da6797

Please sign in to comment.