Skip to content

Commit fa9df7b

Browse files
tysonitefelixdivo
andauthored
Add an error callback to ThreadBasedCyclicSendTask (#781)
* Add an error callback to ThreadBasedCyclicSendTask * Fix formatting error reported by black * Add return value for on_error * unit tests * black * review comments Co-authored-by: Felix Divo <felixdivo@users.noreply.github.com>
1 parent 5cada6d commit fa9df7b

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

can/broadcastmanager.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
:meth:`can.BusABC.send_periodic`.
66
"""
77

8-
from typing import Optional, Sequence, Tuple, Union, TYPE_CHECKING
8+
from typing import Optional, Sequence, Tuple, Union, Callable, TYPE_CHECKING
99

1010
from can import typechecking
1111

@@ -198,7 +198,7 @@ def __init__(
198198
class ThreadBasedCyclicSendTask(
199199
ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC
200200
):
201-
"""Fallback cyclic send task using thread."""
201+
"""Fallback cyclic send task using daemon thread."""
202202

203203
def __init__(
204204
self,
@@ -207,13 +207,28 @@ def __init__(
207207
messages: Union[Sequence[Message], Message],
208208
period: float,
209209
duration: Optional[float] = None,
210+
on_error: Optional[Callable[[Exception], bool]] = None,
210211
):
212+
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.
213+
214+
The `on_error` is called if any error happens on `bus` while sending `messages`.
215+
If `on_error` present, and returns ``False`` when invoked, thread is
216+
stopped immediately, otherwise, thread continuiously tries to send `messages`
217+
ignoring errors on a `bus`. Absence of `on_error` means that thread exits immediately
218+
on error.
219+
220+
:param on_error: The callable that accepts an exception if any
221+
error happened on a `bus` while sending `messages`,
222+
it shall return either ``True`` or ``False`` depending
223+
on desired behaviour of `ThreadBasedCyclicSendTask`.
224+
"""
211225
super().__init__(messages, period, duration)
212226
self.bus = bus
213227
self.send_lock = lock
214228
self.stopped = True
215229
self.thread = None
216230
self.end_time = time.perf_counter() + duration if duration else None
231+
self.on_error = on_error
217232

218233
if HAS_EVENTS:
219234
self.period_ms: int = int(round(period * 1000, 0))
@@ -250,7 +265,11 @@ def _run(self):
250265
self.bus.send(self.messages[msg_index])
251266
except Exception as exc:
252267
log.exception(exc)
253-
break
268+
if self.on_error:
269+
if not self.on_error(exc):
270+
break
271+
else:
272+
break
254273
if self.end_time is not None and time.perf_counter() >= self.end_time:
255274
break
256275
msg_index = (msg_index + 1) % len(self.messages)

test/simplecyclic_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from time import sleep
99
import unittest
10+
from unittest.mock import MagicMock
1011
import gc
1112

1213
import can
@@ -151,6 +152,43 @@ def test_stopping_perodic_tasks(self):
151152

152153
bus.shutdown()
153154

155+
def test_thread_based_cyclic_send_task(self):
156+
bus = can.ThreadSafeBus(bustype="virtual")
157+
msg = can.Message(
158+
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
159+
)
160+
161+
# good case, bus is up
162+
on_error_mock = MagicMock(return_value=False)
163+
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
164+
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
165+
)
166+
task.start()
167+
sleep(1)
168+
on_error_mock.assert_not_called()
169+
task.stop()
170+
bus.shutdown()
171+
172+
# bus has been shutted down
173+
on_error_mock.reset_mock()
174+
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
175+
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
176+
)
177+
task.start()
178+
sleep(1)
179+
self.assertTrue(on_error_mock.call_count is 1)
180+
task.stop()
181+
182+
# bus is still shutted down, but on_error returns True
183+
on_error_mock = MagicMock(return_value=True)
184+
task = can.broadcastmanager.ThreadBasedCyclicSendTask(
185+
bus, bus._lock_send_periodic, msg, 0.1, 3, on_error_mock
186+
)
187+
task.start()
188+
sleep(1)
189+
self.assertTrue(on_error_mock.call_count > 1)
190+
task.stop()
191+
154192

155193
if __name__ == "__main__":
156194
unittest.main()

0 commit comments

Comments
 (0)