Skip to content

Parallel Processing Decorator #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/thread/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

# Export decorators
from .decorators import (
threaded
threaded,
processor
)


Expand Down
6 changes: 6 additions & 0 deletions src/thread/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
Export decorators
"""

from ._threaded import threaded
from ._processor import processor
138 changes: 138 additions & 0 deletions src/thread/decorators/_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
## Processor

Documentation: https://thread.ngjx.org
"""

from functools import wraps
from ..thread import ParallelProcessing

from .._types import Overflow_In, Data_In
from typing import Callable, Mapping, Sequence, Optional, Union, overload
from typing_extensions import ParamSpec, TypeVar, Concatenate


_TargetT = TypeVar('_TargetT')
_TargetP = ParamSpec('_TargetP')
_DataT = TypeVar('_DataT')
TargetFunction = Callable[Concatenate[_DataT, _TargetP], _TargetT]


NoParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]
WithParamReturn = Callable[[TargetFunction[_DataT, _TargetP, _TargetT]], NoParamReturn[_DataT, _TargetP, _TargetT]]
FullParamReturn = Callable[Concatenate[Sequence[_DataT], _TargetP], ParallelProcessing[_TargetP, _TargetT, _DataT]]




@overload
def processor(__function: TargetFunction[_DataT, _TargetP, _TargetT]) -> NoParamReturn[_DataT, _TargetP, _TargetT]: ...

@overload
def processor(
*,
args: Sequence[Data_In] = (),
kwargs: Mapping[str, Data_In] = {},
ignore_errors: Sequence[type[Exception]] = (),
suppress_errors: bool = False,
**overflow_kwargs: Overflow_In
) -> WithParamReturn[_DataT, _TargetP, _TargetT]: ...

@overload
def processor(
__function: TargetFunction[_DataT, _TargetP, _TargetT],
*,
args: Sequence[Data_In] = (),
kwargs: Mapping[str, Data_In] = {},
ignore_errors: Sequence[type[Exception]] = (),
suppress_errors: bool = False,
**overflow_kwargs: Overflow_In
) -> FullParamReturn[_DataT, _TargetP, _TargetT]: ...


def processor(
__function: Optional[TargetFunction[_DataT, _TargetP, _TargetT]] = None,
*,
args: Sequence[Data_In] = (),
kwargs: Mapping[str, Data_In] = {},
ignore_errors: Sequence[type[Exception]] = (),
suppress_errors: bool = False,
**overflow_kwargs: Overflow_In
) -> Union[NoParamReturn[_DataT, _TargetP, _TargetT], WithParamReturn[_DataT, _TargetP, _TargetT], FullParamReturn[_DataT, _TargetP, _TargetT]]:
"""
Decorate a function to run it in a thread

Parameters
----------
:param __function: The function to run in a thread
:param args: Keyword-Only arguments to pass into `thread.Thread`
:param kwargs: Keyword-Only keyword arguments to pass into `thread.Thread`
:param ignore_errors: Keyword-Only arguments to pass into `thread.Thread`
:param suppress_errors: Keyword-Only arguments to pass into `thread.Thread`
:param **: Keyword-Only arguments to pass into `thread.Thread`

Returns
-------
:return decorator:

Use Case
--------
Now whenever `myfunction` is invoked, it will be executed in a thread and the `Thread` object will be returned

>>> @thread.threaded
>>> def myfunction(*args, **kwargs): ...

>>> myJob = myfunction(1, 2)
>>> type(myjob)
> Thread

You can also pass keyword arguments to change the thread behaviour, it otherwise follows the defaults of `thread.Thread`
>>> @thread.threaded(daemon = True)
>>> def myfunction(): ...

Args will be ordered infront of function-parsed args parsed into `thread.Thread.args`
>>> @thread.threaded(args = (1))
>>> def myfunction(*args):
>>> print(args)
>>>
>>> myfunction(4, 6).get_return_value()
1, 4, 6
"""

if not callable(__function):
def wrapper(func: TargetFunction[_DataT, _TargetP, _TargetT]) -> FullParamReturn[_DataT, _TargetP, _TargetT]:
return processor(
func,
args = args,
kwargs = kwargs,
ignore_errors = ignore_errors,
suppress_errors = suppress_errors,
**overflow_kwargs
)
return wrapper

