Skip to content

Commit

Permalink
bpo-14976: Reentrant simple queue (#3346)
Browse files Browse the repository at this point in the history
Add a queue.SimpleQueue class, an unbounded FIFO queue with a reentrant C implementation of put().
  • Loading branch information
pitrou authored Jan 15, 2018
1 parent 5ec0fee commit 94e1696
Show file tree
Hide file tree
Showing 12 changed files with 1,125 additions and 12 deletions.
73 changes: 70 additions & 3 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@ the first retrieved (operating like a stack). With a priority queue,
the entries are kept sorted (using the :mod:`heapq` module) and the
lowest valued entry is retrieved first.

Internally, the module uses locks to temporarily block competing threads;
however, it is not designed to handle reentrancy within a thread.
Internally, those three types of queues use locks to temporarily block
competing threads; however, they are not designed to handle reentrancy
within a thread.

In addition, the module implements a "simple"
:abbr:`FIFO (first-in, first-out)` queue type where
specific implementations can provide additional guarantees
in exchange for the smaller functionality.

The :mod:`queue` module defines the following classes and exceptions:

Expand Down Expand Up @@ -67,6 +73,14 @@ The :mod:`queue` module defines the following classes and exceptions:
priority: int
item: Any=field(compare=False)

.. class:: SimpleQueue()

Constructor for an unbounded :abbr:`FIFO (first-in, first-out)` queue.
Simple queues lack advanced functionality such as task tracking.

.. versionadded:: 3.7


.. exception:: Empty

Exception raised when non-blocking :meth:`~Queue.get` (or
Expand Down Expand Up @@ -201,6 +215,60 @@ Example of how to wait for enqueued tasks to be completed::
t.join()


SimpleQueue Objects
-------------------

:class:`SimpleQueue` objects provide the public methods described below.

.. method:: SimpleQueue.qsize()

Return the approximate size of the queue. Note, qsize() > 0 doesn't
guarantee that a subsequent get() will not block.


.. method:: SimpleQueue.empty()

Return ``True`` if the queue is empty, ``False`` otherwise. If empty()
returns ``False`` it doesn't guarantee that a subsequent call to get()
will not block.


.. method:: SimpleQueue.put(item, block=True, timeout=None)

Put *item* into the queue. The method never blocks and always succeeds
(except for potential low-level errors such as failure to allocate memory).
The optional args *block* and *timeout* are ignored and only provided
for compatibility with :meth:`Queue.put`.

.. impl-detail::
This method has a C implementation which is reentrant. That is, a
``put()`` or ``get()`` call can be interrupted by another ``put()``
call in the same thread without deadlocking or corrupting internal
state inside the queue. This makes it appropriate for use in
destructors such as ``__del__`` methods or :mod:`weakref` callbacks.


.. method:: SimpleQueue.put_nowait(item)

Equivalent to ``put(item)``, provided for compatibility with
:meth:`Queue.put_nowait`.


.. method:: SimpleQueue.get(block=True, timeout=None)

Remove and return an item from the queue. If optional args *block* is true and
*timeout* is ``None`` (the default), block if necessary until an item is available.
If *timeout* is a positive number, it blocks at most *timeout* seconds and
raises the :exc:`Empty` exception if no item was available within that time.
Otherwise (*block* is false), return an item if one is immediately available,
else raise the :exc:`Empty` exception (*timeout* is ignored in that case).


.. method:: SimpleQueue.get_nowait()

Equivalent to ``get(False)``.


.. seealso::

Class :class:`multiprocessing.Queue`
Expand All @@ -210,4 +278,3 @@ Example of how to wait for enqueued tasks to be completed::
:class:`collections.deque` is an alternative implementation of unbounded
queues with fast atomic :meth:`~collections.deque.append` and
:meth:`~collections.deque.popleft` operations that do not require locking.

86 changes: 82 additions & 4 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,26 @@
from collections import deque
from heapq import heappush, heappop
from time import monotonic as time
try:
from _queue import SimpleQueue
except ImportError:
SimpleQueue = None

__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue', 'SimpleQueue']

class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass

try:
from _queue import Empty
except AttributeError:
class Empty(Exception):
'Exception raised by Queue.get(block=0)/get_nowait().'
pass

class Full(Exception):
'Exception raised by Queue.put(block=0)/put_nowait().'
pass


class Queue:
'''Create a queue object with a given maximum size.
Expand Down Expand Up @@ -241,3 +250,72 @@ def _put(self, item):

def _get(self):
return self.queue.pop()


class _PySimpleQueue:
'''Simple, unbounded FIFO queue.
This pure Python implementation is not reentrant.
'''
# Note: while this pure Python version provides fairness
# (by using a threading.Semaphore which is itself fair, being based
# on threading.Condition), fairness is not part of the API contract.
# This allows the C version to use a different implementation.

def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)

def put(self, item, block=True, timeout=None):
'''Put the item on the queue.
The optional 'block' and 'timeout' arguments are ignored, as this method
never blocks. They are provided for compatibility with the Queue class.
'''
self._queue.append(item)
self._count.release()

def get(self, block=True, timeout=None):
'''Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
'''
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
if not self._count.acquire(block, timeout):
raise Empty
return self._queue.popleft()

def put_nowait(self, item):
'''Put an item into the queue without blocking.
This is exactly equivalent to `put(item)` and is only provided
for compatibility with the Queue class.
'''
return self.put(item, block=False)

def get_nowait(self):
'''Remove and return an item from the queue without blocking.
Only get an item if one is immediately available. Otherwise
raise the Empty exception.
'''
return self.get(block=False)

def empty(self):
'''Return True if the queue is empty, False otherwise (not reliable!).'''
return len(self._queue) == 0

def qsize(self):
'''Return the approximate size of the queue (not reliable!).'''
return len(self._queue)


if SimpleQueue is None:
SimpleQueue = _PySimpleQueue
Loading

0 comments on commit 94e1696

Please sign in to comment.