Skip to content

Commit

Permalink
🚨 Fix issues in new stream implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
garlontas committed Nov 25, 2024
1 parent 6f4fdb1 commit 67bc4a1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
19 changes: 10 additions & 9 deletions pystreamapi/_itertools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler =
try:
value = next(it)
except StopIteration:
raise TypeError(
"reduce() of empty iterable with no initial value") from None
raise TypeError("reduce() of empty iterable with no initial value") from None
else:
value = initial

Expand All @@ -56,31 +55,33 @@ def reduce(function, sequence, initial=_initial_missing, handler: ErrorHandler =


def peek(iterable: Iterable, mapper):
"""Generator wrapper that applies a function to every item of the iterable
and yields the item unchanged."""
for item in iterable:
mapper(item)
yield item

def distinct(iterable):

def distinct(iterable: Iterable):
"""Generator wrapper that returns unique elements from the iterable."""
seen = set()
for item in iterable:
if item not in seen:
seen.add(item)
yield item


def limit(source: Iterable, max_nr: int):
"""Generator wrapper that returns the first n elements of the iterable."""
iterator = iter(source)
for _ in range(max_nr):
try:
yield next(iterator)
except StopIteration:
break

def any_match(iterable):
for item in iterable:
if item:
return True
return False

def flat_map(iterable):
def flat_map(iterable: Iterable):
"""Generator wrapper that flattens the Stream iterable."""
for stream in iterable:
yield from stream.to_list()
5 changes: 3 additions & 2 deletions pystreamapi/_streams/__base_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Iterable, Callable, Any, TypeVar, Iterator, TYPE_CHECKING, Union

from pystreamapi.__optional import Optional
from pystreamapi._itertools.tools import dropwhile, distinct, limit, any_match
from pystreamapi._itertools.tools import dropwhile, distinct, limit
from pystreamapi._lazy.process import Process
from pystreamapi._lazy.queue import ProcessQueue
from pystreamapi._streams.error.__error import ErrorHandler
Expand Down Expand Up @@ -369,11 +369,12 @@ def any_match(self, predicate: Callable[[K], bool]):
:param predicate: The callable predicate
"""
def _one_wrapper(iterable, mapper):
"""Generator wrapper for any_match."""
for i in iterable:
yield self._one(mapper, item=i)

self._source = _one_wrapper(self._source, predicate)
return any_match(self._source)
return any(self._source)

@terminal
def count(self):
Expand Down
7 changes: 4 additions & 3 deletions pystreamapi/_streams/__parallel_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ def _filter(self, predicate: Callable[[Any], bool]):

@terminal
def find_any(self):
if len(self._source) > 0:
return Optional.of(self._source[0])
return Optional.empty()
try:
return Optional.of(next(iter(self._source)))
except StopIteration:
return Optional.empty()

def _flat_map(self, mapper: Callable[[Any], stream.BaseStream]):
new_src = []
Expand Down

0 comments on commit 67bc4a1

Please sign in to comment.