Skip to content
This repository was archived by the owner on Aug 15, 2019. It is now read-only.

Add support for generic arduino commands #55

Merged
merged 7 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions robot/servo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from typing import List

from robot.board import Board

Expand Down Expand Up @@ -82,6 +83,24 @@ def read(self):
return self._pin_read()


class ArduinoError(Exception):
"""Base class for exceptions fed back from the ``ServoBoard`` (arduino)."""

pass


class CommandError(ArduinoError):
"""The servo assembly experienced an error in processing a command."""

pass


class InvalidResponse(ArduinoError):
"""The servo assembly emitted a response which could not be processed."""

pass


class ServoBoard(Board):
"""
A servo board, providing access to ``Servo``s and ``Gpio`` pins.
Expand Down Expand Up @@ -112,6 +131,36 @@ def __init__(self, socket_path):
for x in gpio_pins
}

def direct_command(self, command_name: str, *args) -> List[str]:
"""
Issue a command directly to the arduino.

:Example:
>>> # arrives on the arduino as "my-command 4"
>>> servo_board.direct_command('my-command', 4)
["first line response from my command", "second line"]

The arguments to this method are bundled as a list and passed to robotd.
We expect to immediately get back a response message (as well as the
usual status blob) which contains either valid data from the arduino or
a description of the failure.
"""
command = (command_name,) + args
response = self._send_and_receive({'command': command})['response']
status = response['status']

# consume the broadcast status
self._receive()

if status == 'ok':
return response['data']
else:
for cls in (CommandError, InvalidResponse):
if cls.__name__ == response['type']:
raise cls(response['description'])

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we receive a response with an invalid type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point; I'll make that error harder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise ArduinoError(response['description'])

# Servo code

@property
Expand Down
243 changes: 243 additions & 0 deletions tests/test_servos.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import json
import socket
import time
import unittest
from unittest import mock

from robot.robot import Robot
from robot.servo import ArduinoError, CommandError, InvalidResponse, ServoBoard
from tests.mock_robotd import MockRobotD

from robotd.devices import ServoAssembly


class ServoBoardTest(unittest.TestCase):
def setUp(self):
Expand Down Expand Up @@ -79,3 +85,240 @@ def _try_position_expect(self, servo, board, value, expect):

def tearDown(self):
self.mock.stop()


class FakeSerialConnection:
def __init__(self, responses):
self.responses = responses
self.received = ''

@property
def lines_received(self):
return self.received.splitlines()

def write(self, content):
self.received += content.decode('utf-8')

def flush(self):
pass

def reset_input_buffer(self):
pass

def readline(self):
return self.responses.pop(0).encode('utf-8')


class Counter:
def __init__(self):
self.current = 0

def __call__(self, *args):
self.current += 1
return self.current


class FakeSocket:
def __init__(self, robotd_board):
self.robotd_board = robotd_board

def settimeout(self, timeout):
pass

def connect(self, location):
pass


class ServoBoardGenericCommandTest(unittest.TestCase):
longMessage = True

def assertCommands(self, expected_commands):
self.assertEqual(
expected_commands,
self.fake_connection.lines_received,
"Wrong commands send over serial connection.",
)

@staticmethod
def ok_response():
return '+ ok'

@staticmethod
def error_response(error):
return '- {}'.format(error)

@staticmethod
def message_response(message):
return '> {}'.format(message)

@staticmethod
def comment_response(comment):
return '# {}'.format(comment)

@staticmethod
def build_response_lines(command_number, messages):
return ['@{:d} {}'.format(command_number, x) for x in messages]

@staticmethod
def connect_socket_to_robotd_board(socket, robotd_board):
response_queue = [b'{"greetings": 1}']

def enqueue_response(response):
response_queue.append(json.dumps(response).encode('utf-8'))

def send(raw_data):
response = robotd_board.command(
json.loads(raw_data.decode('utf-8')),
)
# do what the BoardRunner would do
if response is not None:
enqueue_response({'response': response})

enqueue_response(robotd_board.status())

def receive(size):
return response_queue.pop(0) + b'\n'

socket.sendall = send
socket.recv = receive

return response_queue

def setUp(self):
self.command_counter = Counter()
self.mock_randint = mock.patch(
'robotd.devices.random.randint',
self.command_counter,
)
self.mock_randint.start()

self.fake_connection = FakeSerialConnection(
responses=self.build_response_lines(1, [
self.message_response('fw-version'),
self.ok_response(),
]),
)
self.mock_serial = mock.patch(
'robotd.devices.serial.Serial',
return_value=self.fake_connection,
)
self.mock_serial.start()

self._servo_assembly = ServoAssembly({'DEVNAME': None})
self._servo_assembly.make_safe = lambda: None
self._servo_assembly.start()

# remove the firmware version query that we don't care about
self.fake_connection.received = ''

self.spec_socket = socket.socket()
socket_mock = mock.Mock(spec=self.spec_socket)
self.mock_socket = mock.patch(
'socket.socket',
return_value=socket_mock,
autospec=True,
)
self.mock_socket.start()

self.response_queue = self.connect_socket_to_robotd_board(
socket_mock,
self._servo_assembly,
)
self.board = ServoBoard('')

def tearDown(self):
self.mock_socket.stop()
self.mock_serial.stop()
self.mock_randint.stop()

self.spec_socket.close()
self.board.close()

def test_command_ok(self):
RESPONSE = 'the-response'
self.fake_connection.responses += self.build_response_lines(2, [
self.message_response(RESPONSE),
self.ok_response(),
])

actual_response = self.board.direct_command('the-command')

self.assertEqual(
[RESPONSE],
actual_response,
"Wrong response to command",
)

self.assertCommands(['\0@2 the-command'])

self.assertEqual(
[],
self.response_queue,
"There should be no pending messages from the robotd board after "
"the command completes",
)

def test_error_command_error(self):
RESPONSE = 'the-response'
self.fake_connection.responses += self.build_response_lines(2, [
self.error_response(RESPONSE),
self.ok_response(),
])

with self.assertRaises(CommandError) as e_info:
self.board.direct_command('the-command')

self.assertIn(
RESPONSE,
str(e_info.exception),
"Wrong error message",
)

self.assertEqual(
[],
self.response_queue,
"There should be no pending messages from the robotd board after "
"the command completes",
)

def test_error_invalid_response(self):
RESPONSE = 'the-response'
self.fake_connection.responses += self.build_response_lines(2, [
RESPONSE,
])

with self.assertRaises(InvalidResponse) as e_info:
self.board.direct_command('the-command')

self.assertIn(
RESPONSE,
str(e_info.exception),
"Wrong error message",
)

self.assertEqual(
[],
self.response_queue,
"There should be no pending messages from the robotd board after "
"the command completes",
)

def test_unknown_error(self):
ERROR_MESSAGE = "fancy error description"

self._servo_assembly.command = mock.Mock(
return_value={
'status': 'error',
'type': 'newly-added-type',
'description': ERROR_MESSAGE,
},
)

with self.assertRaises(ArduinoError) as e_info:
self.board.direct_command('the-command')

self.assertIn(
ERROR_MESSAGE,
str(e_info.exception),
"Wrong error message",
)