Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog
All notable changes to this project will be documented in this file.

## [3.2.1]

### Features:
- ``CanTp``: fix a case in ``decode_isotp`` that could lead to an infinite loop

## [3.2.0]

### Features:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
tests_require=["pytest", "pytest-mock"],
extras_require={"test": ["pytest", "pytest-mock"]},
# *strongly* suggested for sharing
version="3.2.0",
version="3.2.1",
# The license can be anything you like
license="MIT",
description="Please use python-uds instead, this is a refactored version with breaking changes, only for pykiso",
Expand Down
75 changes: 29 additions & 46 deletions uds/uds_communications/TransportProtocols/Can/CanTp.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, connector=None, is_fd: bool = True, **kwargs):
self._recv_buffer = queue.Queue()
self._discard_negative_responses = Config.isotp.discard_neg_resp

# default STmin for flow control when receiving consecutive frames
# default STmin for flow control when receiving consecutive frames
self.st_min = 0.030

# sets up the relevant parameters in the instance
Expand All @@ -98,7 +98,7 @@ def __init__(self, connector=None, is_fd: bool = True, **kwargs):

self._max_frame_length = 64 if is_fd else 8
# 7 bytes PDU for normal addressing, 6 for extended and mixed
self._max_pdu_length = (self._max_frame_length - self._pdu_start_index - MINIMUM_HEADER_SIZE)
self._max_pdu_length = self._max_frame_length - self._pdu_start_index - MINIMUM_HEADER_SIZE
# maximum payload length of a 'classic' single frame with a message data length of 7 bytes at most (defined by ISO)
self._single_frame_max_length_for_short_header = 0b111 - self._pdu_start_index

Expand All @@ -109,7 +109,7 @@ def is_fd(self) -> bool:
@is_fd.setter
def is_fd(self, value: bool):
self._max_frame_length = 64 if value is True else 8
self._max_pdu_length = (self._max_frame_length - self._pdu_start_index - MINIMUM_HEADER_SIZE)
self._max_pdu_length = self._max_frame_length - self._pdu_start_index - MINIMUM_HEADER_SIZE

@property
def reqIdAddress(self):
Expand Down Expand Up @@ -142,53 +142,44 @@ def connection(self, value):
def send(self, payload, functionalReq=False, tpWaitTime=0.01) -> None:
result = self.encode_isotp(payload, functionalReq)
return result

def make_single_frame(self, payload: List[int]) -> List[int]:
single_frame = [self.PADDING_PATTERN] * 8
single_frame = [self.PADDING_PATTERN] * 8
if not self.is_fd or len(payload) <= self._single_frame_max_length_for_short_header:
# if we are not using CAN FD or the payload can be packed within 8 bytes, create a short frame
# the MDL is then indicated on the low nibble of the 1st byte
single_frame = [
(CanTpMessageType.SINGLE_FRAME << 4) + len(payload),
# pad the frame to send to have a total length of 8 bytes
*fillArray(
payload,
length=self._single_frame_max_length_for_short_header,
fillValue=self.PADDING_PATTERN
)
payload, length=self._single_frame_max_length_for_short_header, fillValue=self.PADDING_PATTERN
),
]
else:
# otherwise the MDL is indicated in the entire 2nd byte
single_frame = [CanTpMessageType.SINGLE_FRAME, len(payload), *payload]
single_frame = self.add_padding(single_frame)
return single_frame

def make_first_frame(self, payload: List[int]) -> List[int]:
mdl = len(payload)
mdl_high_nibble = (mdl & 0xF00) >> 8
mdl_low_nibble = mdl & 0x0FF
first_frame = [
(CanTpMessageType.FIRST_FRAME << 4) + mdl_high_nibble,
mdl_low_nibble,
*payload[: self._max_pdu_length - 1]
*payload[: self._max_pdu_length - 1],
]
first_frame = self.add_padding(first_frame)
return first_frame

def make_consecutive_frame(self, payload: List[int], sequence_number: int = 1) -> List[int]:
consecutive_frame = [
(CanTpMessageType.CONSECUTIVE_FRAME << 4) + sequence_number,
*payload
]
consecutive_frame = [(CanTpMessageType.CONSECUTIVE_FRAME << 4) + sequence_number, *payload]
consecutive_frame = self.add_padding(consecutive_frame)
return consecutive_frame

def make_flow_control_frame(self, blocksize: int = 0, st_min: float = 0) -> List[int]:
flow_control_frame = [
(CanTpMessageType.FLOW_CONTROL << 4),
blocksize,
self.encode_stMin(st_min)
]
flow_control_frame = [(CanTpMessageType.FLOW_CONTROL << 4), blocksize, self.encode_stMin(st_min)]
flow_control_frame = self.add_padding(flow_control_frame)
return flow_control_frame

