Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions can/interfaces/vector/canlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def __init__(

:raise ~can.exceptions.CanInterfaceNotImplementedError:
If the current operating system is not supported or the driver could not be loaded.
:raise can.exceptions.CanInitializationError:
:raise ~can.exceptions.CanInitializationError:
If the bus could not be set up.
This may or may not be a :class:`~can.interfaces.vector.VectorInitializationError`.
"""
Expand Down Expand Up @@ -218,15 +218,17 @@ def __init__(
interface_version,
xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
)
self.permission_mask = permission_mask.value

LOG.debug(
"Open Port: PortHandle: %d, PermissionMask: 0x%X",
self.port_handle.value,
permission_mask.value,
)

# set CAN settings
for channel in self.channels:
if permission_mask.value & self.channel_masks[channel]:
if self._has_init_access(channel):
if fd:
self._set_bitrate_canfd(
channel=channel,
Expand All @@ -242,6 +244,51 @@ def __init__(
elif bitrate:
self._set_bitrate_can(channel=channel, bitrate=bitrate)

# Check CAN settings
for channel in self.channels:
if kwargs.get("_testing", False):
# avoid check if xldriver is mocked for testing
break

bus_params = self._read_bus_params(channel)
if fd:
_canfd = bus_params.canfd
if not all(
[
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
_canfd.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CANFD,
_canfd.bitrate == bitrate if bitrate else True,
_canfd.sjw_abr == sjw_abr if bitrate else True,
_canfd.tseg1_abr == tseg1_abr if bitrate else True,
_canfd.tseg2_abr == tseg2_abr if bitrate else True,
_canfd.data_bitrate == data_bitrate if data_bitrate else True,
_canfd.sjw_dbr == sjw_dbr if data_bitrate else True,
_canfd.tseg1_dbr == tseg1_dbr if data_bitrate else True,
_canfd.tseg2_dbr == tseg2_dbr if data_bitrate else True,
]
):
raise CanInitializationError(
f"The requested CAN FD settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {_canfd._asdict()}"
)
else:
_can = bus_params.can
if not all(
[
bus_params.bus_type is xldefine.XL_BusTypes.XL_BUS_TYPE_CAN,
_can.can_op_mode
& xldefine.XL_CANFD_BusParams_CanOpMode.XL_BUS_PARAMS_CANOPMODE_CAN20,
_can.bitrate == bitrate if bitrate else True,
]
):
raise CanInitializationError(
f"The requested CAN settings could not be set for channel {channel}. "
f"Another application might have set incompatible settings. "
f"These are the currently active settings: {_can._asdict()}"
)

# Enable/disable TX receipts
tx_receipts = 1 if receive_own_messages else 0
self.xldriver.xlCanSetChannelMode(self.port_handle, self.mask, tx_receipts, 0)
Expand Down Expand Up @@ -340,6 +387,21 @@ def _find_global_channel_idx(
error_code=xldefine.XL_Status.XL_ERR_HW_NOT_PRESENT,
)

def _has_init_access(self, channel: int) -> bool:
return bool(self.permission_mask & self.channel_masks[channel])

def _read_bus_params(self, channel: int) -> "VectorBusParams":
channel_mask = self.channel_masks[channel]

vcc_list = get_channel_configs()
for vcc in vcc_list:
if vcc.channel_mask == channel_mask:
return vcc.bus_params

raise CanInitializationError(
f"Channel configuration for channel {channel} not found."
)

def _set_bitrate_can(
self,
channel: int,
Expand Down
7 changes: 5 additions & 2 deletions test/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def mock_xldriver() -> None:
# backup unmodified values
real_xldriver = canlib.xldriver
real_waitforsingleobject = canlib.WaitForSingleObject
real_has_events = canlib.HAS_EVENTS

# set mock
canlib.xldriver = xldriver_mock
Expand All @@ -72,6 +73,7 @@ def mock_xldriver() -> None:
# cleanup
canlib.xldriver = real_xldriver
canlib.WaitForSingleObject = real_waitforsingleobject
canlib.HAS_EVENTS = real_has_events


def test_bus_creation_mocked(mock_xldriver) -> None:
Expand Down Expand Up @@ -870,13 +872,14 @@ def xlGetChannelIndex(
def xlOpenPort(
port_handle_p: ctypes.POINTER(xlclass.XLportHandle),
app_name_p: ctypes.c_char_p,
access_mask: xlclass.XLaccess,
permission_mask_p: ctypes.POINTER(xlclass.XLaccess),
access_mask: int,
permission_mask: xlclass.XLaccess,
rx_queue_size: ctypes.c_uint,
xl_interface_version: ctypes.c_uint,
bus_type: ctypes.c_uint,
) -> int:
port_handle_p.value = 0
permission_mask.value = access_mask
return 0


Expand Down