Skip to content

The gs_usb interface support #905

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions can/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"systec": ("can.interfaces.systec", "UcanBus"),
"seeedstudio": ("can.interfaces.seeedstudio", "SeeedBus"),
"cantact": ("can.interfaces.cantact", "CantactBus"),
"gs_usb": ("can.interfaces.gs_usb", "GsUsbBus"),
}

BACKENDS.update(
Expand Down
118 changes: 118 additions & 0 deletions can/interfaces/gs_usb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from typing import cast, Any, Iterator, List, Optional, Sequence, Tuple, Union

from gs_usb.gs_usb import GsUsb
from gs_usb.gs_usb_frame import GsUsbFrame
from gs_usb.constants import CAN_ERR_FLAG, CAN_RTR_FLAG, CAN_EFF_FLAG, CAN_MAX_DLC
import can
import usb
import logging


logger = logging.getLogger(__name__)


class GsUsbBus(can.BusABC):
def __init__(self, channel, bus, address, bitrate, can_filters=None, **kwargs):
"""
:param channel: usb device name
:param bus: number of the bus that the device is connected to
:param address: address of the device on the bus it is connected to
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
gs_usb = GsUsb.find(bus=bus, address=address)
if not gs_usb:
raise can.CanError("Can not find device {}".format(channel))
self.gs_usb = gs_usb
self.channel_info = channel

self.gs_usb.set_bitrate(bitrate)
self.gs_usb.start()

super().__init__(channel=channel, can_filters=can_filters, **kwargs)

def send(self, msg: can.Message, timeout: Optional[float] = None):
"""Transmit a message to the CAN bus.

:param Message msg: A message object.
:param timeout: timeout is not supported.
The function won't return until message is sent or exception is raised.

:raises can.CanError:
if the message could not be sent
"""
can_id = msg.arbitration_id

if msg.is_extended_id:
can_id = can_id | CAN_EFF_FLAG

if msg.is_remote_frame:
can_id = can_id | CAN_RTR_FLAG

if msg.is_error_frame:
can_id = can_id | CAN_ERR_FLAG

# Pad message data
msg.data.extend([0x00] * (CAN_MAX_DLC - len(msg.data)))

frame = GsUsbFrame()
frame.can_id = can_id
frame.can_dlc = msg.dlc
frame.timestamp_us = int(msg.timestamp * 1000000)
frame.data = list(msg.data)

try:
self.gs_usb.send(frame)
except usb.core.USBError:
raise can.CanError("The message can not be sent")

def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
to read a message multiple times if the filters set by
:meth:`~can.BusABC.set_filters` do not match and the call has
not yet timed out.

:param float timeout: seconds to wait for a message,
see :meth:`~can.BusABC.send`
0 and None will be converted to minimum value 1ms.

:return:
1. a message that was read or None on timeout
2. a bool that is True if message filtering has already
been done and else False. In this interface it is always False
since filtering is not available

:raises can.CanError:
if an error occurred while reading
"""
frame = GsUsbFrame()

# Do not set timeout as None or zero here to avoid blocking
timeout_ms = round(timeout * 1000) if timeout else 1
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False

msg = can.Message(
timestamp=frame.timestamp,
arbitration_id=frame.arbitration_id,
is_extended_id=frame.can_dlc,
is_remote_frame=frame.is_remote_frame,
is_error_frame=frame.is_error_frame,
channel=self.channel_info,
dlc=frame.can_dlc,
data=bytearray(frame.data)[0 : frame.can_dlc],
is_rx=True,
)

return msg, False

def shutdown(self):
"""
Called to carry out any interface specific cleanup required
in shutting down a bus.
"""
self.gs_usb.stop()
1 change: 1 addition & 0 deletions doc/interfaces.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ The available interfaces are:
interfaces/canalystii
interfaces/systec
interfaces/seeedstudio
interfaces/gs_usb

Additional interfaces can be added via a plugin interface. An external package
can register a new interface by using the ``can.interface`` entry point in its setup.py.
Expand Down
52 changes: 52 additions & 0 deletions doc/interfaces/gs_usb.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
.. _gs_usb:

CAN driver for Geschwister Schneider USB/CAN devices and bytewerk.org candleLight USB CAN interfaces
==================================================================================================================

Windows/Linux/Mac CAN driver based on usbfs or WinUSB WCID for Geschwister Schneider USB/CAN devices and candleLight USB CAN interfaces.

Install: ``pip install "python-can[gs_usb]"``

Usage: pass ``bus`` and ``address`` to open the device. The parameters can be got by ``pyusb`` as shown below:

::

import usb
import can

dev = usb.core.find(idVendor=0x1D50, idProduct=0x606F)
bus = can.Bus(bustype="gs_usb", channel=dev.product, bus=dev.bus, address=dev.address, bitrate=250000)


Supported devices
-----------------

Geschwister Schneider USB/CAN devices and bytewerk.org candleLight USB CAN interfaces such as candleLight, canable, cantact, etc.


Supported platform
------------------

Windows, Linux and Mac.

Note: Since ``pyusb`` with ```libusb0``` as backend is used, ``libusb-win32`` usb driver is required to be installed in Windows.


Supplementary Info on ``gs_usb``
-----------------------------------

The firmware implementation for Geschwister Schneider USB/CAN devices and candleLight USB CAN can be found in `candle-usb/candleLight_fw <https://github.com/candle-usb/candleLight_fw>`_.
The Linux kernel driver can be found in `linux/drivers/net/can/usb/gs_usb.c <https://github.com/torvalds/linux/blob/master/drivers/net/can/usb/gs_usb.c>`_.

The ``gs_usb`` interface in ``PythonCan`` relys on upstream ``gs_usb`` package, which can be found in `https://pypi.org/project/gs-usb/ <https://pypi.org/project/gs-usb/>`_ or `https://github.com/jxltom/gs_usb <https://github.com/jxltom/gs_usb>`_.
The ``gs_usb`` package is using ``pyusb`` as backend, which brings better crossplatform compatibility.

Note: The bitrate ``10K``, ``20K``, ``50K``, ``83.333K``, ``100K``, ``125K``, ``250K``, ``500K``, ``800K`` and ``1M`` are supported in this interface, as implemented in the upstream ``gs_usb`` package's ``set_bitrate`` method.

Note: Message filtering is not supported in Geschwister Schneider USB/CAN devices and bytewerk.org candleLight USB CAN interfaces.

Bus
---

.. autoclass:: can.interfaces.gs_usb.GsUsbBus
:members:
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"serial": ["pyserial~=3.0"],
"neovi": ["python-ics>=2.12"],
"cantact": ["cantact>=0.0.7"],
"gs_usb": ["gs_usb>=0.2.1"],
}

setup(
Expand Down