Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 200 additions & 116 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
Any,
Dict,
Callable,
cast,
)

WaitForSingleObject: Optional[Callable[[int, int], int]]
Expand All @@ -43,7 +44,7 @@
deprecated_args_alias,
time_perfcounter_correlation,
)
from can.typechecking import AutoDetectedConfig, CanFilters, Channel
from can.typechecking import AutoDetectedConfig, CanFilters

# Define Module Logger
# ====================
Expand Down Expand Up @@ -152,6 +153,7 @@ def __init__(
if xldriver is None:
raise CanInterfaceNotImplementedError("The Vector API has not been loaded")
self.xldriver = xldriver # keep reference so mypy knows it is not None
self.xldriver.xlOpenDriver()

self.poll_interval = poll_interval

Expand All @@ -165,7 +167,7 @@ def __init__(
self.channels = [int(ch) for ch in channel]
else:
raise TypeError(
f"Invalid type for channels parameter: {type(channel).__name__}"
f"Invalid type for parameter 'channel': {type(channel).__name__}"
)

self._app_name = app_name.encode() if app_name is not None else b""
Expand All @@ -174,136 +176,71 @@ def __init__(
", ".join(f"CAN {ch + 1}" for ch in self.channels),
)

if serial is not None:
app_name = None
channel_index = []
channel_configs = get_channel_configs()
for channel_config in channel_configs:
if channel_config.serialNumber == serial:
if channel_config.hwChannel in self.channels:
channel_index.append(channel_config.channelIndex)
if channel_index:
if len(channel_index) != len(self.channels):
LOG.info(
"At least one defined channel wasn't found on the specified hardware."
)
self.channels = channel_index
else:
# Is there any better way to raise the error?
raise CanInitializationError(
"None of the configured channels could be found on the specified hardware."
)
channel_configs = get_channel_configs()

self.xldriver.xlOpenDriver()
self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
self.mask = 0
self.fd = fd
# Get channels masks
self.channel_masks: Dict[Optional[Channel], int] = {}
self.index_to_channel = {}
self.channel_masks: Dict[int, int] = {}
self.index_to_channel: Dict[int, int] = {}

for channel in self.channels:
if app_name:
# Get global channel index from application channel
hw_type, hw_index, hw_channel = self.get_application_config(
app_name, channel
)
LOG.debug("Channel index %d found", channel)
idx = self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
if idx < 0:
# Undocumented behavior! See issue #353.
# If hardware is unavailable, this function returns -1.
# Raise an exception as if the driver
# would have signalled XL_ERR_HW_NOT_PRESENT.
raise VectorInitializationError(
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name,
"xlGetChannelIndex",
)
else:
# Channel already given as global channel
idx = channel
mask = 1 << idx
self.channel_masks[channel] = mask
self.index_to_channel[idx] = channel
self.mask |= mask
channel_index = self._find_global_channel_idx(
channel=channel,
serial=serial,
app_name=app_name,
channel_configs=channel_configs,
)
LOG.debug("Channel index %d found", channel)

channel_mask = 1 << channel_index
self.channel_masks[channel] = channel_mask
self.index_to_channel[channel_index] = channel
self.mask |= channel_mask

permission_mask = xlclass.XLaccess()
# Set mask to request channel init permission if needed
if bitrate or fd:
permission_mask.value = self.mask
if fd:
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)
else:
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)

interface_version = (
xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION_V4
if fd
else xldefine.XL_InterfaceVersion.XL_INTERFACE_VERSION
)

self.port_handle = xlclass.XLportHandle(xldefine.XL_INVALID_PORTHANDLE)
self.xldriver.xlOpenPort(
self.port_handle,
self._app_name,
self.mask,
permission_mask,
rx_queue_size,
interface_version,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)

LOG.debug(
"Open Port: PortHandle: %d, PermissionMask: 0x%X",
self.port_handle.value,
permission_mask.value,
)

if permission_mask.value == self.mask:
if fd:
self.canFdConf = xlclass.XLcanFdConf()
if bitrate:
self.canFdConf.arbitrationBitRate = int(bitrate)
else:
self.canFdConf.arbitrationBitRate = 500000
self.canFdConf.sjwAbr = int(sjw_abr)
self.canFdConf.tseg1Abr = int(tseg1_abr)
self.canFdConf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
self.canFdConf.dataBitRate = int(data_bitrate)
else:
self.canFdConf.dataBitRate = self.canFdConf.arbitrationBitRate
self.canFdConf.sjwDbr = int(sjw_dbr)
self.canFdConf.tseg1Dbr = int(tseg1_dbr)
self.canFdConf.tseg2Dbr = int(tseg2_dbr)

self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.mask, self.canFdConf
)
LOG.info(
"SetFdConfig.: ABaudr.=%u, DBaudr.=%u",
self.canFdConf.arbitrationBitRate,
self.canFdConf.dataBitRate,
)
LOG.info(
"SetFdConfig.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
self.canFdConf.sjwAbr,
self.canFdConf.tseg1Abr,
self.canFdConf.tseg2Abr,
)
LOG.info(
"SetFdConfig.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
self.canFdConf.sjwDbr,
self.canFdConf.tseg1Dbr,
self.canFdConf.tseg2Dbr,
)
else:
if bitrate:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle, permission_mask, bitrate
for channel in self.channels:
if permission_mask.value & self.channel_masks[channel]:
if fd:
self._set_bitrate_canfd(
channel=channel,
bitrate=bitrate,
data_bitrate=data_bitrate,
sjw_abr=sjw_abr,
tseg1_abr=tseg1_abr,
tseg2_abr=tseg2_abr,
sjw_dbr=sjw_dbr,
tseg1_dbr=tseg1_dbr,
tseg2_dbr=tseg2_dbr,
)
LOG.info("SetChannelBitrate: baudr.=%u", bitrate)
else:
LOG.info("No init access!")
elif bitrate:
self._set_bitrate_can(channel=channel, bitrate=bitrate)

