Skip to content

Commit d01a804

Browse files
Add Executors to the library (#12)
* Add executors to the library * Update dependencies
1 parent a4cafb3 commit d01a804

File tree

7 files changed

+277
-198
lines changed

7 files changed

+277
-198
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
nav:
22
- pycommons.base.concurrent.executor: executor.md
3+
4+
title: executor

poetry.lock

Lines changed: 178 additions & 186 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .direct import DirectExecutor
2+
from .executors import Executors
23

3-
__all__ = ["DirectExecutor"]
4+
__all__ = ["DirectExecutor", "Executors"]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from concurrent.futures import ThreadPoolExecutor
2+
from typing import Any
3+
4+
from .direct import DirectExecutor
5+
from ...utils import UtilityClass
6+
7+
8+
class Executors(UtilityClass):
9+
"""
10+
The Executors Utility class that contains methods to create different executors.
11+
"""
12+
13+
@classmethod
14+
def get_direct_executor(cls) -> DirectExecutor:
15+
"""
16+
Get the singleton instance of "DirectExecutor" that runs the callable
17+
in the same thread as the caller.
18+
19+
Returns:
20+
The singleton instance of `DirectExecutor`
21+
"""
22+
return DirectExecutor.get_instance()
23+
24+
@classmethod
25+
def new_single_thread_executor(cls, *args: Any, **kwargs: Any) -> ThreadPoolExecutor:
26+
"""
27+
A special threadpool executor where the max number of worker
28+
threads is 1. If multiple tasks are submitted to this executor, they are queued
29+
until the thread becomes idle.
30+
31+
Args:
32+
*args: Arguments for threadpool executor
33+
**kwargs: Keyword Arguments for threadpool executor
34+
35+
Returns:
36+
a new instance of `ThreadPoolExecutor` with number of threads set to 1
37+
"""
38+
return cls.new_fixed_thread_pool_executor(1, *args, **kwargs)
39+
40+
@classmethod
41+
def new_fixed_thread_pool_executor(
42+
cls, n_threads: int, *args: Any, **kwargs: Any
43+
) -> ThreadPoolExecutor:
44+
"""
45+
A fixed threadpool with number of threads set. Can be used within a context
46+
47+
Args:
48+
n_threads: Number of worker threads
49+
*args: Arguments for threadpool executor
50+
**kwargs: Keyword Arguments for threadpool executor
51+
52+
Returns:
53+
A new instance of threadpool executor
54+
"""
55+
return ThreadPoolExecutor(n_threads, *args, **kwargs)

tests/pycommons/base/concurrent/__init__.py

Whitespace-only changes.

tests/pycommons/base/concurrent/executors/test_direct.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,14 @@ def test_direct_executor_executes_runnable_on_the_same_thread(self):
99
def runnable():
1010
return threading.current_thread()
1111

12-
executor = DirectExecutor.get_instance()
13-
14-
future = executor.submit(runnable)
15-
16-
self.assertEqual(threading.current_thread(), future.result())
12+
with DirectExecutor.get_instance() as executor:
13+
future = executor.submit(runnable)
14+
self.assertEqual(threading.current_thread(), future.result())
1715

1816
def test_direct_executor_executes_runnable_and_throws_exception_on_the_same_thread(self):
1917
def runnable():
20-
raise Exception(threading.current_thread())
21-
22-
executor = DirectExecutor.get_instance()
23-
24-
future = executor.submit(runnable)
18+
raise RuntimeError(threading.current_thread())
2519

26-
self.assertEqual(threading.current_thread(), future.exception().args[0])
20+
with DirectExecutor.get_instance() as executor:
21+
future = executor.submit(runnable)
22+
self.assertEqual(threading.current_thread(), future.exception().args[0])
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import threading
2+
from typing import List
3+
from unittest import TestCase
4+
5+
from pycommons.base.concurrent.executor import Executors
6+
7+
8+
class TestExecutors(TestCase):
9+
def test_single_thread_executor(self):
10+
def runnable(*args):
11+
assert args[0] is ...
12+
return threading.current_thread()
13+
14+
with Executors.new_single_thread_executor() as executor:
15+
threads: List[threading.Thread] = list(executor.map(runnable, (..., ...)))
16+
self.assertEqual(threads[0].ident, threads[1].ident)
17+
18+
def test_direct_executor_executes_runnable_on_the_same_thread(self):
19+
def runnable():
20+
return threading.current_thread()
21+
22+
with Executors.get_direct_executor() as executor:
23+
future = executor.submit(runnable)
24+
self.assertEqual(threading.current_thread(), future.result())
25+
26+
def test_fixed_threadpool_executor(self):
27+
def runnable(*args):
28+
assert args[0] is ...
29+
return threading.current_thread()
30+
31+
with Executors.new_fixed_thread_pool_executor(1) as executor:
32+
threads: List[threading.Thread] = list(executor.map(runnable, (..., ...)))
33+
self.assertEqual(threads[0].ident, threads[1].ident)

0 commit comments

Comments
 (0)