Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 48 additions & 18 deletions hydro_lang/src/live_collections/keyed_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,6 @@ pub enum Generate<T> {

impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
where
K: Eq + Hash,
L: Location<'a>,
{
/// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
Expand Down Expand Up @@ -1220,7 +1219,7 @@ where
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
where
K: Clone,
K: Clone + Eq + Hash,
I: Fn() -> A + 'a,
F: Fn(&mut A, V) -> Option<U> + 'a,
{
Expand Down Expand Up @@ -1291,7 +1290,7 @@ where
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
where
K: Clone,
K: Clone + Eq + Hash,
I: Fn() -> A + 'a,
F: Fn(&mut A, V) -> Generate<U> + 'a,
{
Expand Down Expand Up @@ -1393,7 +1392,7 @@ where
f: impl IntoQuotedMut<'a, F, L> + Copy,
) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
where
K: Clone,
K: Clone + Eq + Hash,
I: Fn() -> A + 'a,
F: Fn(&mut A, V) -> bool + 'a,
{
Expand Down Expand Up @@ -1452,7 +1451,7 @@ where
/// ```
pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
where
K: Clone,
K: Clone + Eq + Hash,
{
self.fold_early_stop(
q!(|| None),
Expand Down Expand Up @@ -1498,7 +1497,10 @@ where
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
let init = init.splice_fn0_ctx(&self.location).into();
let comb = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

Expand Down Expand Up @@ -1547,7 +1549,10 @@ where
pub fn reduce<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
let f = comb.splice_fn2_borrow_mut_ctx(&self.location).into();

KeyedSingleton::new(
Expand Down Expand Up @@ -1598,6 +1603,7 @@ where
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
O: Clone,
F: Fn(&mut V, V) + 'a,
{
Expand All @@ -1624,7 +1630,6 @@ where

impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
where
K: Eq + Hash,
L: Location<'a>,
{
/// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
Expand Down Expand Up @@ -1660,7 +1665,10 @@ where
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
.fold(init, comb)
}
Expand Down Expand Up @@ -1696,7 +1704,10 @@ where
pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
.reduce(comb)
}
Expand Down Expand Up @@ -1733,6 +1744,7 @@ where
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
O2: Clone,
F: Fn(&mut V, V) + 'a,
{
Expand Down Expand Up @@ -1764,7 +1776,10 @@ where
/// # }));
/// # }
/// ```
pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded> {
pub fn value_counts(self) -> KeyedSingleton<K, usize, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_ordering_trusted(
nondet!(/** ordering within each group affects neither result nor intermediates */),
)
Expand All @@ -1774,7 +1789,6 @@ where

impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
where
K: Eq + Hash,
L: Location<'a>,
{
/// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
Expand Down Expand Up @@ -1810,7 +1824,10 @@ where
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
.fold(init, comb)
}
Expand Down Expand Up @@ -1846,7 +1863,10 @@ where
pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
.reduce(comb)
}
Expand Down Expand Up @@ -1883,6 +1903,7 @@ where
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
O2: Clone,
F: Fn(&mut V, V) + 'a,
{
Expand All @@ -1893,7 +1914,6 @@ where

impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
where
K: Eq + Hash,
L: Location<'a>,
{
/// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
Expand Down Expand Up @@ -1930,7 +1950,10 @@ where
self,
init: impl IntoQuotedMut<'a, I, L>,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
.fold(init, comb)
Expand Down Expand Up @@ -1968,7 +1991,10 @@ where
pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
self,
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
{
self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
.reduce(comb)
Expand Down Expand Up @@ -2007,6 +2033,7 @@ where
comb: impl IntoQuotedMut<'a, F, L>,
) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
where
K: Eq + Hash,
O2: Clone,
F: Fn(&mut V, V) + 'a,
{
Expand Down Expand Up @@ -2045,7 +2072,10 @@ where
pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
self,
other: Stream<K, L, Bounded, O2, R2>,
) -> Self {
) -> Self
where
K: Eq + Hash,
{
check_matching_location(&self.location, &other.location);

KeyedStream::new(
Expand Down
Loading