# Enable/disable TX receipts
tx_receipts = 1 if receive_own_messages else 0
Expand Down Expand Up @@ -348,6 +285,153 @@ def __init__(
self._is_filtered = False
super().__init__(channel=channel, can_filters=can_filters, **kwargs)

def _find_global_channel_idx(
self,
channel: int,
serial: Optional[int],
app_name: Optional[str],
channel_configs: List["VectorChannelConfig"],
) -> int:
if serial is not None:
hw_type: Optional[xldefine.XL_HardwareType] = None
for channel_config in channel_configs:
if channel_config.serialNumber != serial:
continue

hw_type = xldefine.XL_HardwareType(channel_config.hwType)
if channel_config.hwChannel == channel:
return channel_config.channelIndex

if hw_type is None:
err_msg = f"No interface with serial {serial} found."
else:
err_msg = f"Channel {channel} not found on interface {hw_type.name} ({serial})."
raise CanInitializationError(
err_msg, error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT
)

if app_name:
hw_type, hw_index, hw_channel = self.get_application_config(
app_name, channel
)
idx = cast(
int, self.xldriver.xlGetChannelIndex(hw_type, hw_index, hw_channel)
)
if idx < 0:
# Undocumented behavior! See issue #353.
# If hardware is unavailable, this function returns -1.
# Raise an exception as if the driver
# would have signalled XL_ERR_HW_NOT_PRESENT.
raise VectorInitializationError(
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT.name,
"xlGetChannelIndex",
)
return idx

# check if channel is a valid global channel index
for channel_config in channel_configs:
if channel == channel_config.channelIndex:
return channel

raise CanInitializationError(
f"Channel {channel} not found. The 'channel' parameter must be "
f"a valid global channel index if neither 'app_name' nor 'serial' were given.",
error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
)

def _set_bitrate_can(
self,
channel: int,
bitrate: int,
sjw: Optional[int] = None,
tseg1: Optional[int] = None,
tseg2: Optional[int] = None,
sam: int = 1,
) -> None:
kwargs = [sjw, tseg1, tseg2]
if any(kwargs) and not all(kwargs):
raise ValueError(
f"Either all of sjw, tseg1, tseg2 must be set or none of them."
)

# set parameters if channel has init access
if any(kwargs):
chip_params = xlclass.XLchipParams()
chip_params.bitRate = bitrate
chip_params.sjw = sjw
chip_params.tseg1 = tseg1
chip_params.tseg2 = tseg2
chip_params.sam = sam
self.xldriver.xlCanSetChannelParams(
self.port_handle,
self.channel_masks[channel],
chip_params,
)
LOG.info(
"xlCanSetChannelParams: baudr.=%u, sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
chip_params.bitRate,
chip_params.sjw,
chip_params.tseg1,
chip_params.tseg2,
)
else:
self.xldriver.xlCanSetChannelBitrate(
self.port_handle,
self.channel_masks[channel],
bitrate,
)
LOG.info("xlCanSetChannelBitrate: baudr.=%u", bitrate)

def _set_bitrate_canfd(
self,
channel: int,
bitrate: Optional[int] = None,
data_bitrate: Optional[int] = None,
sjw_abr: int = 2,
tseg1_abr: int = 6,
tseg2_abr: int = 3,
sjw_dbr: int = 2,
tseg1_dbr: int = 6,
tseg2_dbr: int = 3,
) -> None:
# set parameters if channel has init access
canfd_conf = xlclass.XLcanFdConf()
if bitrate:
canfd_conf.arbitrationBitRate = int(bitrate)
else:
canfd_conf.arbitrationBitRate = 500_000
canfd_conf.sjwAbr = int(sjw_abr)
canfd_conf.tseg1Abr = int(tseg1_abr)
canfd_conf.tseg2Abr = int(tseg2_abr)
if data_bitrate:
canfd_conf.dataBitRate = int(data_bitrate)
else:
canfd_conf.dataBitRate = int(canfd_conf.arbitrationBitRate)
canfd_conf.sjwDbr = int(sjw_dbr)
canfd_conf.tseg1Dbr = int(tseg1_dbr)
canfd_conf.tseg2Dbr = int(tseg2_dbr)
self.xldriver.xlCanFdSetConfiguration(
self.port_handle, self.channel_masks[channel], canfd_conf
)
LOG.info(
"xlCanFdSetConfiguration.: ABaudr.=%u, DBaudr.=%u",
canfd_conf.arbitrationBitRate,
canfd_conf.dataBitRate,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwAbr=%u, tseg1Abr=%u, tseg2Abr=%u",
canfd_conf.sjwAbr,
canfd_conf.tseg1Abr,
canfd_conf.tseg2Abr,
)
LOG.info(
"xlCanFdSetConfiguration.: sjwDbr=%u, tseg1Dbr=%u, tseg2Dbr=%u",
canfd_conf.sjwDbr,
canfd_conf.tseg1Dbr,
canfd_conf.tseg2Dbr,
)

def _apply_filters(self, filters: Optional[CanFilters]) -> None:
if filters:
# Only up to one filter per ID type allowed
Expand Down Expand Up @@ -544,7 +628,7 @@ def _send_sequence(self, msgs: Sequence[Message]) -> int:

def _get_tx_channel_mask(self, msgs: Sequence[Message]) -> int:
if len(msgs) == 1:
return self.channel_masks.get(msgs[0].channel, self.mask)
return self.channel_masks.get(msgs[0].channel, self.mask) # type: ignore[arg-type]
else:
return self.mask

Expand Down