Skip to content

Add end count to send periodic in BCM #995

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(
messages: Union[Sequence[Message], Message],
period: float,
duration: Optional[float],
count: Optional[int],
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
count: Optional[int],
count: Optional[int] = None,

):
"""Message send task with a defined duration and period.

Expand All @@ -113,9 +114,13 @@ def __init__(
:param duration:
Approximate duration in seconds to continue sending messages. If
no duration is provided, the task will continue indefinitely.
:param count:
The number of messages to send before stopping. If
no count is provided, the task will continue indefinitely.
"""
super().__init__(messages, period)
self.duration = duration
self.count = count


class RestartableCyclicTaskABC(CyclicSendTaskABC):
Expand Down Expand Up @@ -207,6 +212,7 @@ def __init__(
messages: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
count: Optional[int] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
):
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.
Expand All @@ -222,12 +228,13 @@ def __init__(
it shall return either ``True`` or ``False`` depending
on desired behaviour of `ThreadBasedCyclicSendTask`.
"""
super().__init__(messages, period, duration)
super().__init__(messages, period, duration, count)
self.bus = bus
self.send_lock = lock
self.stopped = True
self.thread = None
self.end_time = time.perf_counter() + duration if duration else None
self.end_count = count if count else None
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.end_count = count if count else None
self.end_count = count

self.on_error = on_error

if HAS_EVENTS:
Expand Down Expand Up @@ -257,6 +264,7 @@ def start(self):

def _run(self):
msg_index = 0
count = 0
while not self.stopped:
# Prevent calling bus.send from multiple threads
with self.send_lock:
Expand All @@ -272,7 +280,10 @@ def _run(self):
break
if self.end_time is not None and time.perf_counter() >= self.end_time:
break
if self.end_count is not None and count >= self.end_count:
break
msg_index = (msg_index + 1) % len(self.messages)
count += 1

if HAS_EVENTS:
win32event.WaitForSingleObject(self.event.handle, self.period_ms)
Expand Down
12 changes: 10 additions & 2 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def send_periodic(
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
count: Optional[int] = None,
store_task: bool = True,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.
Expand All @@ -194,6 +195,9 @@ def send_periodic(
:param duration:
Approximate duration in seconds to continue sending messages. If
no duration is provided, the task will continue indefinitely.
:param count:
The number of messages to send before stopping. If
no count is provided, the task will continue indefinitely.
:param store_task:
If True (the default) the task will be attached to this Bus instance.
Disable to instead manage tasks manually.
Expand Down Expand Up @@ -222,7 +226,7 @@ def send_periodic(
raise ValueError("Must be either a list, tuple, or a Message")
if not msgs:
raise ValueError("Must be at least a list or tuple of length 1")
task = self._send_periodic_internal(msgs, period, duration)
task = self._send_periodic_internal(msgs, period, duration, count)
# we wrap the task's stop method to also remove it from the Bus's list of tasks
original_stop_method = task.stop

Expand All @@ -246,6 +250,7 @@ def _send_periodic_internal(
msgs: Union[Sequence[Message], Message],
period: float,
duration: Optional[float] = None,
count: Optional[int] = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Expand All @@ -258,6 +263,9 @@ def _send_periodic_internal(
:param duration:
The duration between sending each message at the given rate. If
no duration is provided, the task will continue indefinitely.
:param count:
The number of messages to send before stopping. If
no count is provided, the task will continue indefinitely.
:return:
A started task instance. Note the task can be stopped (and
depending on the backend modified) by calling the :meth:`stop`
Expand All @@ -269,7 +277,7 @@ def _send_periodic_internal(
threading.Lock()
) # pylint: disable=attribute-defined-outside-init
task = ThreadBasedCyclicSendTask(
self, self._lock_send_periodic, msgs, period, duration
self, self._lock_send_periodic, msgs, period, duration, count
)
return task

Expand Down