diff --git a/streamable/functions.py b/streamable/functions.py index f9795ca..d66f69f 100644 --- a/streamable/functions.py +++ b/streamable/functions.py @@ -72,13 +72,13 @@ def catch( def distinct( iterator: Iterator[T], - by: Optional[Callable[[T], Any]] = None, + key: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False, ) -> Iterator[T]: validate_iterator(iterator) if consecutive_only: - return ConsecutiveDistinctIterator(iterator, by) - return DistinctIterator(iterator, by) + return ConsecutiveDistinctIterator(iterator, key) + return DistinctIterator(iterator, key) def flatten(iterator: Iterator[Iterable[T]], concurrency: int = 1) -> Iterator[T]: @@ -110,14 +110,14 @@ def group( def groupby( iterator: Iterator[T], - by: Callable[[T], U], + key: Callable[[T], U], size: Optional[int] = None, interval: Optional[datetime.timedelta] = None, ) -> Iterator[Tuple[U, List[T]]]: validate_iterator(iterator) validate_group_size(size) validate_group_interval(interval) - return GroupbyIterator(iterator, by, size, interval) + return GroupbyIterator(iterator, key, size, interval) def map( diff --git a/streamable/iterators.py b/streamable/iterators.py index aa3cbb1..aa8def2 100644 --- a/streamable/iterators.py +++ b/streamable/iterators.py @@ -96,36 +96,36 @@ def __next__(self) -> T: class DistinctIterator(Iterator[T]): - def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None: + def __init__(self, iterator: Iterator[T], key: Optional[Callable[[T], Any]]) -> None: validate_iterator(iterator) self.iterator = iterator - self.by = wrap_error(by, StopIteration) if by else None + self.key = wrap_error(key, StopIteration) if key else None self._already_seen: Set[Any] = set() def __next__(self) -> T: while True: elem = next(self.iterator) - value = self.by(elem) if self.by else elem - if value not in self._already_seen: + key = self.key(elem) if self.key else elem + if key not in self._already_seen: break - self._already_seen.add(value) + 
self._already_seen.add(key) return elem class ConsecutiveDistinctIterator(Iterator[T]): - def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None: + def __init__(self, iterator: Iterator[T], key: Optional[Callable[[T], Any]]) -> None: validate_iterator(iterator) self.iterator = iterator - self.by = wrap_error(by, StopIteration) if by else None - self._last_value: Any = object() + self.key = wrap_error(key, StopIteration) if key else None + self._last_key: Any = object() def __next__(self) -> T: while True: elem = next(self.iterator) - value = self.by(elem) if self.by else elem - if value != self._last_value: + key = self.key(elem) if self.key else elem + if key != self._last_key: break - self._last_value = value + self._last_key = key return elem @@ -210,18 +210,18 @@ class GroupbyIterator(_GroupIteratorMixin[T], Iterator[Tuple[U, List[T]]]): def __init__( self, iterator: Iterator[T], - by: Callable[[T], U], + key: Callable[[T], U], size: Optional[int], interval: Optional[datetime.timedelta], ) -> None: super().__init__(iterator, size, interval) - self.by = wrap_error(by, StopIteration) + self.key = wrap_error(key, StopIteration) self._is_exhausted = False self._groups_by: DefaultDict[U, List[T]] = defaultdict(list) def _group_next_elem(self) -> None: elem = next(self.iterator) - self._groups_by[self.by(elem)].append(elem) + self._groups_by[self.key(elem)].append(elem) def _pop_full_group(self) -> Optional[Tuple[U, List[T]]]: for key, group in self._groups_by.items(): diff --git a/streamable/stream.py b/streamable/stream.py index de51be0..3951559 100644 --- a/streamable/stream.py +++ b/streamable/stream.py @@ -164,11 +164,12 @@ def display(self, level: int = logging.INFO) -> "Stream[T]": return self def distinct( - self, by: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False + self, key: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False ) -> "Stream": """ - Filters the stream to yield only distinct 
elements, `foo` and `bar` considered duplicates if `hash(foo) == hash(bar)`. - If `by` is specified, `foo` and `bar` are considered duplicates if `hash(by(foo)) == hash(by(bar))`. + Filters the stream to yield only distinct elements. + If a deduplication `key` is specified, `foo` and `bar` are treated as duplicates when `key(foo) == key(bar)`. + Among duplicates, the first encountered occurrence in upstream order is yielded. @@ -177,13 +178,13 @@ def distinct( Alternatively, remove only consecutive duplicates without memory footprint by setting `consecutive_only=True`. Args: - by (Callable[[T], Any], optional): Elements are deduplicated based on the value returned by `by(elem)`. (by default: the deduplication is performed on the elements themselves) + key (Callable[[T], Any], optional): Elements are deduplicated based on `key(elem)`. (by default: the deduplication is performed on the elements themselves) consecutive_only (bool, optional): Whether to deduplicate only consecutive duplicates, or globally. (by default: the deduplication is global) Returns: Stream: A stream containing only unique upstream elements. """ - return DistinctStream(self, by, consecutive_only) + return DistinctStream(self, key, consecutive_only) def filter(self, when: Callable[[T], Any] = bool) -> "Stream[T]": """ @@ -324,17 +325,19 @@ def group( by: Optional[Callable[[T], Any]] = None, ) -> "Stream[List[T]]": """ - Yields upstream elements grouped into lists. - A group is a list of `size` elements for which `by` returns the same value, but it may contain fewer elements in these cases: - - `interval` have passed since the last yield of a group - - upstream is exhausted - - upstream raises an exception + Groups upstream elements into lists. + + A group is yielded if one of the following conditions is met: + - The group reaches `size` elements. + - `interval` seconds have passed since the last group was yielded. + - The upstream source is exhausted. 
+ + If `by` is specified, a group will contain only elements for which `by` returns the same value (see `.groupby` for `(key, elements)` pairs). Args: - size (Optional[int], optional): Maximum number of elements per group. (by default: no limit on the size of the group) + size (Optional[int], optional): The maximum number of elements per group (default: no size limit). interval (float, optional): Yields a group if `interval` seconds have passed since the last group was yielded. (by default: no limit on the time interval between yields) - by (Optional[Callable[[T], Any]], optional): If specified, a group will only contain elements for which this function returns the same value. (by default: does not cogroup) - + by (Optional[Callable[[T], Any]], optional): If specified, determines the group key. A group will only contain elements that share the same `by(elem)` value. (Default: does not co-group elements.) Returns: Stream[List[T]]: A stream of upstream elements grouped into lists. """ @@ -344,27 +347,27 @@ def group( def groupby( self, - by: Callable[[T], U], + key: Callable[[T], U], size: Optional[int] = None, interval: Optional[datetime.timedelta] = None, ) -> "Stream[Tuple[U, List[T]]]": """ - Yields elements grouped by key as `(key, elements)` tuples. - Key is returned by `by(elem)`. - The group will contain `size` elements, but it may contain fewer elements in these cases: - - `interval` have passed since the last yield of a group - - upstream is exhausted - - upstream raises an exception + Groups upstream elements into `(key, elements)` tuples. + + A group is yielded if one of the following conditions is met: + - A group reaches `size` elements. + - `interval` seconds have passed since the last group was yielded. + - The upstream source is exhausted. Args: - by (Callable[[T], Any]): Function returning the group's key. - size (Optional[int], optional): Maximum number of elements per group. 
(by default: no limit on the size of the group) - interval (float, optional): Yields a group if `interval` seconds have passed since the last group was yielded. (by default: no limit on the time interval between yields) + key (Callable[[T], U]): A function that returns the group key for an element. + size (Optional[int], optional): The maximum number of elements per group (default: no size limit). + interval (Optional[datetime.timedelta], optional): If specified, yields a group if `interval` seconds have passed since the last group was yielded (default: no time interval limit). Returns: Stream[Tuple[U, List[T]]]: A stream of upstream elements grouped by key, as `(key, elements)` tuples. """ - return GroupbyStream(self, by, size, interval) + return GroupbyStream(self, key, size, interval) def map( self, @@ -528,11 +531,11 @@ class DistinctStream(DownStream[T, T]): def __init__( self, upstream: Stream[T], - by: Optional[Callable[[T], Any]], + key: Optional[Callable[[T], Any]], consecutive_only: bool, ) -> None: super().__init__(upstream) - self._by = by + self._key = key self._consecutive_only = consecutive_only def accept(self, visitor: "Visitor[V]") -> V: @@ -614,12 +617,12 @@ class GroupbyStream(DownStream[T, Tuple[U, List[T]]]): def __init__( self, upstream: Stream[T], - by: Callable[[T], U], + key: Callable[[T], U], size: Optional[int], interval: Optional[datetime.timedelta], ) -> None: super().__init__(upstream) - self._by = by + self._key = key self._size = size self._interval = interval diff --git a/streamable/visitors/iterator.py b/streamable/visitors/iterator.py index b5353e5..8fa3892 100644 --- a/streamable/visitors/iterator.py +++ b/streamable/visitors/iterator.py @@ -38,7 +38,7 @@ def visit_catch_stream(self, stream: CatchStream[T]) -> Iterator[T]: def visit_distinct_stream(self, stream: DistinctStream[T]) -> Iterator[T]: return functions.distinct( stream.upstream.accept(self), - stream._by, + stream._key, stream._consecutive_only, ) @@ -91,7 +91,7 @@ 
def visit_groupby_stream(self, stream: GroupbyStream[U, T]) -> Iterator[T]: Iterator[T], functions.groupby( stream.upstream.accept(IteratorVisitor[U]()), - stream._by, + stream._key, stream._size, stream._interval, ), diff --git a/streamable/visitors/representation.py b/streamable/visitors/representation.py index 651c81f..a5ff8d1 100644 --- a/streamable/visitors/representation.py +++ b/streamable/visitors/representation.py @@ -47,7 +47,7 @@ def visit_catch_stream(self, stream: CatchStream[T]) -> str: def visit_distinct_stream(self, stream: DistinctStream[T]) -> str: self.methods_reprs.append( - f"distinct({self.to_string(stream._by)}, consecutive_only={self.to_string(stream._consecutive_only)})" + f"distinct({self.to_string(stream._key)}, consecutive_only={self.to_string(stream._consecutive_only)})" ) return stream.upstream.accept(self) @@ -82,7 +82,7 @@ def visit_group_stream(self, stream: GroupStream[U]) -> str: def visit_groupby_stream(self, stream: GroupbyStream[U, T]) -> str: self.methods_reprs.append( - f"groupby({self.to_string(stream._by)}, size={self.to_string(stream._size)}, interval={self.to_string(stream._interval)})" + f"groupby({self.to_string(stream._key)}, size={self.to_string(stream._size)}, interval={self.to_string(stream._interval)})" ) return stream.upstream.accept(self)