|
11 | 11 | import logging
|
12 | 12 | import os
|
13 | 13 | import tempfile
|
14 |
| -from collections import deque |
| 14 | +from collections import deque, defaultdict |
| 15 | +from itertools import cycle |
| 16 | +from threading import Event |
15 | 17 |
|
16 | 18 | from can import Message, CanError, BusABC
|
17 | 19 |
|
@@ -55,6 +57,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
|
55 | 57 | # Use inter-process mutex to prevent concurrent device open.
|
56 | 58 | # When neoVI server is enabled, there is an issue with concurrent device open.
|
57 | 59 | open_lock = FileLock(os.path.join(tempfile.gettempdir(), "neovi.lock"))
|
| 60 | +description_id = cycle(range(1, 0x8000)) |
58 | 61 |
|
59 | 62 |
|
60 | 63 | class ICSApiError(CanError):
|
@@ -176,6 +179,7 @@ def __init__(self, channel, can_filters=None, **kwargs):
|
176 | 179 | logger.info("Using device: {}".format(self.channel_info))
|
177 | 180 |
|
178 | 181 | self.rx_buffer = deque()
|
| 182 | + self.message_receipts = defaultdict(Event) |
179 | 183 |
|
180 | 184 | @staticmethod
|
181 | 185 | def channel_to_netid(channel_name_or_id):
|
@@ -267,6 +271,9 @@ def _process_msg_queue(self, timeout=0.1):
|
267 | 271 | if is_tx:
|
268 | 272 | if bool(ics_msg.StatusBitField & ics.SPY_STATUS_GLOBAL_ERR):
|
269 | 273 | continue
|
| 274 | + if ics_msg.DescriptionID: |
| 275 | + receipt_key = (ics_msg.ArbIDOrHeader, ics_msg.DescriptionID) |
| 276 | + self.message_receipts[receipt_key].set() |
270 | 277 | if not self._receive_own_messages:
|
271 | 278 | continue
|
272 | 279 |
|
@@ -349,7 +356,19 @@ def _recv_internal(self, timeout=0.1):
|
349 | 356 | return None, False
|
350 | 357 | return msg, False
|
351 | 358 |
|
352 |
| - def send(self, msg, timeout=None): |
| 359 | + def send(self, msg, timeout=0): |
| 360 | + """Transmit a message to the CAN bus. |
| 361 | +
|
| 362 | + :param Message msg: A message object. |
| 363 | +
|
| 364 | + :param float timeout: |
| 365 | + If > 0, wait up to this many seconds for message to be ACK'ed. |
| 366 | + If timeout is exceeded, an exception will be raised. |
| 367 | + None blocks indefinitely. |
| 368 | +
|
| 369 | + :raises can.CanError: |
| 370 | + if the message could not be sent |
| 371 | + """ |
353 | 372 | if not ics.validate_hobject(self.dev):
|
354 | 373 | raise CanError("bus not open")
|
355 | 374 | message = ics.SpyMessage()
|
@@ -385,7 +404,20 @@ def send(self, msg, timeout=None):
|
385 | 404 | else:
|
386 | 405 | raise ValueError("msg.channel must be set when using multiple channels.")
|
387 | 406 |
|
| 407 | + msg_desc_id = next(description_id) |
| 408 | + message.DescriptionID = msg_desc_id |
| 409 | + receipt_key = (msg.arbitration_id, msg_desc_id) |
| 410 | + |
| 411 | + if timeout != 0: |
| 412 | + self.message_receipts[receipt_key].clear() |
| 413 | + |
388 | 414 | try:
|
389 | 415 | ics.transmit_messages(self.dev, message)
|
390 | 416 | except ics.RuntimeError:
|
391 | 417 | raise ICSApiError(*ics.get_last_api_error(self.dev))
|
| 418 | + |
| 419 | + # If timeout is set, wait for ACK |
| 420 | + # This requires a notifier for the bus or |
| 421 | + # some other thread calling recv periodically |
| 422 | + if timeout != 0 and not self.message_receipts[receipt_key].wait(timeout): |
| 423 | + raise CanError("Transmit timeout") |
0 commit comments