Skip to content

Commit

Permalink
Ignore previous tasks before submitting ones via map and `map_unord…
Browse files Browse the repository at this point in the history
  • Loading branch information
czgdp1807 authored May 24, 2022
1 parent 806c187 commit 5b9b4fa
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
21 changes: 17 additions & 4 deletions python/ray/tests/test_actor_pool.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import sys
import time
import pytest
Expand Down Expand Up @@ -101,11 +102,15 @@ def double(self, x):


def test_map_gh23107(init):
sleep_time = 40

# Reference - https://github.com/ray-project/ray/issues/23107
@ray.remote
class DummyActor:
async def identity(self, s):
return s
if s == 6:
await asyncio.sleep(sleep_time)
return s, time.time()

def func(a, v):
return a.identity.remote(v)
Expand All @@ -114,13 +119,21 @@ def func(a, v):

pool_map = ActorPool([DummyActor.remote() for i in range(2)])
pool_map.submit(func, 6)
start_time = time.time()
gen = pool_map.map(func, map_values)
assert list(gen) == [1, 2, 3, 4, 5]
assert all(elem[0] in [1, 2, 3, 4, 5] for elem in list(gen))
assert all(
abs(elem[1] - start_time) < sleep_time in [1, 2, 3, 4, 5] for elem in list(gen)
)

pool_map_unordered = ActorPool([DummyActor.remote() for i in range(2)])
pool_map_unordered.submit(func, 6)
gen = pool_map_unordered.map(func, map_values)
assert all(elem in [1, 2, 3, 4, 5] for elem in list(gen))
start_time = time.time()
gen = pool_map_unordered.map_unordered(func, map_values)
assert all(elem[0] in [1, 2, 3, 4, 5] for elem in list(gen))
assert all(
abs(elem[1] - start_time) < sleep_time in [1, 2, 3, 4, 5] for elem in list(gen)
)


def test_get_next_timeout(init):
Expand Down
28 changes: 23 additions & 5 deletions python/ray/util/actor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def map(self, fn, values):
# by calling `has_next` and `gen_next` repeteadly.
while self.has_next():
try:
self.get_next(timeout=0)
self.get_next(timeout=0, ignore_if_timedout=True)
except TimeoutError:
pass

Expand Down Expand Up @@ -165,7 +165,7 @@ def has_next(self):
"""
return bool(self._future_to_actor)

def get_next(self, timeout=None):
def get_next(self, timeout=None, ignore_if_timedout=False):
"""Returns the next pending result in order.
This returns the next result produced by submit(), blocking for up to
Expand All @@ -191,20 +191,29 @@ def get_next(self, timeout=None):
"It is not allowed to call get_next() after get_next_unordered()."
)
future = self._index_to_future[self._next_return_index]
timeout_msg = "Timed out waiting for result"
raise_timeout_after_ignore = False
if timeout is not None:
res, _ = ray.wait([future], timeout=timeout)
if not res:
raise TimeoutError("Timed out waiting for result")
if not ignore_if_timedout:
raise TimeoutError(timeout_msg)
else:
raise_timeout_after_ignore = True
del self._index_to_future[self._next_return_index]
self._next_return_index += 1

future_key = tuple(future) if isinstance(future, list) else future
i, a = self._future_to_actor.pop(future_key)

self._return_actor(a)
if raise_timeout_after_ignore:
raise TimeoutError(
timeout_msg + ". The task {} has been ignored.".format(future)
)
return ray.get(future)

def get_next_unordered(self, timeout=None):
def get_next_unordered(self, timeout=None, ignore_if_timedout=False):
"""Returns any of the next pending results.
This returns some result produced by submit(), blocking for up to
Expand Down Expand Up @@ -232,14 +241,23 @@ def get_next_unordered(self, timeout=None):
raise StopIteration("No more results to get")
# TODO(ekl) bulk wait for performance
res, _ = ray.wait(list(self._future_to_actor), num_returns=1, timeout=timeout)
timeout_msg = "Timed out waiting for result"
raise_timeout_after_ignore = False
if res:
[future] = res
else:
raise TimeoutError("Timed out waiting for result")
if not ignore_if_timedout:
raise TimeoutError(timeout_msg)
else:
raise_timeout_after_ignore = True
i, a = self._future_to_actor.pop(future)
self._return_actor(a)
del self._index_to_future[i]
self._next_return_index = max(self._next_return_index, i + 1)
if raise_timeout_after_ignore:
raise TimeoutError(
timeout_msg + ". The task {} has been ignored.".format(future)
)
return ray.get(future)

def _return_actor(self, actor):
Expand Down

0 comments on commit 5b9b4fa

Please sign in to comment.