overflow_kwargs.update({
'ignore_errors': ignore_errors,
'suppress_errors': suppress_errors
})

kwargs = dict(kwargs)

@wraps(__function)
def wrapped(data: Sequence[_DataT], *parsed_args: _TargetP.args, **parsed_kwargs: _TargetP.kwargs) -> ParallelProcessing[_TargetP, _TargetT, _DataT]:
kwargs.update(parsed_kwargs)

processed_args = ( *args, *parsed_args )
processed_kwargs = { i:v for i,v in kwargs.items() if i not in ['args', 'kwargs'] }

job = ParallelProcessing(
function = __function,
dataset = data,
args = processed_args,
kwargs = processed_kwargs,
**overflow_kwargs
)
job.start()
return job

return wrapped
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
"""
## Decorators
## Threaded

Documentation: https://thread.ngjx.org
"""

from functools import wraps
from .thread import Thread
from ..thread import Thread

from ._types import Overflow_In, Data_Out, Data_In
from .._types import Overflow_In, Data_In
from typing import Callable, Mapping, Sequence, Optional, Union, overload
from typing_extensions import ParamSpec, TypeVar


T = TypeVar('T')
P = ParamSpec('P')
TargetFunction = Callable[P, T]


NoParamReturn = Callable[P, Thread[P, T]]
WithParamReturn = Callable[[TargetFunction[P, T]], NoParamReturn[P, T]]
FullParamReturn = Callable[P, Thread[P, T]]
WrappedWithParamReturn = Callable[[TargetFunction[P, T]], WithParamReturn[P, T]]




@overload
Expand Down Expand Up @@ -131,5 +134,3 @@ def wrapped(*parsed_args: P.args, **parsed_kwargs: P.kwargs) -> Thread[P, T]:
return job

return wrapped


57 changes: 51 additions & 6 deletions tests/test_decorator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
from src.thread import threaded
from src.thread import threaded, processor


# >>>>>>>>>> Dummy Functions <<<<<<<<<< #
Expand All @@ -10,24 +10,24 @@ def _dummy_target_raiseToPower(x: float, power: float, delay: float = 0):



# >>>>>>>>>> General Use <<<<<<<<<< #
def test_creationNoParam():
# >>>>>>>>>> Threaded <<<<<<<<<< #
def test_threadedCreationNoParam():
@threaded
def _run(*args):
return _dummy_target_raiseToPower(*args)

x = _run(2, 2)
assert x.get_return_value() == 4

def test_creationEmptyParam():
def test_threadedCreationEmptyParam():
@threaded()
def _run(*args):
return _dummy_target_raiseToPower(*args)

x = _run(2, 2)
assert x.get_return_value() == 4

def test_creationWithParam():
def test_threadedCreationWithParam():
@threaded(daemon = True)
def _run(*args):
return _dummy_target_raiseToPower(*args)
Expand All @@ -36,10 +36,55 @@ def _run(*args):
assert x.daemon
assert x.get_return_value() == 4

def test_argJoin():
def test_threadedArgJoin():
@threaded(daemon = True, args = (1, 2, 3))
def _run(*args):
return args

x = _run(8, 9)
assert x.get_return_value() == (1, 2, 3, 8, 9)




def test_processorCreationNoParam():
@processor
def _run(args):
return _dummy_target_raiseToPower(*args)

x = _run([[2, 2]])
assert x.get_return_values() == [4]

def test_processorCreationEmptyParam():
@processor()
def _run(args):
return _dummy_target_raiseToPower(*args)

x = _run([[2, 2]])
assert x.get_return_values() == [4]

def test_processorCreationWithParam():
@processor(daemon = True)
def _run(args):
return _dummy_target_raiseToPower(*args)

x = _run([[2, 2]])
assert len(x._threads) == 1
assert x._threads[0].thread.daemon
assert x.get_return_values() == [4]

def test_processorArgJoin():
@processor(daemon = True, args = (1, 2, 3))
def _run(data, *args):
return [*args, *data]

x = _run([[8, 9]])
assert x.get_return_values() == [[1, 2, 3, 8, 9]]

def test_processorMultiArgJoin():
@processor(daemon = True, args = (1, 2, 3))
def _run(data, *args):
return [*args, *data]

x = _run([[8, 9], [10, 11]])
assert x.get_return_values() == [[1, 2, 3, 8, 9], [1, 2, 3, 10, 11]]