Skip to content

Commit

Permalink
.distinct/.groupby: rename by-> key
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jan 12, 2025
1 parent 994d0bd commit a0db78f
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 52 deletions.
10 changes: 5 additions & 5 deletions streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ def catch(

def distinct(
iterator: Iterator[T],
by: Optional[Callable[[T], Any]] = None,
key: Optional[Callable[[T], Any]] = None,
consecutive_only: bool = False,
) -> Iterator[T]:
validate_iterator(iterator)
if consecutive_only:
return ConsecutiveDistinctIterator(iterator, by)
return DistinctIterator(iterator, by)
return ConsecutiveDistinctIterator(iterator, key)
return DistinctIterator(iterator, key)


def flatten(iterator: Iterator[Iterable[T]], concurrency: int = 1) -> Iterator[T]:
Expand Down Expand Up @@ -110,14 +110,14 @@ def group(

def groupby(
iterator: Iterator[T],
by: Callable[[T], U],
key: Callable[[T], U],
size: Optional[int] = None,
interval: Optional[datetime.timedelta] = None,
) -> Iterator[Tuple[U, List[T]]]:
validate_iterator(iterator)
validate_group_size(size)
validate_group_interval(interval)
return GroupbyIterator(iterator, by, size, interval)
return GroupbyIterator(iterator, key, size, interval)


def map(
Expand Down
28 changes: 14 additions & 14 deletions streamable/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,36 +96,36 @@ def __next__(self) -> T:


class DistinctIterator(Iterator[T]):
def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None:
def __init__(self, iterator: Iterator[T], key: Optional[Callable[[T], Any]]) -> None:
validate_iterator(iterator)
self.iterator = iterator
self.by = wrap_error(by, StopIteration) if by else None
self.key = wrap_error(key, StopIteration) if key else None
self._already_seen: Set[Any] = set()

def __next__(self) -> T:
while True:
elem = next(self.iterator)
value = self.by(elem) if self.by else elem
if value not in self._already_seen:
key = self.key(elem) if self.key else elem
if key not in self._already_seen:
break
self._already_seen.add(value)
self._already_seen.add(key)
return elem


class ConsecutiveDistinctIterator(Iterator[T]):
def __init__(self, iterator: Iterator[T], by: Optional[Callable[[T], Any]]) -> None:
def __init__(self, iterator: Iterator[T], key: Optional[Callable[[T], Any]]) -> None:
validate_iterator(iterator)
self.iterator = iterator
self.by = wrap_error(by, StopIteration) if by else None
self._last_value: Any = object()
self.key = wrap_error(key, StopIteration) if key else None
self._last_key: Any = object()

def __next__(self) -> T:
while True:
elem = next(self.iterator)
value = self.by(elem) if self.by else elem
if value != self._last_value:
key = self.key(elem) if self.key else elem
if key != self._last_key:
break
self._last_value = value
self._last_key = key
return elem


Expand Down Expand Up @@ -210,18 +210,18 @@ class GroupbyIterator(_GroupIteratorMixin[T], Iterator[Tuple[U, List[T]]]):
def __init__(
self,
iterator: Iterator[T],
by: Callable[[T], U],
key: Callable[[T], U],
size: Optional[int],
interval: Optional[datetime.timedelta],
) -> None:
super().__init__(iterator, size, interval)
self.by = wrap_error(by, StopIteration)
self.key = wrap_error(key, StopIteration)
self._is_exhausted = False
self._groups_by: DefaultDict[U, List[T]] = defaultdict(list)

def _group_next_elem(self) -> None:
elem = next(self.iterator)
self._groups_by[self.by(elem)].append(elem)
self._groups_by[self.key(elem)].append(elem)

def _pop_full_group(self) -> Optional[Tuple[U, List[T]]]:
for key, group in self._groups_by.items():
Expand Down
61 changes: 32 additions & 29 deletions streamable/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,12 @@ def display(self, level: int = logging.INFO) -> "Stream[T]":
return self

def distinct(
self, by: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False
self, key: Optional[Callable[[T], Any]] = None, consecutive_only: bool = False
) -> "Stream":
"""
Filters the stream to yield only distinct elements, `foo` and `bar` considered duplicates if `hash(foo) == hash(bar)`.
If `by` is specified, `foo` and `bar` are considered duplicates if `hash(by(foo)) == hash(by(bar))`.
Filters the stream to yield only distinct elements.
If a deduplication `key` is specified, `foo` and `bar` are treated as duplicates when `key(foo) == key(bar)`.
Among duplicates, the first encountered occurence in upstream order is yielded.
Expand All @@ -177,17 +178,17 @@ def distinct(
Alternatively, remove only consecutive duplicates without memory footprint by setting `consecutive_only=True`.
Args:
by (Callable[[T], Any], optional): Elements are deduplicated based on the value returned by `by(elem)`. (by default: the deduplication is performed on the elements themselves)
key (Callable[[T], Any], optional): Elements are deduplicated based on `key(elem)`. (by default: the deduplication is performed on the elements themselves)
consecutive_only (bool, optional): Whether to deduplicate only consecutive duplicates, or globally. (by default: the deduplication is global)
Returns:
Stream: A stream containing only unique upstream elements.
"""
return DistinctStream(self, by, consecutive_only)
return DistinctStream(self, key, consecutive_only)

def filter(self, when: Callable[[T], Any] = bool) -> "Stream[T]":
"""
Yields only upstream elements satisfying the `when` predicate.
Filters the stream to yield only elements satisfying the `when` predicate.
Args:
when (Callable[[T], Any], optional): An element is kept when `when(elem)` is truthy. (by default: keeps all truthy elements)
Expand Down Expand Up @@ -324,17 +325,19 @@ def group(
by: Optional[Callable[[T], Any]] = None,
) -> "Stream[List[T]]":
"""
Yields upstream elements grouped into lists.
A group is a list of `size` elements for which `by` returns the same value, but it may contain fewer elements in these cases:
- `interval` have passed since the last yield of a group
- upstream is exhausted
- upstream raises an exception
Groups upstream elements into lists.
A group is yielded when any of the following conditions is met:
- The group reaches `size` elements.
- `interval` seconds have passed since the last group was yielded.
- The upstream source is exhausted.
If `by` is specified, groups will only contain elements sharing the same `by(elem)` value (see `.groupby` for `(key, elements)` pairs).
Args:
size (Optional[int], optional): Maximum number of elements per group. (by default: no limit on the size of the group)
size (Optional[int], optional): The maximum number of elements per group (default: no size limit).
interval (float, optional): Yields a group if `interval` seconds have passed since the last group was yielded. (by default: no limit on the time interval between yields)
by (Optional[Callable[[T], Any]], optional): If specified, a group will only contain elements for which this function returns the same value. (by default: does not cogroup)
by (Optional[Callable[[T], Any]], optional): If specified, groups will only contain elements sharing the same `by(elem)` value. (Default: does not co-group elements.)
Returns:
Stream[List[T]]: A stream of upstream elements grouped into lists.
"""
Expand All @@ -344,27 +347,27 @@ def group(

def groupby(
self,
by: Callable[[T], U],
key: Callable[[T], U],
size: Optional[int] = None,
interval: Optional[datetime.timedelta] = None,
) -> "Stream[Tuple[U, List[T]]]":
"""
Yields elements grouped by key as `(key, elements)` tuples.
Key is returned by `by(elem)`.
The group will contain `size` elements, but it may contain fewer elements in these cases:
- `interval` have passed since the last yield of a group
- upstream is exhausted
- upstream raises an exception
Groups upstream elements into `(key, elements)` tuples.
A group is yielded when any of the following conditions is met:
- A group reaches `size` elements.
- `interval` seconds have passed since the last group was yielded.
- The upstream source is exhausted.
Args:
by (Callable[[T], Any]): Function returning the group's key.
size (Optional[int], optional): Maximum number of elements per group. (by default: no limit on the size of the group)
interval (float, optional): Yields a group if `interval` seconds have passed since the last group was yielded. (by default: no limit on the time interval between yields)
key (Callable[[T], U]): A function that returns the group key for an element.
size (Optional[int], optional): The maximum number of elements per group (default: no size limit).
interval (Optional[datetime.timedelta], optional): If specified, yields a group if `interval` seconds have passed since the last group was yielded (default: no time interval limit).
Returns:
Stream[Tuple[U, List[T]]]: A stream of upstream elements grouped by key, as `(key, elements)` tuples.
"""
return GroupbyStream(self, by, size, interval)
return GroupbyStream(self, key, size, interval)

def map(
self,
Expand Down Expand Up @@ -528,11 +531,11 @@ class DistinctStream(DownStream[T, T]):
def __init__(
self,
upstream: Stream[T],
by: Optional[Callable[[T], Any]],
key: Optional[Callable[[T], Any]],
consecutive_only: bool,
) -> None:
super().__init__(upstream)
self._by = by
self._key = key
self._consecutive_only = consecutive_only

def accept(self, visitor: "Visitor[V]") -> V:
Expand Down Expand Up @@ -614,12 +617,12 @@ class GroupbyStream(DownStream[T, Tuple[U, List[T]]]):
def __init__(
self,
upstream: Stream[T],
by: Callable[[T], U],
key: Callable[[T], U],
size: Optional[int],
interval: Optional[datetime.timedelta],
) -> None:
super().__init__(upstream)
self._by = by
self._key = key
self._size = size
self._interval = interval

Expand Down
4 changes: 2 additions & 2 deletions streamable/visitors/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def visit_catch_stream(self, stream: CatchStream[T]) -> Iterator[T]:
def visit_distinct_stream(self, stream: DistinctStream[T]) -> Iterator[T]:
return functions.distinct(
stream.upstream.accept(self),
stream._by,
stream._key,
stream._consecutive_only,
)

Expand Down Expand Up @@ -91,7 +91,7 @@ def visit_groupby_stream(self, stream: GroupbyStream[U, T]) -> Iterator[T]:
Iterator[T],
functions.groupby(
stream.upstream.accept(IteratorVisitor[U]()),
stream._by,
stream._key,
stream._size,
stream._interval,
),
Expand Down
4 changes: 2 additions & 2 deletions streamable/visitors/representation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def visit_catch_stream(self, stream: CatchStream[T]) -> str:

def visit_distinct_stream(self, stream: DistinctStream[T]) -> str:
self.methods_reprs.append(
f"distinct({self.to_string(stream._by)}, consecutive_only={self.to_string(stream._consecutive_only)})"
f"distinct({self.to_string(stream._key)}, consecutive_only={self.to_string(stream._consecutive_only)})"
)
return stream.upstream.accept(self)

Expand Down Expand Up @@ -82,7 +82,7 @@ def visit_group_stream(self, stream: GroupStream[U]) -> str:

def visit_groupby_stream(self, stream: GroupbyStream[U, T]) -> str:
self.methods_reprs.append(
f"groupby({self.to_string(stream._by)}, size={self.to_string(stream._size)}, interval={self.to_string(stream._interval)})"
f"groupby({self.to_string(stream._key)}, size={self.to_string(stream._size)}, interval={self.to_string(stream._interval)})"
)
return stream.upstream.accept(self)

Expand Down

0 comments on commit a0db78f

Please sign in to comment.