-
Notifications
You must be signed in to change notification settings - Fork 654
Description
Describe the bug
BufferedReader.get is documented as returning the latest message: "Attempts to retrieve the latest message received by the instance.". In reality BufferedReader.get returns the oldest message in it's queue. This is because the queue is implemented using a SimpleQueue which is a FIFO type queue.
To Reproduce
Receive CAN message and compare the order read using BufferedReader.get to the output of something like candump.
Expected behavior
BufferedReader.get should always return the latest message received. If no message has been received, the most recent item from the queue should be returned.
Additional context
OS and version:
Python version: 3.8
python-can version: 3.3.4
python-can interface/s (if applicable):
I implemented a basic LIFO implementation using LIFO queue. Ultimately I believe a circular LIFO queue would be the best implementation for BufferedReader. My very basic LIFO queue implementation is shown below. Note the hacky way I allow new messages when the queue is full by creating a new queue and overwriting the old one.
class LIFOReader(Listener): """ A BufferedReader is a subclass of :class:~can.Listener which implements a **message buffer**: that is, when the :class:can.BufferedReader instance is notified of a new message it pushes it into a queue of messages waiting to be serviced. The messages can then be fetched with :meth:~can.BufferedReader.get_message`.
Putting in messages after :meth:`~can.BufferedReader.stop` has be called will raise
an exception, see :meth:`~can.BufferedReader.on_message_received`.
:attr bool is_stopped: ``True`` iff the reader has been stopped
"""
def __init__(self, maxsize = 128):
# set to "infinite" size
self.buffer = LifoQueue(maxsize=maxsize)
self.is_stopped = False
def on_message_received(self, msg):
"""Append a message to the buffer.
:raises: BufferError
if the reader has already been stopped
"""
if self.is_stopped:
raise RuntimeError("reader has already been stopped")
else:
try:
self.buffer.put(msg)
except Full:
with self.buffer.mutex:
self.buffer = LifoQueue(maxsize=self.buffer.maxsize)
def get_message(self, timeout=0.5):
"""
Attempts to retrieve the latest message received by the instance. If no message is
available it blocks for given timeout or until a message is received, or else
returns None (whichever is shorter). This method does not block after
:meth:`can.BufferedReader.stop` has been called.
:param float timeout: The number of seconds to wait for a new message.
:rytpe: can.Message or None
:return: the message if there is one, or None if there is not.
"""
try:
return self.buffer.get(block=not self.is_stopped, timeout=timeout)
except Empty:
return None
def stop(self):
"""Prohibits any more additions to this reader.
"""
self.is_stopped = True`