Skip to content

Commit

Permalink
[Utils] Add Queue async and batch methods (#12578)
Browse files Browse the repository at this point in the history
  • Loading branch information
architkulkarni authored Dec 10, 2020
1 parent 38ba238 commit 3fd3cb9
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 24 deletions.
66 changes: 64 additions & 2 deletions python/ray/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from ray.util.queue import Queue, Empty, Full


# Remote helper functions for testing concurrency
@ray.remote
def async_get(queue):
return queue.get(block=True)
Expand Down Expand Up @@ -50,6 +51,29 @@ def test_get(ray_start_regular_shared):
q.get(timeout=0.2)


@pytest.mark.asyncio
async def test_get_async(ray_start_regular_shared):

q = Queue()

item = 0
await q.put_async(item)
assert await q.get_async(block=False) == item

item = 1
await q.put_async(item)
assert await q.get_async(timeout=0.2) == item

with pytest.raises(ValueError):
await q.get_async(timeout=-1)

with pytest.raises(Empty):
await q.get_async(block=False)

with pytest.raises(Empty):
await q.get_async(timeout=0.2)


def test_put(ray_start_regular_shared):

q = Queue(1)
Expand All @@ -73,7 +97,31 @@ def test_put(ray_start_regular_shared):
q.put(1, timeout=0.2)


def test_async_get(ray_start_regular_shared):
@pytest.mark.asyncio
async def test_put_async(ray_start_regular_shared):

q = Queue(1)

item = 0
await q.put_async(item, block=False)
assert await q.get_async() == item

item = 1
await q.put_async(item, timeout=0.2)
assert await q.get_async() == item

with pytest.raises(ValueError):
await q.put_async(0, timeout=-1)

await q.put_async(0)
with pytest.raises(Full):
await q.put_async(1, block=False)

with pytest.raises(Full):
await q.put_async(1, timeout=0.2)


def test_concurrent_get(ray_start_regular_shared):
q = Queue()
future = async_get.remote(q)

Expand All @@ -87,7 +135,7 @@ def test_async_get(ray_start_regular_shared):
assert ray.get(future) == 1


def test_async_put(ray_start_regular_shared):
def test_concurrent_put(ray_start_regular_shared):
q = Queue(1)
q.put(1)
future = async_put.remote(q, 2)
Expand All @@ -102,6 +150,20 @@ def test_async_put(ray_start_regular_shared):
assert q.get() == 2


def test_batch(ray_start_regular_shared):
q = Queue(1)

with pytest.raises(Full):
q.put_nowait_batch([1, 2])

with pytest.raises(Empty):
q.get_nowait_batch(1)

big_q = Queue(100)
big_q.put_nowait_batch(list(range(100)))
assert big_q.get_nowait_batch(100) == list(range(100))


def test_qsize(ray_start_regular_shared):

q = Queue()
Expand Down
168 changes: 146 additions & 22 deletions python/ray/util/queue.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from typing import Optional, Any, List
from collections.abc import Iterable

import ray

Expand All @@ -12,44 +14,69 @@ class Full(Exception):


class Queue:
"""Queue implementation on Ray.
"""A first-in, first-out queue implementation on Ray.
The behavior and use cases are similar to those of the asyncio.Queue class.
Features both sync and async put and get methods. Provides the option to
block until space is available when calling put on a full queue,
or to block until items are available when calling get on an empty queue.
Optionally supports batched put and get operations to minimize
serialization overhead.
Args:
maxsize (int): maximum size of the queue. If zero, size is unbounded.
maxsize (optional, int): maximum size of the queue. If zero, size is
unbounded.
Examples:
>>> q = Queue()
>>> items = list(range(10))
>>> for item in items:
>>> q.put(item)
>>> for item in items:
>>> assert item == q.get()
"""

def __init__(self, maxsize=0):
self.actor = _QueueActor.remote(maxsize)
def __init__(self, maxsize: int = 0) -> None:
self.maxsize = maxsize
self.actor = _QueueActor.remote(self.maxsize)

def __len__(self):
def __len__(self) -> int:
return self.size()

def size(self):
def size(self) -> int:
"""The size of the queue."""
return ray.get(self.actor.qsize.remote())

def qsize(self):
def qsize(self) -> int:
"""The size of the queue."""
return self.size()

def empty(self):
def empty(self) -> bool:
"""Whether the queue is empty."""
return ray.get(self.actor.empty.remote())

def full(self):
def full(self) -> bool:
"""Whether the queue is full."""
return ray.get(self.actor.full.remote())

def put(self, item, block=True, timeout=None):
def put(self,
item: Any,
block: bool = True,
timeout: Optional[float] = None) -> None:
"""Adds an item to the queue.
If block is True and the queue is full, blocks until the queue is no
longer full or until timeout.
There is no guarantee of order if multiple producers put to the same
full queue.
Raises:
Full if the queue is full and blocking is False.
Full if the queue is full, blocking is True, and it timed out.
ValueError if timeout is negative.
Full: if the queue is full and blocking is False.
Full: if the queue is full, blocking is True, and it timed out.
ValueError: if timeout is negative.
"""
if not block:
try:
Expand All @@ -62,19 +89,50 @@ def put(self, item, block=True, timeout=None):
else:
ray.get(self.actor.put.remote(item, timeout))

def get(self, block=True, timeout=None):
async def put_async(self,
item: Any,
block: bool = True,
timeout: Optional[float] = None) -> None:
"""Adds an item to the queue.
If block is True and the queue is full,
blocks until the queue is no longer full or until timeout.
There is no guarantee of order if multiple producers put to the same
full queue.
Raises:
Full: if the queue is full and blocking is False.
Full: if the queue is full, blocking is True, and it timed out.
ValueError: if timeout is negative.
"""
if not block:
try:
await self.actor.put_nowait.remote(item)
except asyncio.QueueFull:
raise Full
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
await self.actor.put.remote(item, timeout)

def get(self, block: bool = True, timeout: Optional[float] = None) -> Any:
"""Gets an item from the queue.
If block is True and the queue is empty, blocks until the queue is no
longer empty or until timeout.
There is no guarantee of order if multiple consumers get from the
same empty queue.
Returns:
The next item in the queue.
Raises:
Empty if the queue is empty and blocking is False.
Empty if the queue is empty, blocking is True, and it timed out.
ValueError if timeout is negative.
Empty: if the queue is empty and blocking is False.
Empty: if the queue is empty, blocking is True, and it timed out.
ValueError: if timeout is negative.
"""
if not block:
try:
Expand All @@ -87,27 +145,79 @@ def get(self, block=True, timeout=None):
else:
return ray.get(self.actor.get.remote(timeout))

def put_nowait(self, item):
async def get_async(self,
block: bool = True,
timeout: Optional[float] = None) -> Any:
"""Gets an item from the queue.
There is no guarantee of order if multiple consumers get from the
same empty queue.
Returns:
The next item in the queue.
Raises:
Empty: if the queue is empty and blocking is False.
Empty: if the queue is empty, blocking is True, and it timed out.
ValueError: if timeout is negative.
"""
if not block:
try:
return await self.actor.get_nowait.remote()
except asyncio.QueueEmpty:
raise Empty
else:
if timeout is not None and timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
return await self.actor.get.remote(timeout)

def put_nowait(self, item: Any) -> None:
"""Equivalent to put(item, block=False).
Raises:
Full if the queue is full.
Full: if the queue is full.
"""
return self.put(item, block=False)

def get_nowait(self):
def put_nowait_batch(self, items: Iterable) -> None:
"""Takes in a list of items and puts them into the queue in order.
Raises:
Full: if the items will not fit in the queue
"""
if not isinstance(items, Iterable):
raise TypeError("Argument 'items' must be an Iterable")

ray.get(self.actor.put_nowait_batch.remote(items))

def get_nowait(self) -> Any:
"""Equivalent to get(block=False).
Raises:
Empty if the queue is empty.
Empty: if the queue is empty.
"""
return self.get(block=False)

def get_nowait_batch(self, num_items: int) -> List[Any]:
"""Gets items from the queue and returns them in a
list in order.
Raises:
Empty: if the queue does not contain the desired number of items
"""
if not isinstance(num_items, int):
raise TypeError("Argument 'num_items' must be an int")
if num_items < 0:
raise ValueError("'num_items' must be nonnegative")

return ray.get(self.actor.get_nowait_batch.remote(num_items))


@ray.remote
class _QueueActor:
def __init__(self, maxsize):
self.queue = asyncio.Queue(maxsize)
self.maxsize = maxsize
self.queue = asyncio.Queue(self.maxsize)

def qsize(self):
return self.queue.qsize()
Expand All @@ -133,5 +243,19 @@ async def get(self, timeout=None):
def put_nowait(self, item):
self.queue.put_nowait(item)

def put_nowait_batch(self, items):
# If maxsize is 0, queue is unbounded, so no need to check size.
if self.maxsize > 0 and len(items) + self.qsize() > self.maxsize:
raise Full(f"Cannot add {len(items)} items to queue of size "
f"{self.qsize()} and maxsize {self.maxsize}.")
for item in items:
self.queue.put_nowait(item)

def get_nowait(self):
return self.queue.get_nowait()

def get_nowait_batch(self, num_items):
if num_items > self.qsize():
raise Empty(f"Cannot get {num_items} items from queue of size "
f"{self.qsize()}.")
return [self.queue.get_nowait() for _ in range(num_items)]

0 comments on commit 3fd3cb9

Please sign in to comment.