Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
SlurmClusterExecutor,
SlurmJobExecutor,
)
from executorlib.standalone.select import get_item_from_future, split_future


def get_cache_data(cache_directory: str) -> list[dict]:
Expand Down Expand Up @@ -66,6 +67,8 @@ def terminate_tasks_in_cache(

__all__: list[str] = [
"get_cache_data",
"get_item_from_future",
"split_future",
"terminate_tasks_in_cache",
"BaseExecutor",
"FluxJobExecutor",
Expand Down
72 changes: 72 additions & 0 deletions executorlib/standalone/select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from concurrent.futures import Future
from typing import Any, Optional


class FutureSelector(Future):
def __init__(self, future: Future, selector: int | str):
super().__init__()
self._future = future
self._selector = selector

def cancel(self) -> bool:
return self._future.cancel()

def cancelled(self) -> bool:
return self._future.cancelled()

def running(self) -> bool:
return self._future.running()

def done(self) -> bool:
return self._future.done()

def add_done_callback(self, fn) -> None:
return self._future.add_done_callback(fn=fn)

def result(self, timeout: Optional[float] = None) -> Any:
result = self._future.result(timeout=timeout)
if result is not None:
return result[self._selector]
else:
return None

def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
return self._future.exception(timeout=timeout)

def set_running_or_notify_cancel(self) -> bool:
return self._future.set_running_or_notify_cancel()

def set_result(self, result: Any) -> None:
return self._future.set_result(result=result)

def set_exception(self, exception: Optional[BaseException]) -> None:
return self._future.set_exception(exception=exception)


def split_future(future: Future, n: int) -> list[FutureSelector]:
"""
Split a concurrent.futures.Future object which returns a tuple or list as result into individual future objects

Args:
future (Future): future object which returns a tuple or list as result
n: number of elements expected in the future object

Returns:
list: List of future objects
"""
return [FutureSelector(future=future, selector=i) for i in range(n)]


def get_item_from_future(future: Future, key: str) -> FutureSelector:
"""
Get item from concurrent.futures.Future object which returns a dictionary as result by the corresponding dictionary
key.

Args:
future (Future): future object which returns a dictionary as result
key (str): dictionary key to get item from dictionary

Returns:
FutureSelector: Future object which returns the value corresponding to the key
"""
return FutureSelector(future=future, selector=key)
82 changes: 82 additions & 0 deletions tests/test_standalone_select.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import unittest
from concurrent.futures import Future
from executorlib import SingleNodeExecutor, split_future, get_item_from_future
from executorlib.api import cloudpickle_register
from executorlib.standalone.select import FutureSelector


def function_returns_tuple(i):
return "a", "b", i


def function_returns_dict(i):
return {"a": 1, "b": 2, "c": i}


def function_with_exception(i):
raise RuntimeError()


def callback(future):
print("callback:", future.result())


class TestSplitFuture(unittest.TestCase):
def test_integration_return_tuple(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_returns_tuple, 15)
f1, f2, f3 = split_future(future=future, n=3)
self.assertEqual(f1.result(), "a")
self.assertEqual(f2.result(), "b")
self.assertEqual(f3.result(), 15)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())

def test_integration_return_dict(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_returns_dict, 15)
f1 = get_item_from_future(future=future, key="a")
f2 = get_item_from_future(future=future, key="b")
f3 = get_item_from_future(future=future, key="c")
self.assertEqual(f1.result(), 1)
self.assertEqual(f2.result(), 2)
self.assertEqual(f3.result(), 15)
self.assertTrue(f1.done())
self.assertTrue(f2.done())
self.assertTrue(f3.done())

def test_integration_exception(self):
with SingleNodeExecutor() as exe:
cloudpickle_register(ind=1)
future = exe.submit(function_with_exception, 15)
f1, f2, f3 = split_future(future=future, n=3)
with self.assertRaises(RuntimeError):
f3.result()

def test_split_future_object(self):
f1 = Future()
fs1 = FutureSelector(future=f1, selector=1)
fs1.add_done_callback(callback)
fs1.set_running_or_notify_cancel()
self.assertTrue(fs1.running())
fs1.set_result([1, 2])
self.assertEqual(fs1.result(), 2)
f2 = Future()
fs2 = FutureSelector(future=f2, selector=1)
fs2.cancel()
self.assertTrue(fs2.cancelled())
f3 = Future()
fs3 = FutureSelector(future=f3, selector=1)
fs3.set_running_or_notify_cancel()
self.assertTrue(fs3.running())
fs3.set_result(None)
self.assertEqual(fs3.result(), None)
f4 = Future()
fs4 = FutureSelector(future=f4, selector=1)
fs4.set_exception(RuntimeError())
self.assertEqual(type(fs4.exception()), RuntimeError)
with self.assertRaises(RuntimeError):
fs4.result()
Loading