-
Notifications
You must be signed in to change notification settings - Fork 638
usb2can-libusb driver for native 8devices CAN on OSX and other platforms #979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bri3d
wants to merge
11
commits into
hardbyte:main
Choose a base branch
from
bri3d:8devices-libusb
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
c240aed
usb2can-libusb driver for native 8devices CAN on OSX and other platforms
bri3d cdd24f6
Satisfy the linter and add basic docs
bri3d da68a66
refactor to enable testing, test low-level functions to extent possible
bri3d c2638b2
lint tests - the docs only run lint against can dir, maybe need a fix…
bri3d 2e8efe1
Merge branch 'develop' into 8devices-libusb
mergify[bot] ec5f888
Merge branch 'develop' into 8devices-libusb
mergify[bot] 6c5e57b
Merge branch 'develop' into 8devices-libusb
mergify[bot] a6a982f
Merge branch 'develop' into 8devices-libusb
mergify[bot] 55e41a4
Merge branch 'develop' into 8devices-libusb
mergify[bot] a6154be
Merge branch 'develop' into 8devices-libusb
mergify[bot] 394e7fb
Merge branch 'develop' into 8devices-libusb
mergify[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
""" | ||
""" | ||
|
||
from .usb2can_libusb_bus import Usb2CanLibUsbBus |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
import logging | ||
import queue | ||
from threading import Thread | ||
|
||
from .can_8dev_usb_utils import * | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
try: | ||
import usb.core | ||
import usb.util | ||
except ImportError: | ||
logger.warning( | ||
"The PyUSB module is not installed. Install it using `python3 -m pip install pyusb`" | ||
) | ||
|
||
|
||
class Can8DevUSBDevice: | ||
cmd_rx_ep: usb.core.Endpoint | ||
cmd_tx_ep: usb.core.Endpoint | ||
data_rx_ep: usb.core.Endpoint | ||
data_tx_ep: usb.core.Endpoint | ||
serial_number: str | ||
_close: bool | ||
_rx_queue: queue.Queue | ||
_recv_thread: Thread | ||
|
||
def __init__(self, serial_number=None): | ||
if serial_number is not None: | ||
dev = usb.core.find( | ||
idVendor=USB_8DEV_VENDOR_ID, | ||
idProduct=USB_8DEV_PRODUCT_ID, | ||
serial_number=serial_number, | ||
) | ||
else: | ||
dev = usb.core.find( | ||
idVendor=USB_8DEV_VENDOR_ID, idProduct=USB_8DEV_PRODUCT_ID | ||
) | ||
|
||
if dev is None: | ||
raise ValueError( | ||
"8Devices CAN interface not found! Serial number provided: %s" | ||
% serial_number | ||
) | ||
|
||
self.serial_number = dev.serial_number | ||
|
||
dev.reset() | ||
# set the active configuration. With no arguments, the first | ||
# configuration will be the active one | ||
dev.set_configuration() | ||
|
||
# get an endpoint instance | ||
cfg = dev.get_active_configuration() | ||
intf = cfg[(0, 0)] | ||
|
||
self.cmd_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( | ||
intf, bEndpointAddress=USB_8DEV_ENDP_CMD_RX | ||
) | ||
self.cmd_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( | ||
intf, bEndpointAddress=USB_8DEV_ENDP_CMD_TX | ||
) | ||
self.data_rx_ep: usb.core.Endpoint = usb.util.find_descriptor( | ||
intf, bEndpointAddress=USB_8DEV_ENDP_DATA_RX | ||
) | ||
self.data_tx_ep: usb.core.Endpoint = usb.util.find_descriptor( | ||
intf, bEndpointAddress=USB_8DEV_ENDP_DATA_TX | ||
) | ||
|
||
if ( | ||
self.cmd_rx_ep is None | ||
or self.cmd_tx_ep is None | ||
or self.data_rx_ep is None | ||
or self.data_tx_ep is None | ||
): | ||
raise ValueError("Could not configure 8Devices CAN endpoints!") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By now, a |
||
|
||
self._rx_queue = queue.Queue(MAX_8DEV_RECV_QUEUE) | ||
|
||
def _recv_thread_loop(self): | ||
while True: | ||
byte_buffer = bytes() | ||
try: | ||
# We must read the full possible buffer size each iteration or we risk a buffer overrun exception losing data. | ||
byte_buffer = self.data_rx_ep.read(512, 0).tobytes() | ||
except Exception: | ||
pass | ||
for i in range(0, len(byte_buffer), 21): | ||
# We could have read multiple frames in a single bulk xfer | ||
self._rx_queue.put(Can8DevRxFrame(byte_buffer[i : i + 21])) | ||
if self._close: | ||
return | ||
|
||
def _start_recv_thread(self): | ||
self._close = False | ||
self._recv_thread = Thread(target=self._recv_thread_loop, daemon=True) | ||
self._recv_thread.start() | ||
|
||
def _stop_recv_thread(self): | ||
self._close = True | ||
|
||
def send_command(self, cmd: Can8DevCommandFrame): | ||
self.cmd_tx_ep.write(cmd.to_bytes()) | ||
return Can8DevCommandFrame.from_bytes(self.cmd_rx_ep.read(16)) | ||
|
||
def open( | ||
self, | ||
phase_seg1: int, | ||
phase_seg2: int, | ||
sjw: int, | ||
brp: int, | ||
loopback: bool = False, | ||
listenonly: bool = False, | ||
oneshot: bool = False, | ||
): | ||
self.send_command(Can8DevCommandFrame(Can8DevCommand.USB_8DEV_RESET)) | ||
open_command = can_8dev_open_frame( | ||
phase_seg1, phase_seg2, sjw, brp, loopback, listenonly, oneshot | ||
) | ||
if self.send_command(open_command).opt1 == 0: | ||
self._start_recv_thread() | ||
return True | ||
else: | ||
return False | ||
|
||
def close(self): | ||
self._stop_recv_thread() | ||
close_command = Can8DevCommand.USB_8DEV_CLOSE | ||
self.send_command(Can8DevCommandFrame(close_command)) | ||
|
||
def recv(self, timeout=None): | ||
try: | ||
return self._rx_queue.get(True, timeout=timeout / 1000) | ||
except queue.Empty: | ||
return None | ||
|
||
def send(self, tx_frame: Can8DevTxFrame, timeout=None): | ||
self.data_tx_ep.write(tx_frame.to_bytes(), timeout) | ||
|
||
def get_version(self): | ||
cmd_response = self.send_command( | ||
Can8DevCommandFrame(Can8DevCommand.USB_8DEV_GET_SOFTW_HARDW_VER) | ||
) | ||
version = int.from_bytes(cmd_response.data[0:4], byteorder="big") | ||
return version | ||
|
||
def get_firmware_version(self): | ||
version = self.get_version() | ||
return "%d.%d" % ((version >> 24) & 0xFF, (version >> 16) & 0xFF) | ||
|
||
def get_serial_number(self): | ||
return self.serial_number |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,221 @@ | ||
from enum import Enum | ||
|
||
MAX_8DEV_RECV_QUEUE = 128 # Maximum number of slots in the recv queue | ||
|
||
USB_8DEV_VENDOR_ID = ( | ||
0x0483 | ||
) # Unfortunately this is actually the ST Microelectronics Vendor ID | ||
USB_8DEV_PRODUCT_ID = 0x1234 # Unfortunately this is pretty bogus | ||
USB_8DEV_PRODUCT_STRING = "USB2CAN converter" # So we use this instead. Not great. | ||
|
||
USB_8DEV_ABP_CLOCK = 32000000 | ||
|
||
# USB Bulk Endpoint identifiers | ||
|
||
USB_8DEV_ENDP_DATA_RX = 0x81 | ||
USB_8DEV_ENDP_DATA_TX = 0x2 | ||
USB_8DEV_ENDP_CMD_RX = 0x83 | ||
USB_8DEV_ENDP_CMD_TX = 0x4 | ||
|
||
# Open Device Options | ||
|
||
USB_8DEV_SILENT = 0x01 | ||
USB_8DEV_LOOPBACK = 0x02 | ||
USB_8DEV_DISABLE_AUTO_RESTRANS = 0x04 | ||
USB_8DEV_STATUS_FRAME = 0x08 | ||
|
||
# Command options | ||
USB_8DEV_BAUD_MANUAL = 0x09 | ||
USB_8DEV_CMD_START = 0x11 | ||
USB_8DEV_CMD_END = 0x22 | ||
|
||
USB_8DEV_CMD_SUCCESS = 0 | ||
USB_8DEV_CMD_ERROR = 255 | ||
|
||
USB_8DEV_CMD_TIMEOUT = 1000 | ||
|
||
# Framing definitions | ||
USB_8DEV_DATA_START = 0x55 | ||
USB_8DEV_DATA_END = 0xAA | ||
|
||
USB_8DEV_TYPE_CAN_FRAME = 0 | ||
USB_8DEV_TYPE_ERROR_FRAME = 3 | ||
|
||
USB_8DEV_EXTID = 0x01 | ||
USB_8DEV_RTR = 0x02 | ||
USB_8DEV_ERR_FLAG = 0x04 | ||
|
||
# Status messages | ||
USB_8DEV_STATUSMSG_OK = 0x00 | ||
USB_8DEV_STATUSMSG_OVERRUN = 0x01 # Overrun occured when sending */ | ||
USB_8DEV_STATUSMSG_BUSLIGHT = 0x02 # Error counter has reached 96 */ | ||
USB_8DEV_STATUSMSG_BUSHEAVY = 0x03 # Error count. has reached 128 */ | ||
USB_8DEV_STATUSMSG_BUSOFF = 0x04 # Device is in BUSOFF */ | ||
USB_8DEV_STATUSMSG_STUFF = 0x20 # Stuff Error */ | ||
USB_8DEV_STATUSMSG_FORM = 0x21 # Form Error */ | ||
USB_8DEV_STATUSMSG_ACK = 0x23 # Ack Error */ | ||
USB_8DEV_STATUSMSG_BIT0 = 0x24 # Bit1 Error */ | ||
USB_8DEV_STATUSMSG_BIT1 = 0x25 # Bit0 Error */ | ||
USB_8DEV_STATUSMSG_CRC = 0x27 # CRC Error */ | ||
|
||
USB_8DEV_RP_MASK = 0x7F # Mask for Receive Error Bit */ | ||
|
||
# Available Commands | ||
|
||
|
||
class Can8DevCommand(Enum): | ||
USB_8DEV_RESET = 1 # Reset Device | ||
USB_8DEV_OPEN = 2 # Open Port | ||
USB_8DEV_CLOSE = 3 # Close Port | ||
USB_8DEV_SET_SPEED = 4 | ||
USB_8DEV_SET_MASK_FILTER = ( | ||
5 | ||
) # Unfortunately unknown parameters and supposedly un-implemented on early firmwares | ||
USB_8DEV_GET_STATUS = 6 | ||
USB_8DEV_GET_STATISTICS = 7 | ||
USB_8DEV_GET_SERIAL = 8 | ||
USB_8DEV_GET_SOFTW_VER = 9 | ||
USB_8DEV_GET_HARDW_VER = 0xA | ||
USB_8DEV_RESET_TIMESTAMP = 0xB | ||
USB_8DEV_GET_SOFTW_HARDW_VER = 0xC | ||
|
||
|
||
class Can8DevTxFrame: | ||
flags: int | ||
id: int | ||
dlc: int | ||
data: bytes | ||
|
||
def __init__( | ||
self, can_id: int, dlc: int, data: bytes, is_ext: bool, is_remote: bool | ||
): | ||
self.can_id = can_id | ||
self.dlc = dlc | ||
self.data = data | ||
self.flags = 0 | ||
if is_ext: | ||
self.flags |= USB_8DEV_EXTID | ||
if is_remote: | ||
self.flags |= USB_8DEV_RTR | ||
|
||
def _pad_data(self, data: bytes): | ||
data_bytes = bytearray(8) | ||
for i in range(0, 7): | ||
if i < len(data): | ||
data_bytes[i] = data[i] | ||
return bytes(data_bytes) | ||
|
||
def to_bytes(self): | ||
cmd_buf = bytearray() | ||
cmd_buf.append(USB_8DEV_DATA_START) | ||
cmd_buf.append(self.flags) | ||
id_bytes = self.can_id.to_bytes(4, byteorder="big") | ||
cmd_buf.extend(id_bytes) | ||
cmd_buf.append(self.dlc) | ||
cmd_buf.extend(self._pad_data(self.data)) | ||
cmd_buf.append(USB_8DEV_DATA_END) | ||
return bytes(cmd_buf) | ||
|
||
|
||
class Can8DevRxFrame: | ||
data: bytes | ||
id: int | ||
dlc: int | ||
timestamp: int | ||
ext_id: bool | ||
is_error: bool | ||
is_remote: bool | ||
|
||
def __init__(self, bytes_in: bytes): | ||
if len(bytes_in) != 21: | ||
raise ValueError("Did not receive 21 bytes for 8Dev Data Frame") | ||
if bytes_in[0] != USB_8DEV_DATA_START: | ||
raise ValueError("Did not receive a valid 8Dev Data Frame") | ||
if bytes_in[1] == USB_8DEV_TYPE_CAN_FRAME: | ||
self.data = bytes_in[8:16] | ||
self.dlc = bytes_in[7] | ||
self.ext_id = bytes_in[2] & USB_8DEV_EXTID | ||
self.is_remote = bytes_in[2] & USB_8DEV_RTR | ||
self.id = int.from_bytes(bytes_in[3:7], byteorder="big") | ||
self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") | ||
self.is_error = False | ||
elif bytes_in[1] == USB_8DEV_TYPE_ERROR_FRAME: | ||
self.is_error = True | ||
self.data = bytes_in[7:15] | ||
self.timestamp = int.from_bytes(bytes_in[16:20], byteorder="big") | ||
else: | ||
raise ValueError("8Dev Data Frame with Unknown Type") | ||
|
||
|
||
class Can8DevCommandFrame: | ||
command: Can8DevCommand | ||
opt1: int | ||
opt2: int | ||
data: bytes | ||
|
||
def __init__(self, command, data=bytes(), opt1=0, opt2=0): | ||
self.command = command | ||
self.data = data | ||
self.opt1 = opt1 | ||
self.opt2 = opt2 | ||
|
||
def _pad_data(self, data: bytes): | ||
data_bytes = bytearray(10) | ||
for i in range(0, 9): | ||
if i < len(data): | ||
data_bytes[i] = data[i] | ||
return bytes(data_bytes) | ||
|
||
def to_bytes(self): | ||
cmd_buf = bytearray() | ||
cmd_buf.append(USB_8DEV_CMD_START) | ||
cmd_buf.append(0) # Supposedly could be a channel value, but unknown | ||
cmd_buf.append(self.command.value) | ||
cmd_buf.append(self.opt1) | ||
cmd_buf.append(self.opt2) | ||
cmd_buf.extend(self._pad_data(self.data)) | ||
cmd_buf.append(USB_8DEV_CMD_END) | ||
return bytes(cmd_buf) | ||
|
||
def from_bytes(byte_input: bytes): | ||
if len(byte_input) != 16: | ||
raise ValueError("Did not receive 16 bytes for 8Dev Command Frame") | ||
return Can8DevCommandFrame( | ||
Can8DevCommand(byte_input[2]), | ||
byte_input[5:15], | ||
byte_input[3], | ||
byte_input[4], | ||
) | ||
|
||
|
||
def can_8dev_open_frame( | ||
phase_seg1: int, | ||
phase_seg2: int, | ||
sjw: int, | ||
brp: int, | ||
loopback: bool = False, | ||
listenonly: bool = False, | ||
oneshot: bool = False, | ||
) -> Can8DevCommandFrame: | ||
open_command = Can8DevCommand.USB_8DEV_OPEN | ||
opt1 = USB_8DEV_BAUD_MANUAL | ||
flags = 0 | ||
if loopback: | ||
flags |= USB_8DEV_LOOPBACK | ||
if listenonly: | ||
flags |= USB_8DEV_SILENT | ||
if oneshot: | ||
flags |= USB_8DEV_DISABLE_AUTO_RESTRANS | ||
flags_bytes = flags.to_bytes(4, "big") | ||
brp_bytes = brp.to_bytes(2, "big") | ||
data = bytearray(10) | ||
data[0] = phase_seg1 | ||
data[1] = phase_seg2 | ||
data[2] = sjw | ||
data[3] = brp_bytes[0] | ||
data[4] = brp_bytes[1] | ||
data[5] = flags_bytes[0] | ||
data[6] = flags_bytes[1] | ||
data[7] = flags_bytes[2] | ||
data[8] = flags_bytes[3] | ||
return Can8DevCommandFrame(open_command, data, opt1) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No worries with raising a value error here, but I'd like to give a backend specific exception to the user.
See #1046 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By now, a
CanInitializationError
seems more appropriate.