-
Notifications
You must be signed in to change notification settings - Fork 3k
Add USB CDC and Serial tests #7755
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e30cc39
56b928f
1ff4c8a
ee36db2
8fdf965
6220f0c
3862ae4
db2f9af
3f4cb9f
a40f81e
4577608
045344a
c7d1a72
96598b9
331e1f3
e46ec47
db9895a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,346 @@ | ||
""" | ||
mbed SDK | ||
Copyright (c) 2018 ARM Limited | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
""" | ||
from __future__ import print_function | ||
import functools | ||
import itertools | ||
import time | ||
import threading | ||
import uuid | ||
import sys | ||
import serial | ||
import serial.tools.list_ports as stlp | ||
import mbed_host_tests | ||
|
||
|
||
MSG_KEY_DEVICE_READY = 'ready' | ||
MSG_KEY_SERIAL_NUMBER = 'usb_dev_sn' | ||
MSG_KEY_PORT_OPEN_WAIT = 'port_open_wait' | ||
MSG_KEY_PORT_OPEN_CLOSE = 'port_open_close' | ||
MSG_KEY_SEND_BYTES_SINGLE = 'send_single' | ||
MSG_KEY_SEND_BYTES_MULTIPLE = 'send_multiple' | ||
MSG_KEY_LOOPBACK = 'loopback' | ||
MSG_KEY_CHANGE_LINE_CODING = 'change_lc' | ||
|
||
RX_BUFF_SIZE = 32 | ||
|
||
# This delay eliminates the possibility of the device detecting | ||
# the port being closed when still waiting for data. | ||
TERM_CLOSE_DELAY = 0.01 | ||
|
||
# A duration the serial terminal is open on the host side | ||
# during terminal reopen test. | ||
TERM_REOPEN_DELAY = 0.1 | ||
|
||
# 6 (baud) + 2 (bits) + 1 (parity) + 1 (stop) + 3 * comma | ||
LINE_CODING_STRLEN = 13 | ||
|
||
|
||
def usb_serial_name(serial_number): | ||
"""Get USB serial device name based on the device serial number.""" | ||
if sys.platform.startswith('win'): | ||
# The USB spec defines all USB string descriptors to be | ||
# UNICODE UTF-16LE. Windows however, decodes the USB serial | ||
# number string descriptor as uppercase characters only. | ||
# To solve this issue, convert the pattern to uppercase. | ||
serial_number = str(serial_number).upper() | ||
for port_info in stlp.comports(): | ||
if port_info.serial_number == serial_number: | ||
return port_info.device | ||
return None | ||
|
||
|
||
class RetryError(Exception): | ||
"""Exception raised by retry_fun_call().""" | ||
|
||
|
||
def retry_fun_call(fun, num_retries=3, retry_delay=0.0): | ||
"""Call fun and retry if any exception was raised. | ||
|
||
fun is called at most num_retries with a retry_dalay in between calls. | ||
Raises RetryError if the retry limit is exhausted. | ||
""" | ||
verbose = False | ||
final_err = None | ||
for retry in range(1, num_retries + 1): | ||
try: | ||
return fun() # pylint: disable=not-callable | ||
except Exception as exc: # pylint: disable=broad-except | ||
final_err = exc | ||
if verbose: | ||
print('Retry {}/{} failed ({})' | ||
.format(retry, num_retries, str(fun))) | ||
time.sleep(retry_delay) | ||
err_msg = 'Failed with "{}". Tried {} times.' | ||
raise RetryError(err_msg.format(final_err, num_retries)) | ||
|
||
|
||
class USBSerialTest(mbed_host_tests.BaseHostTest): | ||
"""Host side test for USB CDC & Serial classes.""" | ||
|
||
_BYTESIZES = { | ||
5: serial.FIVEBITS, | ||
6: serial.SIXBITS, | ||
7: serial.SEVENBITS, | ||
8: serial.EIGHTBITS} | ||
_PARITIES = { | ||
0: serial.PARITY_NONE, | ||
1: serial.PARITY_ODD, | ||
2: serial.PARITY_EVEN, | ||
3: serial.PARITY_MARK, | ||
4: serial.PARITY_SPACE} | ||
_STOPBITS = { | ||
0: serial.STOPBITS_ONE, | ||
1: serial.STOPBITS_ONE_POINT_FIVE, | ||
2: serial.STOPBITS_TWO} | ||
|
||
@staticmethod | ||
def get_usb_serial_name(usb_id_str): | ||
"""Get USB serial device name as registered in the system. | ||
|
||
Search is based on the unique USB SN generated by the host | ||
during test suite setup. | ||
Raises RuntimeError if the device is not found. | ||
""" | ||
port_name = usb_serial_name(usb_id_str) | ||
if port_name is None: | ||
err_msg = 'USB serial device (SN={}) not found.' | ||
raise RuntimeError(err_msg.format(usb_id_str)) | ||
return port_name | ||
|
||
def __init__(self): | ||
super(USBSerialTest, self).__init__() | ||
self.__bg_task = None | ||
self.dut_usb_dev_sn = uuid.uuid4().hex # 32 hex digit string | ||
|
||
def port_open_wait(self): | ||
"""Open the serial and wait until it's closed by the device.""" | ||
mbed_serial = serial.Serial(dsrdtr=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is just setting the default |
||
mbed_serial.dtr = False | ||
try: | ||
mbed_serial.port = retry_fun_call( | ||
fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn), # pylint: disable=not-callable | ||
num_retries=20, | ||
retry_delay=0.05) | ||
retry_fun_call( | ||
fun=mbed_serial.open, | ||
num_retries=20, | ||
retry_delay=0.05) | ||
except RetryError as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
mbed_serial.dtr = True | ||
try: | ||
mbed_serial.read() # wait until closed | ||
except (serial.portNotOpenError, serial.SerialException): | ||
pass | ||
|
||
def port_open_close(self): | ||
"""Open the serial and close it with a delay.""" | ||
mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=False) | ||
mbed_serial.dtr = False | ||
try: | ||
mbed_serial.port = retry_fun_call( | ||
fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn), # pylint: disable=not-callable | ||
num_retries=20, | ||
retry_delay=0.05) | ||
retry_fun_call( | ||
fun=mbed_serial.open, | ||
num_retries=20, | ||
retry_delay=0.05) | ||
except RetryError as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
mbed_serial.reset_output_buffer() | ||
mbed_serial.dtr = True | ||
time.sleep(TERM_REOPEN_DELAY) | ||
mbed_serial.close() | ||
|
||
def send_data_sequence(self, chunk_size=1): | ||
"""Open the serial and send a sequence of values. | ||
|
||
chunk_size defines the size of data sent in each write operation. | ||
The input buffer content is discarded. | ||
""" | ||
mbed_serial = serial.Serial(write_timeout=0.1, dsrdtr=False) | ||
try: | ||
mbed_serial.port = retry_fun_call( | ||
fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn), # pylint: disable=not-callable | ||
num_retries=20, | ||
retry_delay=0.05) | ||
retry_fun_call( | ||
fun=mbed_serial.open, | ||
num_retries=20, | ||
retry_delay=0.05) | ||
except RetryError as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
mbed_serial.reset_output_buffer() | ||
mbed_serial.dtr = True | ||
for byteval in itertools.chain(reversed(range(0x100)), range(0x100)): | ||
try: | ||
payload = bytearray(chunk_size * (byteval,)) | ||
mbed_serial.write(payload) | ||
# self.log('SENT: {!r}'.format(payload)) | ||
# Discard input buffer content. The data received from the | ||
# device during the concurrent rx/tx test is irrelevant. | ||
mbed_serial.reset_input_buffer() | ||
except serial.SerialException as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
while mbed_serial.out_waiting > 0: | ||
time.sleep(0.001) | ||
time.sleep(TERM_CLOSE_DELAY) | ||
mbed_serial.close() | ||
|
||
def loopback(self): | ||
"""Open the serial and send back every byte received.""" | ||
mbed_serial = serial.Serial(timeout=0.5, write_timeout=0.1, dsrdtr=False) | ||
mbed_serial.dtr = False | ||
try: | ||
mbed_serial.port = retry_fun_call( | ||
fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn), # pylint: disable=not-callable | ||
num_retries=20, | ||
retry_delay=0.05) | ||
retry_fun_call( | ||
fun=mbed_serial.open, | ||
num_retries=20, | ||
retry_delay=0.05) | ||
except RetryError as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
mbed_serial.reset_output_buffer() | ||
mbed_serial.dtr = True | ||
try: | ||
payload = mbed_serial.read(1) | ||
while len(payload) == 1: | ||
mbed_serial.write(payload) | ||
# self.log('SENT: {!r}'.format(payload)) | ||
payload = mbed_serial.read(1) | ||
except serial.SerialException as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
while mbed_serial.out_waiting > 0: | ||
time.sleep(0.001) | ||
time.sleep(TERM_CLOSE_DELAY) | ||
mbed_serial.close() | ||
|
||
def change_line_coding(self): | ||
"""Open the serial and change serial params according to device request. | ||
|
||
New line coding params are read from the device serial data. | ||
""" | ||
mbed_serial = serial.Serial(timeout=0.5, dsrdtr=False) | ||
mbed_serial.dtr = False | ||
try: | ||
mbed_serial.port = retry_fun_call( | ||
fun=functools.partial(self.get_usb_serial_name, self.dut_usb_dev_sn), # pylint: disable=not-callable | ||
num_retries=20, | ||
retry_delay=0.05) | ||
retry_fun_call( | ||
fun=mbed_serial.open, | ||
num_retries=20, | ||
retry_delay=0.05) | ||
except RetryError as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
mbed_serial.reset_output_buffer() | ||
mbed_serial.dtr = True | ||
try: | ||
payload = mbed_serial.read(LINE_CODING_STRLEN) | ||
while len(payload) == LINE_CODING_STRLEN: | ||
baud, bits, parity, stop = (int(i) for i in payload.split(',')) | ||
new_line_coding = { | ||
'baudrate': baud, | ||
'bytesize': self._BYTESIZES[bits], | ||
'parity': self._PARITIES[parity], | ||
'stopbits': self._STOPBITS[stop]} | ||
mbed_serial.apply_settings(new_line_coding) | ||
payload = mbed_serial.read(LINE_CODING_STRLEN) | ||
except serial.SerialException as exc: | ||
self.log('TEST ERROR: {}'.format(exc)) | ||
self.notify_complete(False) | ||
return | ||
time.sleep(TERM_CLOSE_DELAY) | ||
mbed_serial.close() | ||
|
||
def setup(self): | ||
self.register_callback(MSG_KEY_DEVICE_READY, self.cb_device_ready) | ||
self.register_callback(MSG_KEY_PORT_OPEN_WAIT, self.cb_port_open_wait) | ||
self.register_callback(MSG_KEY_PORT_OPEN_CLOSE, self.cb_port_open_close) | ||
self.register_callback(MSG_KEY_SEND_BYTES_SINGLE, self.cb_send_bytes_single) | ||
self.register_callback(MSG_KEY_SEND_BYTES_MULTIPLE, self.cb_send_bytes_multiple) | ||
self.register_callback(MSG_KEY_LOOPBACK, self.cb_loopback) | ||
self.register_callback(MSG_KEY_CHANGE_LINE_CODING, self.cb_change_line_coding) | ||
|
||
def cb_device_ready(self, key, value, timestamp): | ||
"""Send a unique USB SN to the device. | ||
|
||
DUT uses this SN every time it connects to host as a USB device. | ||
""" | ||
self.send_kv(MSG_KEY_SERIAL_NUMBER, self.dut_usb_dev_sn) | ||
|
||
def start_bg_task(self, **thread_kwargs): | ||
"""Start a new daemon thread. | ||
|
||
The callbacks delegate serial handling to a background task to | ||
prevent any delays in the device side assert handling. Only one | ||
background task is kept running to prevent multiple access | ||
to serial. | ||
""" | ||
try: | ||
self.__bg_task.join() | ||
except (AttributeError, RuntimeError): | ||
pass | ||
self.__bg_task = threading.Thread(**thread_kwargs) | ||
self.__bg_task.daemon = True | ||
self.__bg_task.start() | ||
|
||
def cb_port_open_wait(self, key, value, timestamp): | ||
"""Open the serial and wait until it's closed by the device.""" | ||
self.start_bg_task(target=self.port_open_wait) | ||
|
||
def cb_port_open_close(self, key, value, timestamp): | ||
"""Open the serial and close it with a delay.""" | ||
self.start_bg_task(target=self.port_open_close) | ||
|
||
def cb_send_bytes_single(self, key, value, timestamp): | ||
"""Open the serial and send a sequence of values.""" | ||
self.start_bg_task( | ||
target=self.send_data_sequence, | ||
args=(1, )) | ||
|
||
def cb_send_bytes_multiple(self, key, value, timestamp): | ||
"""Open the serial and send a sequence of one byte values.""" | ||
chunk_size = RX_BUFF_SIZE * int(value) | ||
self.start_bg_task( | ||
target=self.send_data_sequence, | ||
args=(chunk_size, )) | ||
|
||
def cb_loopback(self, key, value, timestamp): | ||
"""Open the serial and send a sequence of multibyte values.""" | ||
self.start_bg_task(target=self.loopback) | ||
|
||
def cb_change_line_coding(self, key, value, timestamp): | ||
"""Open the serial and change the line coding.""" | ||
self.start_bg_task(target=self.change_line_coding) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
# `udev` rules for Mbed USB CDC device | ||
|
||
Before running `tests-usb_device-serial` test suite on Debian-based Linux | ||
distros, make sure to [update the `udev` rules][1] as follows: | ||
|
||
```bash | ||
sudo tee /etc/udev/rules.d/99-ttyacms.rules >/dev/null <<EOF | ||
ATTRS{idVendor}=="1f00" ATTRS{idProduct}=="2013", ENV{ID_MM_DEVICE_IGNORE}="1" | ||
ATTRS{idVendor}=="1f00" ATTRS{idProduct}=="2012", ENV{ID_MM_DEVICE_IGNORE}="1" | ||
EOF | ||
sudo udevadm control --reload-rules | ||
``` | ||
|
||
This will prevent the `ModemManager` daemon from automatically opening the port | ||
and sending the `AT commands`, which it does for every new `/dev/ttyACM` device | ||
registered in system. | ||
|
||
[1]: https://linux-tips.com/t/prevent-modem-manager-to-capture-usb-serial-devices/284 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are retries needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Operations like finding the serial port and opening it have to be retried due to variable delays on the host side. You can check how this works on your machine after setting
verbose
in line https://github.com/fkjagodzinski/mbed-os/blob/082c63e16a11b14a25b9b327843d2e8b1d7db408/TESTS/host_tests/usb_device_serial.py#L55