Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
# Minimum lines number of a similarity.
min-similarity-lines=-1

[tests/*.py]
disable=missing-function-docstring,no-member,missing-class-docstring,too-few-public-methods,too-many-public-methods,cyclic-import,import-error

[MESSAGES CONTROL]
disable=missing-function-docstring,missing-class-docstring,missing-module-docstring,import-error,too-few-public-methods,invalid-name,no-member,too-many-public-methods
disable=invalid-name,missing-module-docstring
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "streams.py"
version = "0.3.1"
version = "0.3.2"
authors = ["Stefan Garlonta <stefan@pickwicksoft.org>"]
description = "A stream library for Python inspired by Java Stream API"
keywords = ["streams", "parallel", "data"]
Expand Down
2 changes: 1 addition & 1 deletion pystreamapi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pystreamapi.__stream import Stream
from pystreamapi._streams.error.__levels import ErrorLevel

__version__ = "0.3.1"
__version__ = "0.3.2"
__all__ = ["Stream", "ErrorLevel"]
37 changes: 37 additions & 0 deletions pystreamapi/__stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from pystreamapi._streams.__base_stream import BaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class StreamConverter:
"""Class for converting streams to other types of streams."""

@staticmethod
def to_numeric_stream(stream: BaseStream) -> NumericBaseStream:
"""Converts a stream to a numeric stream."""
if isinstance(stream, SequentialStream):
stream.__class__ = SequentialNumericStream
if isinstance(stream, ParallelStream):
stream.__class__ = ParallelNumericStream
return stream

@staticmethod
def to_parallel_stream(stream: BaseStream) -> ParallelStream:
"""Converts a stream to a parallel stream."""
if isinstance(stream, SequentialNumericStream):
stream.__class__ = ParallelNumericStream
elif isinstance(stream, SequentialStream):
stream.__class__ = ParallelStream
return stream

@staticmethod
def to_sequential_stream(stream: BaseStream) -> SequentialStream:
"""Converts a stream to a sequential stream."""
if isinstance(stream, ParallelNumericStream):
stream.__class__ = SequentialNumericStream
elif isinstance(stream, ParallelStream):
stream.__class__ = SequentialStream
return stream
22 changes: 20 additions & 2 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

if TYPE_CHECKING:
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream

K = TypeVar('K')
_V = TypeVar('_V')
Expand Down Expand Up @@ -237,6 +239,13 @@ def __map_to_str(self):
"""Converts the stream to strings."""
self._map(str)

@_operation
def parallel(self) -> 'ParallelStream[K]':
"""Returns a parallel stream. If the stream is already parallel, it is returned."""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_parallel_stream(self)

@_operation
def peek(self, action: Callable) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -269,6 +278,13 @@ def __reversed(self):
except TypeError:
self._source = reversed(list(self._source))

@_operation
def sequential(self) -> SequentialStream[K]:
"""Returns a sequential stream. If the stream is already sequential, it is returned."""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_sequential_stream(self)

@_operation
def skip(self, n: int) -> 'BaseStream[K]':
"""
Expand Down Expand Up @@ -430,6 +446,8 @@ def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
:param key_mapper:
"""

@abstractmethod
def _to_numeric_stream(self) -> NumericBaseStream:
"""Converts a stream to a numeric stream. To be implemented by subclasses."""
"""Converts a stream to a numeric stream using the stream converter"""
# pylint: disable=import-outside-toplevel
from pystreamapi.__stream_converter import StreamConverter
return StreamConverter.to_numeric_stream(self)
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,3 @@ def _set_parallelizer_src(self):

def __mapper(self, mapper):
return lambda x: self._one(mapper=mapper, item=x)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
self.__class__ = ParallelNumericStream
return self
6 changes: 0 additions & 6 deletions pystreamapi/_streams/__sequential_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,3 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
@stream.terminal
def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
return self._group_to_dict(key_mapper)

def _to_numeric_stream(self):
# pylint: disable=import-outside-toplevel
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
self.__class__ = SequentialNumericStream
return self
53 changes: 53 additions & 0 deletions tests/test_stream_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from unittest import TestCase

from pystreamapi._streams.__parallel_stream import ParallelStream
from pystreamapi._streams.__sequential_stream import SequentialStream
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream


class TestStreamConverter(TestCase):

def test_convert_to_numeric_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_numeric_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_numeric_stream_numeric_parallel(self):
stream = ParallelNumericStream(["1", "2", "3"]).map_to_int()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_parallel_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelStream)

def test_convert_to_parallel_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).parallel()
self.assertIsInstance(stream, ParallelNumericStream)

def test_convert_to_sequential_stream_sequential(self):
stream = SequentialStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_sequential_numeric(self):
stream = SequentialNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)

def test_convert_to_sequential_stream_parallel(self):
stream = ParallelStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialStream)

def test_convert_to_sequential_stream_parallel_numeric(self):
stream = ParallelNumericStream(["1", "2", "3"]).sequential()
self.assertIsInstance(stream, SequentialNumericStream)