Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/source/udsoncan/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ Methods by services
#################################################

.. automethod:: udsoncan.client.Client.read_data_by_identifier
.. automethod:: udsoncan.client.Client.read_data_by_identifier_first
.. automethod:: udsoncan.client.Client.test_data_identifier

-------------

Expand Down
54 changes: 54 additions & 0 deletions test/client/test_read_data_by_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ def _test_rdbi_single_success(self):
values = response.service_data.values
self.assertEqual(values[1], (0x1234,))

def test_peek_rdbi_single_success(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x22\x11\x22")
self.conn.fromuserqueue.put(b"\x62\x11\x22\x12\x34") # Positive response

def _test_peek_rdbi_single_success(self):
response = self.udsclient.test_data_identifier(0x1122) # not in config, but this is OK
self.assertTrue(response.positive)
self.assertIsNone(response.service_data)
self.assertEqual(response.data, b"\x11\x22\x12\x34")

def test_rdbi_single_success_default_did(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x22\x00\x01")
Expand Down Expand Up @@ -106,6 +117,16 @@ def _test_rdbi_multiple_success(self):
self.assertEqual(values[3], 0x10)
self.assertEqual(values[4], 'abcde')

def test_peek_rdbi_multiple_success(self):
request = self.conn.touserqueue.get(timeout=0.2)
self.assertEqual(request, b"\x22\x11\x22\x33\x44")
self.conn.fromuserqueue.put(b"\x62\xaa") # Positive response, content can be invalid. we don't check with the peek method

def _test_peek_rdbi_multiple_success(self):
response = self.udsclient.test_data_identifier([0x1122, 0x3344])
self.assertTrue(response.positive)
self.assertIsNone(response.service_data)

def test_rdbi_multiple_zero_padding1_success(self):
data = b'\x62\x00\x01\x12\x34\x00\x02\x56\x78\x00\x03\x11'
for i in range(8):
Expand Down Expand Up @@ -264,6 +285,39 @@ def _test_rdbi_wrongservice_no_exception(self):
self.assertTrue(response.valid)
self.assertTrue(response.unexpected)

def test_peek_rdbi_wrongservice_exception(self):
self.wait_request_and_respond(b"\x50\x00\x01\x12\x34") # Valid service, but not the one requested

def _test_peek_rdbi_wrongservice_exception(self):
with self.assertRaises(UnexpectedResponseException) as handle:
self.udsclient.test_data_identifier(0x1122)

def test_peek_rdbi_wrongservice_no_exception(self):
self.wait_request_and_respond(b"\x50\x00\x01\x12\x34") # Valid service, but not the one requested

def _test_peek_rdbi_wrongservice_no_exception(self):
self.udsclient.config['exception_on_unexpected_response'] = False
response = self.udsclient.test_data_identifier(0x1122)
self.assertTrue(response.valid)
self.assertTrue(response.unexpected)

def test_peek_rdbi_negative_exception(self):
self.wait_request_and_respond(b"\x7F\x22\x10") # general reject

def _test_peek_rdbi_negative_exception(self):
with self.assertRaises(NegativeResponseException) as handle:
self.udsclient.test_data_identifier(0x1122)

def test_peek_rdbi_negative_no_exception(self):
self.wait_request_and_respond(b"\x7F\x22\x10") # general reject

def _test_peek_rdbi_negative_no_exception(self):
self.udsclient.config['exception_on_negative_response'] = False
response = self.udsclient.test_data_identifier(0x1122)
self.assertTrue(response.valid)
self.assertFalse(response.unexpected)
self.assertFalse(response.positive)

def test_no_config(self):
pass

Expand Down
2 changes: 1 addition & 1 deletion test/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def __init__(self, *args, **kwargs):
self.__class__._next_id += 2

def make_bus(self):
return can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=500000, receive_own_messages=True)
return can.Bus(interface='socketcan', channel='vcan0', bitrate=500000, receive_own_messages=True)

def setUp(self):
self.vcan0_bus = self.make_bus()
Expand Down
21 changes: 21 additions & 0 deletions udsoncan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,27 @@ def read_data_by_identifier_first(self, didlist: Union[int, List[int]]) -> Optio
return values[didlist[0]]
return None

@standard_error_management
def test_data_identifier(self, didlist: Union[int, List[int]]) -> Optional[Response]:
"""
Sends a request for the ReadDataByIdentifier and returns blindly the received response without parsing.
The requested DIDs do not have to be inside the client list of supported DID.
This method can be useful for testing if a DID exists on an ECU

:Effective configuration: ``exception_on_<type>_response``

:param didlist: The DIDs to peek
:type didlist: list[int]

:return: The raw server response. The response will not be parsed by any service, causing ``service_data`` to always be ``None``
:rtype: :ref:`Response<Response>`

"""
# Do the validation. No need to read return value as we enforced a single DID already
didlist = services.ReadDataByIdentifier.validate_didlist_input(didlist)
req = services.ReadDataByIdentifier.make_request(didlist=didlist, didconfig=None) # No config
return self.send_request(req)

@standard_error_management
def read_data_by_identifier(self, didlist: Union[int, List[int]]) -> Optional[services.ReadDataByIdentifier.InterpretedResponse]:
"""
Expand Down
49 changes: 25 additions & 24 deletions udsoncan/services/ReadDataByIdentifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from udsoncan.ResponseCode import ResponseCode
import udsoncan.tools as tools

from typing import Dict, Any, Union, List, cast
from typing import Dict, Any, Union, List, cast, Optional


class ReadDataByIdentifier(BaseService):
Expand Down Expand Up @@ -53,14 +53,14 @@ def validate_didlist_input(cls, dids: Union[int, List[int]]) -> List[int]:
return [dids] if not isinstance(dids, list) else dids

@classmethod
def make_request(cls, didlist: Union[int, List[int]], didconfig: DIDConfig) -> Request:
def make_request(cls, didlist: Union[int, List[int]], didconfig: Optional[DIDConfig]) -> Request:
"""
Generates a request for ReadDataByIdentifier

:param didlist: List of data identifier to read.
:type didlist: list[int]

:param didconfig: Definition of DID codecs. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string
:param didconfig: Optional definition of DID codecs for validation. Dictionary mapping a DID (int) to a valid :ref:`DidCodec<DidCodec>` class or pack/unpack string
:type didconfig: dict[int] = :ref:`DidCodec<DidCodec>`

:raises ValueError: If parameters are out of range, missing or wrong type
Expand All @@ -70,27 +70,28 @@ def make_request(cls, didlist: Union[int, List[int]], didconfig: DIDConfig) -> R
didlist = cls.validate_didlist_input(didlist)

req = Request(cls)
# Return a validated did config. Format may change, entries might be added if default value is set.
didconfig_validated = check_did_config(didlist, didconfig)

did_reading_all_data = None
for did in didlist:
if did not in didconfig_validated: # Already checked in check_did_config. Paranoid check
raise ConfigError(key=did, msg='Actual data identifier configuration contains no definition for data identifier 0x%04x' % did)

# Make sure the config is good before sending the request. This method can raise.
codec = make_did_codec_from_config(didconfig_validated[did])

try:
len(codec) # Validate the length function. May raise
if did_reading_all_data is not None:
raise ValueError('Did 0x%04X is configured to read the rest of the payload (__len__ raisong ReadAllRemainingData), but a subsequent DID is requested (0x%04x)' % (
did_reading_all_data, did))
except DidCodec.ReadAllRemainingData:
if did_reading_all_data is not None:
raise ValueError('It is impossible to read 2 DIDs configured to read the rest of the payload (__len__ raising ReadAllRemainingData). Dids are : 0x%04X and 0x%04X' % (
did_reading_all_data, did))
did_reading_all_data = did
if didconfig is not None:
# Return a validated did config. Format may change, entries might be added if default value is set.
didconfig_validated = check_did_config(didlist, didconfig)

did_reading_all_data = None
for did in didlist:
if did not in didconfig_validated: # Already checked in check_did_config. Paranoid check
raise ConfigError(key=did, msg='Actual data identifier configuration contains no definition for data identifier 0x%04x' % did)

# Make sure the config is good before sending the request. This method can raise.
codec = make_did_codec_from_config(didconfig_validated[did])

try:
len(codec) # Validate the length function. May raise
if did_reading_all_data is not None:
raise ValueError('Did 0x%04X is configured to read the rest of the payload (__len__ raisong ReadAllRemainingData), but a subsequent DID is requested (0x%04x)' % (
did_reading_all_data, did))
except DidCodec.ReadAllRemainingData:
if did_reading_all_data is not None:
raise ValueError('It is impossible to read 2 DIDs configured to read the rest of the payload (__len__ raising ReadAllRemainingData). Dids are : 0x%04X and 0x%04X' % (
did_reading_all_data, did))
did_reading_all_data = did

req.data = struct.pack('>' + 'H' * len(didlist), *didlist) # Encode list of DID

Expand Down