Expand Down Expand Up @@ -244,9 +235,7 @@ def encode_isotp(
fs = rxPdu[0] & 0x0F
if fs == CanTpFsTypes.CONTINUE_TO_SEND:
if state != CanTpState.WAIT_FLOW_CONTROL:
raise ValueError(
"Received unexpected Flow Control Continue to Send request"
)
raise ValueError("Received unexpected Flow Control Continue to Send request")

block_size = rxPdu[FC_BS_INDEX]
if block_size == 0:
Expand All @@ -265,7 +254,9 @@ def encode_isotp(
else:
raise ValueError(f"Unexpected fs response from ECU. {rxPdu}")
else:
logger.warning(f"Unexpected response from ECU while waiting for flow control: 0x{bytes(rxPdu).hex()}")
logger.warning(
f"Unexpected response from ECU while waiting for flow control: 0x{bytes(rxPdu).hex()}"
)

if state == CanTpState.SEND_SINGLE_FRAME:
txPdu = self.make_single_frame(payload)
Expand Down Expand Up @@ -344,28 +335,24 @@ def decode_isotp(
if state == CanTpState.IDLE:
if N_PCI == CanTpMessageType.SINGLE_FRAME:
payloadLength = rxPdu[N_PCI_INDEX & 0x0F]
payload = rxPdu[
SINGLE_FRAME_DATA_START_INDEX : SINGLE_FRAME_DATA_START_INDEX
+ payloadLength
]
payload = rxPdu[SINGLE_FRAME_DATA_START_INDEX : SINGLE_FRAME_DATA_START_INDEX + payloadLength]
endOfMessage_flag = True
elif N_PCI == CanTpMessageType.FIRST_FRAME:
payload = rxPdu[FIRST_FRAME_DATA_START_INDEX:]
payloadLength = (
(rxPdu[FIRST_FRAME_DL_INDEX_HIGH] & 0x0F) << 8
) + rxPdu[FIRST_FRAME_DL_INDEX_LOW]
payloadLength = ((rxPdu[FIRST_FRAME_DL_INDEX_HIGH] & 0x0F) << 8) + rxPdu[FIRST_FRAME_DL_INDEX_LOW]
payloadPtr = self._max_pdu_length - 1
state = CanTpState.SEND_FLOW_CONTROL
elif N_PCI == CanTpMessageType.CONSECUTIVE_FRAME:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the two lines that have been added.

# Consecutive frames are not expected in idle state else we are in an infinite loop
raise RuntimeError("Consecutive frames are not supported in idle state")
elif state == CanTpState.RECEIVING_CONSECUTIVE_FRAME:
if N_PCI == CanTpMessageType.CONSECUTIVE_FRAME:
sequenceNumber = (
rxPdu[CONSECUTIVE_FRAME_SEQUENCE_NUMBER_INDEX] & 0x0F
)
sequenceNumber = rxPdu[CONSECUTIVE_FRAME_SEQUENCE_NUMBER_INDEX] & 0x0F
if sequenceNumber != sequenceNumberExpected:
raise ValueError(
f"Consecutive frame sequence out of order, expected {sequenceNumberExpected} got {sequenceNumber}"
)

sequenceNumberExpected = (sequenceNumberExpected + 1) % 16
payload += rxPdu[CONSECUTIVE_FRAME_SEQUENCE_DATA_START_INDEX:]
payloadPtr += self._max_pdu_length
Expand Down Expand Up @@ -425,7 +412,7 @@ def decode_stMin(val: int) -> float:
raise ValueError(
f"Invalid STMin value {hex(val)}, should be between 0x00 and 0x7F or between 0xF1 and 0xF9"
)

@staticmethod
def encode_stMin(val: float) -> int:
if (0x01 * 1e-3) <= val <= (0x7F * 1e-3):
Expand All @@ -435,9 +422,7 @@ def encode_stMin(val: float) -> int:
# 100us - 900us -> 0xF1 - 0xF9
return 0xF0 + int(val * 1e4)
else:
raise ValueError(
f"Invalid STMin time {val}, should be between 0.1 and 0.9 ms or between 1 and 127 ms"
)
raise ValueError(f"Invalid STMin time {val}, should be between 0.1 and 0.9 ms or between 1 and 127 ms")

##
# @brief creates the blocklist from the blocksize and payload
Expand Down Expand Up @@ -474,9 +459,9 @@ def create_blockList(self, payload: List[int], blockSize: int) -> List[List[int]
blockPtr = 0

return blockList

def add_padding(self, payload: List[int]) -> List[int]:
"""Add padding to the payload to be sent over CAN.
"""Add padding to the payload to be sent over CAN.

:param payload: payload to be sent.
:param header_size: size of the ISO TP header of the CAN frame's payload.
Expand All @@ -498,9 +483,7 @@ def add_padding(self, payload: List[int]) -> List[int]:

##
# @brief transmits the data over can using can connection
def transmit(
self, data, functionalReq=False, use_external_snd_rcv_functions: bool = False
):
def transmit(self, data, functionalReq=False, use_external_snd_rcv_functions: bool = False):
# check functional request
if functionalReq:
raise Exception("Functional requests are currently not supported")
Expand Down