Skip to content

Commit a8b3a87

Browse files
Merge pull request #35 from python-thread/dev
Parallel Processing Decorator
2 parents b4675ed + 8bddf59 commit a8b3a87

File tree

5 files changed

+204
-13
lines changed

5 files changed

+204
-13
lines changed

src/thread/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333

3434
# Export decorators
3535
from .decorators import (
36-
threaded
36+
threaded,
37+
processor
3738
)
3839

3940

src/thread/decorators/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""
2+
Export decorators
3+
"""
4+
5+
from ._threaded import threaded
6+
from ._processor import processor

src/thread/decorators/_processor.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""
2+
## Processor
3+
4+
Documentation: https://thread.ngjx.org
5+
"""
6+
7+
from functools import wraps
8+
from ..thread import ParallelProcessing
9+
10+
from .._types import Overflow_In, Data_In
11+
from typing import Callable, Mapping, Sequence, Optional, Union, overload
12+
from typing_extensions import ParamSpec, TypeVar, Concatenate
13+
14+
15+
_TargetT = TypeVar('_TargetT')
16+
_TargetP = ParamSpec('_TargetP')
17+
_DataT = TypeVar('_DataT')
18+
TargetFunction = Callable[Concatenate[_DataT, _TargetP], _TargetT]
19+
20+
21+
NoParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]
22+
WithParamReturn = Callable[[TargetFunction[_DataT, _TargetP, _TargetT]], NoParamReturn[_DataT, _TargetP, _TargetT]]
23+
FullParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]
24+
25+
26+
27+
28+
@overload
29+
def processor(__function: TargetFunction[_DataT, _TargetP, _TargetT]) -> NoParamReturn[_DataT, _TargetP, _TargetT]: ...
30+
31+
@overload
32+
def processor(
33+
*,
34+
args: Sequence[Data_In] = (),
35+
kwargs: Mapping[str, Data_In] = {},
36+
ignore_errors: Sequence[type[Exception]] = (),
37+
suppress_errors: bool = False,
38+
**overflow_kwargs: Overflow_In
39+
) -> WithParamReturn[_DataT, _TargetP, _TargetT]: ...
40+
41+
@overload
42+
def processor(
43+
__function: TargetFunction[_DataT, _TargetP, _TargetT],
44+
*,
45+
args: Sequence[Data_In] = (),
46+
kwargs: Mapping[str, Data_In] = {},
47+
ignore_errors: Sequence[type[Exception]] = (),
48+
suppress_errors: bool = False,
49+
**overflow_kwargs: Overflow_In
50+
) -> FullParamReturn[_DataT, _TargetP, _TargetT]: ...
51+
52+
53+
def processor(
54+
__function: Optional[TargetFunction[_DataT, _TargetP, _TargetT]] = None,
55+
*,
56+
args: Sequence[Data_In] = (),
57+
kwargs: Mapping[str, Data_In] = {},
58+
ignore_errors: Sequence[type[Exception]] = (),
59+
suppress_errors: bool = False,
60+
**overflow_kwargs: Overflow_In
61+
) -> Union[NoParamReturn[_DataT, _TargetP, _TargetT], WithParamReturn[_DataT, _TargetP, _TargetT], FullParamReturn[_DataT, _TargetP, _TargetT]]:
62+
"""
63+
Decorate a function to run it in a thread
64+
65+
Parameters
66+
----------
67+
:param __function: The function to run in a thread
68+
:param args: Keyword-Only arguments to pass into `thread.Thread`
69+
:param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread`
70+
:param ignore_errors: Keyword-Only arguments to pass into `thread.Thread`
71+
:param suppress_errors: Keyword-Only arguments to pass into `thread.Thread`
72+
:param **: Keyword-Only arguments to pass into `thread.Thread`
73+
74+
Returns
75+
-------
76+
:return decorator:
77+
78+
Use Case
79+
--------
80+
Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned
81+
82+
>>> @thread.threaded
83+
>>> def myfunction(*args, **kwargs): ...
84+
85+
>>> myJob = myfunction(1, 2)
86+
>>> type(myjob)
87+
> Thread
88+
89+
You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread`
90+
>>> @thread.threaded(daemon = True)
91+
>>> def myfunction(): ...
92+
93+
Args will be ordered infront of function-parsed args parsed into `thread.Thread.args`
94+
>>> @thread.threaded(args = (1))
95+
>>> def myfunction(*args):
96+
>>> print(args)
97+
>>>
98+
>>> myfunction(4, 6).get_return_value()
99+
1, 4, 6
100+
"""
101+
102+
if not callable(__function):
103+
def wrapper(func: TargetFunction[_DataT, _TargetP, _TargetT]) -> FullParamReturn[_DataT, _TargetP, _TargetT]:
104+
return processor(
105+
func,
106+
args = args,
107+
kwargs = kwargs,
108+
ignore_errors = ignore_errors,
109+
suppress_errors = suppress_errors,
110+
**overflow_kwargs
111+
)
112+
return wrapper
113+
114+
overflow_kwargs.update({
115+
'ignore_errors': ignore_errors,
116+
'suppress_errors': suppress_errors
117+
})
118+
119+
kwargs = dict(kwargs)
120+
121+
@wraps(__function)
122+
def wrapped(data: Sequence[_DataT], *parsed_args: _TargetP.args, **parsed_kwargs: _TargetP.kwargs) -> ParallelProcessing[_TargetP, _TargetT, _DataT]:
123+
kwargs.update(parsed_kwargs)
124+
125+
processed_args = ( *args, *parsed_args )
126+
processed_kwargs = { i:v for i,v in kwargs.items() if i not in ['args', 'kwargs'] }
127+
128+
job = ParallelProcessing(
129+
function = __function,
130+
dataset = data,
131+
args = processed_args,
132+
kwargs = processed_kwargs,
133+
**overflow_kwargs
134+
)
135+
job.start()
136+
return job
137+
138+
return wrapped

