Skip to content

Add an error callback to ThreadBasedCyclicSendTask #781

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
:meth:`can.BusABC.send_periodic`.
"""

from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING
from typing import Optional, Sequence, Tuple, Union, Callable, TYPE_CHECKING

from can import typechecking

Expand Down Expand Up @@ -198,7 +198,7 @@ def __init__(
class ThreadBasedCyclicSendTask(
ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC
):
"""Fallback cyclic send task using thread."""
"""Fallback cyclic send task using daemon thread."""

def __init__(
self,
Expand All @@ -207,13 +207,28 @@ def __init__(
messages: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
):
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.

The `on_error` is called if any error happens on `bus` while sending `messages`.
If `on_error` present, and returns ``False`` when invoked, thread is
stopped immediately, otherwise, thread continuiously tries to send `messages`
ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately
on error.

:param on_error: The callable that accepts an exception if any
error happened on a `bus` while sending `messages`,
it shall return either ``True`` or ``False`` depending
on desired behaviour of `ThreadBasedCyclicSendTask`.
"""
super().__init__(messages, period, duration)
self.bus = bus
self.send_lock = lock
self.stopped = True
self.thread = None
self.end_time = time.perf_counter() + duration if duration else None
self.on_error = on_error

if HAS_EVENTS:
self.period_ms: int = int(round(period * 1000, 0))
Expand Down Expand Up @@ -250,7 +265,11 @@ def _run(self):
self.bus.send(self.messages[msg_index])
except Exception as exc:
log.exception(exc)
break
if self.on_error:
if not self.on_error(exc):
break
else:
break
if self.end_time is not None and time.perf_counter() >= self.end_time:
break
msg_index = (msg_index + 1) % len(self.messages)
Expand Down
38 changes: 38 additions & 0 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from time import sleep
import unittest
from unittest.mock import MagicMock
import gc

import can
Expand Down Expand Up @@ -151,6 +152,43 @@ def test_stopping_perodic_tasks(self):

bus.shutdown()

def test_thread_based_cyclic_send_task(self):
bus = can.ThreadSafeBus(bustype="virtual")
msg = can.Message(
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)

# good case, bus is up
on_error_mock = MagicMock(return_value=False)
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
)
task.start()
sleep(1)
on_error_mock.assert_not_called()
task.stop()
bus.shutdown()

# bus has been shutted down
on_error_mock.reset_mock()
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
)
task.start()
sleep(1)
self.assertTrue(on_error_mock.call_count is 1)
task.stop()

# bus is still shutted down, but on_error returns True
on_error_mock = MagicMock(return_value=True)
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
)
task.start()
sleep(1)
self.assertTrue(on_error_mock.call_count > 1)
task.stop()


if __name__ == "__main__":
unittest.main()