From f224a9cce3008bff161bc182d40b22cd267df433 Mon Sep 17 00:00:00 2001 From: NuclearMissile Date: Thu, 22 Feb 2024 11:20:59 +0900 Subject: [PATCH 1/6] add test case --- tests/_parallel/test_parallel_stream.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 tests/_parallel/test_parallel_stream.py diff --git a/tests/_parallel/test_parallel_stream.py b/tests/_parallel/test_parallel_stream.py new file mode 100644 index 0000000..ebea2b8 --- /dev/null +++ b/tests/_parallel/test_parallel_stream.py @@ -0,0 +1,24 @@ +from unittest import TestCase + +from pystreamapi import Stream +from pystreamapi.conditions import prime + + +class TestParallelStream(TestCase): + def test_parallel(self): + # success 0 + (Stream.of(range(10)).parallel() + .map(lambda x: x * 2) + .for_each(print)) + + # success 1 + (Stream.parallel_of(range(10)) + .map(lambda x: x * 2) + .filter(prime()) + .for_each(print)) + + # failed + (Stream.of(range(10)).parallel() + .map(lambda x: x * 2) + .filter(prime()) + .for_each(print)) From 1449b5319ad2f459ae06f4dd084dddee3392be2a Mon Sep 17 00:00:00 2001 From: NuclearMissile Date: Thu, 22 Feb 2024 11:20:59 +0900 Subject: [PATCH 2/6] fix .parallel() bug --- pystreamapi/__stream_converter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pystreamapi/__stream_converter.py b/pystreamapi/__stream_converter.py index f9a075e..df61519 100644 --- a/pystreamapi/__stream_converter.py +++ b/pystreamapi/__stream_converter.py @@ -1,3 +1,4 @@ +from pystreamapi._parallel.fork_and_join import Parallelizer from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream @@ -25,6 +26,7 @@ def to_parallel_stream(stream: BaseStream) -> ParallelStream: stream.__class__ = ParallelNumericStream elif isinstance(stream, SequentialStream): stream.__class__ = ParallelStream + stream._parallelizer = Parallelizer() return stream @staticmethod From 3a9649170c58d99ef9cb0d2e4b8af45a08080038 Mon Sep 17 00:00:00 2001 From: NuclearMissile Date: Fri, 23 Feb 2024 11:18:25 +0900 Subject: [PATCH 3/6] edit comments --- tests/_parallel/test_parallel_stream.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/_parallel/test_parallel_stream.py b/tests/_parallel/test_parallel_stream.py index ebea2b8..26e5956 100644 --- a/tests/_parallel/test_parallel_stream.py +++ b/tests/_parallel/test_parallel_stream.py @@ -6,18 +6,18 @@ class TestParallelStream(TestCase): def test_parallel(self): - # success 0 + # success (Stream.of(range(10)).parallel() .map(lambda x: x * 2) .for_each(print)) - # success 1 + # success (Stream.parallel_of(range(10)) .map(lambda x: x * 2) .filter(prime()) .for_each(print)) - # failed + # fail -> fixed (Stream.of(range(10)).parallel() .map(lambda x: x * 2) .filter(prime()) From 426c4e44f5265fda0a8625787ea9182eb026dfb5 Mon Sep 17 00:00:00 2001 From: Stefan Garlonta Date: Fri, 23 Feb 2024 21:17:58 +0100 Subject: [PATCH 4/6] :bug: Fix parallel() not initializing _parallelizer properly --- pystreamapi/__stream_converter.py | 5 +++-- pystreamapi/_streams/__parallel_stream.py | 3 +++ tests/_parallel/test_parallel_stream.py | 24 ----------------------- tests/_streams/test_stream_converter.py | 9 +++++++++ 4 files changed, 15 insertions(+), 26 deletions(-) delete mode 100644 tests/_parallel/test_parallel_stream.py diff --git a/pystreamapi/__stream_converter.py b/pystreamapi/__stream_converter.py index df61519..12b4a1c 100644 --- a/pystreamapi/__stream_converter.py +++ b/pystreamapi/__stream_converter.py @@ -1,4 +1,3 @@ -from pystreamapi._parallel.fork_and_join import Parallelizer from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream @@ -17,6 +16,7 @@ def to_numeric_stream(stream: BaseStream) -> NumericBaseStream: stream.__class__ = SequentialNumericStream if isinstance(stream, ParallelStream): stream.__class__ = ParallelNumericStream + stream._init_parallelizer() return stream @staticmethod @@ -24,9 +24,10 @@ def to_parallel_stream(stream: BaseStream) -> ParallelStream: """Converts a stream to a parallel stream.""" if isinstance(stream, SequentialNumericStream): stream.__class__ = ParallelNumericStream + stream._init_parallelizer() elif isinstance(stream, SequentialStream): stream.__class__ = ParallelStream - stream._parallelizer = Parallelizer() + stream._init_parallelizer() return stream @staticmethod diff --git a/pystreamapi/_streams/__parallel_stream.py b/pystreamapi/_streams/__parallel_stream.py index 7583b20..833c9bd 100644 --- a/pystreamapi/_streams/__parallel_stream.py +++ b/pystreamapi/_streams/__parallel_stream.py @@ -20,6 +20,9 @@ def __init__(self, source: Iterable[stream.K]): super().__init__(source) self._parallelizer = Parallelizer() + def _init_parallelizer(self): + self._parallelizer = Parallelizer() + @terminal def all_match(self, predicate: Callable[[Any], bool]): return all(Parallel(n_jobs=-1, prefer="threads", handler=self) diff --git a/tests/_parallel/test_parallel_stream.py b/tests/_parallel/test_parallel_stream.py deleted file mode 100644 index 26e5956..0000000 --- a/tests/_parallel/test_parallel_stream.py +++ /dev/null @@ -1,24 +0,0 @@ -from unittest import TestCase - -from pystreamapi import Stream -from pystreamapi.conditions import prime - - -class TestParallelStream(TestCase): - def test_parallel(self): - # success - (Stream.of(range(10)).parallel() - .map(lambda x: x * 2) - .for_each(print)) - - # success - (Stream.parallel_of(range(10)) - .map(lambda x: x * 2) - .filter(prime()) - .for_each(print)) - - # fail -> fixed - (Stream.of(range(10)).parallel() - .map(lambda x: x * 2) - .filter(prime()) - .for_each(print)) diff --git a/tests/_streams/test_stream_converter.py b/tests/_streams/test_stream_converter.py index 5af92d0..9d21b28 100644 --- a/tests/_streams/test_stream_converter.py +++ b/tests/_streams/test_stream_converter.py @@ -1,5 +1,7 @@ from unittest import TestCase +from parameterized import parameterized + from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream @@ -51,3 +53,10 @@ def test_convert_to_sequential_stream_parallel(self): def test_convert_to_sequential_stream_parallel_numeric(self): stream = ParallelNumericStream(["1", "2", "3"]).sequential() self.assertIsInstance(stream, SequentialNumericStream) + + @parameterized.expand([("sequential stream", SequentialStream), + ("sequential numeric stream", SequentialNumericStream)]) + def test_convert_sequential_to_parallel_parallelizer_working(self, _, stream): + res = [] + stream([1, 2, 3]).parallel().filter(lambda x: x > 1).for_each(lambda x: res.append(x)) + self.assertEqual(res, [2, 3]) From 342a96260f466a2e3c59b168fcf4c81b33b32e53 Mon Sep 17 00:00:00 2001 From: Stefan Garlonta Date: Fri, 23 Feb 2024 21:25:24 +0100 Subject: [PATCH 5/6] :rotating_light: Fix pylint warnings --- pystreamapi/__stream_converter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pystreamapi/__stream_converter.py b/pystreamapi/__stream_converter.py index 12b4a1c..b907d38 100644 --- a/pystreamapi/__stream_converter.py +++ b/pystreamapi/__stream_converter.py @@ -1,3 +1,4 @@ +# pylint: disable=protected-access from pystreamapi._streams.__base_stream import BaseStream from pystreamapi._streams.__parallel_stream import ParallelStream from pystreamapi._streams.__sequential_stream import SequentialStream From 30d8a50bc385391509427427f34ed6048bd01f48 Mon Sep 17 00:00:00 2001 From: Stefan Garlonta Date: Fri, 23 Feb 2024 21:29:03 +0100 Subject: [PATCH 6/6] :rotating_light: Fix deepsource warning --- tests/_streams/test_stream_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/_streams/test_stream_converter.py b/tests/_streams/test_stream_converter.py index 9d21b28..a2b81d6 100644 --- a/tests/_streams/test_stream_converter.py +++ b/tests/_streams/test_stream_converter.py @@ -58,5 +58,5 @@ def test_convert_to_sequential_stream_parallel_numeric(self): ("sequential numeric stream", SequentialNumericStream)]) def test_convert_sequential_to_parallel_parallelizer_working(self, _, stream): res = [] - stream([1, 2, 3]).parallel().filter(lambda x: x > 1).for_each(lambda x: res.append(x)) + stream([1, 2, 3]).parallel().filter(lambda x: x > 1).for_each(res.append) self.assertEqual(res, [2, 3])