src/thread/decorators.py renamed to src/thread/decorators/_threaded.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,27 @@
11
"""
2-
## Decorators
2+
## Threaded
33
44
Documentation: https://thread.ngjx.org
55
"""
66

77
from functools import wraps
8-
from .thread import Thread
8+
from ..thread import Thread
99

10-
from ._types import Overflow_In, Data_Out, Data_In
10+
from .._types import Overflow_In, Data_In
1111
from typing import Callable, Mapping, Sequence, Optional, Union, overload
1212
from typing_extensions import ParamSpec, TypeVar
1313

1414

1515
T = TypeVar('T')
1616
P = ParamSpec('P')
1717
TargetFunction = Callable[P, T]
18+
19+
1820
NoParamReturn = Callable[P, Thread[P, T]]
1921
WithParamReturn = Callable[[TargetFunction[P, T]], NoParamReturn[P, T]]
2022
FullParamReturn = Callable[P, Thread[P, T]]
21-
WrappedWithParamReturn = Callable[[TargetFunction[P, T]], WithParamReturn[P, T]]
23+
24+
2225

2326

2427
@overload
@@ -131,5 +134,3 @@ def wrapped(*parsed_args: P.args, **parsed_kwargs: P.kwargs) -> Thread[P, T]:
131134
return job
132135

133136
return wrapped
134-
135-

tests/test_decorator.py

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import time
2-
from src.thread import threaded
2+
from src.thread import threaded, processor
33

44

55
# >>>>>>>>>> Dummy Functions <<<<<<<<<< #
@@ -10,24 +10,24 @@ def _dummy_target_raiseToPower(x: float, power: float, delay: float = 0):
1010

1111

1212

13-
# >>>>>>>>>> General Use <<<<<<<<<< #
14-
def test_creationNoParam():
13+
# >>>>>>>>>> Threaded <<<<<<<<<< #
14+
def test_threadedCreationNoParam():
1515
@threaded
1616
def _run(*args):
1717
return _dummy_target_raiseToPower(*args)
1818

1919
x = _run(2, 2)
2020
assert x.get_return_value() == 4
2121

22-
def test_creationEmptyParam():
22+
def test_threadedCreationEmptyParam():
2323
@threaded()
2424
def _run(*args):
2525
return _dummy_target_raiseToPower(*args)
2626

2727
x = _run(2, 2)
2828
assert x.get_return_value() == 4
2929

30-
def test_creationWithParam():
30+
def test_threadedCreationWithParam():
3131
@threaded(daemon = True)
3232
def _run(*args):
3333
return _dummy_target_raiseToPower(*args)
@@ -36,10 +36,55 @@ def _run(*args):
3636
assert x.daemon
3737
assert x.get_return_value() == 4
3838

39-
def test_argJoin():
39+
def test_threadedArgJoin():
4040
@threaded(daemon = True, args = (1, 2, 3))
4141
def _run(*args):
4242
return args
4343

4444
x = _run(8, 9)
4545
assert x.get_return_value() == (1, 2, 3, 8, 9)
46+
47+
48+
49+
50+
def test_processorCreationNoParam():
51+
@processor
52+
def _run(args):
53+
return _dummy_target_raiseToPower(*args)
54+
55+
x = _run([[2, 2]])
56+
assert x.get_return_values() == [4]
57+
58+
def test_processorCreationEmptyParam():
59+
@processor()
60+
def _run(args):
61+
return _dummy_target_raiseToPower(*args)
62+
63+
x = _run([[2, 2]])
64+
assert x.get_return_values() == [4]
65+
66+
def test_processorCreationWithParam():
67+
@processor(daemon = True)
68+
def _run(args):
69+
return _dummy_target_raiseToPower(*args)
70+
71+
x = _run([[2, 2]])
72+
assert len(x._threads) == 1
73+
assert x._threads[0].thread.daemon
74+
assert x.get_return_values() == [4]
75+
76+
def test_processorArgJoin():
77+
@processor(daemon = True, args = (1, 2, 3))
78+
def _run(data, *args):
79+
return [*args, *data]
80+
81+
x = _run([[8, 9]])
82+
assert x.get_return_values() == [[1, 2, 3, 8, 9]]
83+
84+
def test_processorMultiArgJoin():
85+
@processor(daemon = True, args = (1, 2, 3))
86+
def _run(data, *args):
87+
return [*args, *data]
88+
89+
x = _run([[8, 9], [10, 11]])
90+
assert x.get_return_values() == [[1, 2, 3, 8, 9], [1, 2, 3, 10, 11]]

0 commit comments

Comments
 (0)