diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 5415afe98..c16fba8f0 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -216,8 +216,8 @@ where if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { - if comparison(t, initial) { - output_buffer.push((t.join(time), d.into_owned())) + if comparison(&t.into_owned(), initial) { + output_buffer.push((t.into_owned().join(time), d.into_owned())) } }); consolidate(&mut output_buffer); diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 63d0ed451..92784ef2f 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -101,7 +101,7 @@ where while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { - if t.less_equal(time) { count.plus_equals(&d.into_owned()); } + if t.into_owned().less_equal(time) { count.plus_equals(&d.into_owned()); } }); if !count.is_zero() { let (dout, rout) = output_func(prefix, diff, value, &count); diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index 53a284c5a..5357dc931 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -46,6 +46,7 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index efe7053a3..22f22a00f 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -120,7 +120,7 @@ where -> Arranged, TraceEnterAt> where TInner: Refines+Lattice+Timestamp+Clone+'static, - F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &G::Timestamp)->TInner+Clone+'static, + F: FnMut(Tr::Key<'_>, Tr::Val<'_>, &Tr::Time)->TInner+Clone+'static, P: FnMut(&TInner)->Tr::Time+Clone+'static, { let logic1 = logic.clone(); @@ -217,7 +217,7 @@ where while let Some(val) = cursor.get_val(batch) { for datum in logic(key, val) { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), time.clone(), diff.into_owned())); + session.give((datum.clone(), time.into_owned(), diff.into_owned())); }); } cursor.step_val(batch); diff --git a/src/operators/count.rs b/src/operators/count.rs index 06414bb5a..ccb007290 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -97,14 +97,14 @@ where if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8))); + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8))); } } count.as_mut().map(|c| c.plus_equals(&diff.into_owned())); if count.is_none() { count = Some(diff.into_owned()); } if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8))); + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8))); } } }); diff --git a/src/operators/join.rs b/src/operators/join.rs index 7db301fca..0ac93446b 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -677,8 +677,10 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet)); - thinker.history2.edits.load(batch, batch_storage, |time| time.clone()); + use crate::trace::cursor::IntoOwned; + + thinker.history1.edits.load(trace, trace_storage, |time| time.into_owned().join(meet)); + thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned()); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { diff --git a/src/operators/mod.rs b/src/operators/mod.rs index 101ac09fd..fdde1f242 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -40,7 +40,7 @@ impl<'a, C: Cursor> EditList<'a, C> { /// Loads the contents of a cursor. fn load(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L) where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.clear(); while cursor.val_valid(storage) { @@ -103,7 +103,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> { } fn load(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L) where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.edits.load(cursor, storage, logic); } @@ -119,7 +119,7 @@ impl<'storage, C: Cursor> ValueHistory<'storage, C> { logic: L ) -> HistoryReplay<'storage, 'history, C> where - L: Fn(&C::Time)->C::Time, + L: Fn(C::TimeGat<'_>)->C::Time, { self.clear(); cursor.seek_key(storage, key); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 3ea6ba194..e294de769 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -755,7 +755,7 @@ mod history_replay { // loaded times by performing the lattice `join` with this value. // Load the batch contents. - let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.clone()); + let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned()); // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet // can be used to advance other historical times, which may consolidate their representation. As @@ -791,16 +791,16 @@ mod history_replay { // Load the input and output histories. let mut input_replay = if let Some(meet) = meet.as_ref() { - self.input_history.replay_key(source_cursor, source_storage, key, |time| time.join(meet)) + self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned().join(meet)) } else { - self.input_history.replay_key(source_cursor, source_storage, key, |time| time.clone()) + self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned()) }; let mut output_replay = if let Some(meet) = meet.as_ref() { - self.output_history.replay_key(output_cursor, output_storage, key, |time| time.join(meet)) + self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned().join(meet)) } else { - self.output_history.replay_key(output_cursor, output_storage, key, |time| time.clone()) + self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned()) }; self.synth_times.clear(); diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index f36b554c6..6c8f1a8af 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -163,7 +163,7 @@ where if let Some(difference) = difference { if !difference.is_zero() { - session.give((key.clone(), time.clone(), difference)); + session.give((key.clone(), time.into_owned(), difference)); } } }); diff --git a/src/trace/cursor/cursor_list.rs b/src/trace/cursor/cursor_list.rs index 653c7da35..f396b67a0 100644 --- a/src/trace/cursor/cursor_list.rs +++ b/src/trace/cursor/cursor_list.rs @@ -88,6 +88,7 @@ impl Cursor for CursorList { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -114,7 +115,7 @@ impl Cursor for CursorList { self.cursors[self.min_val[0]].val(&storage[self.min_val[0]]) } #[inline] - fn map_times)>(&mut self, storage: &Vec, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Vec, mut logic: L) { for &index in self.min_val.iter() { self.cursors[index].map_times(&storage[index], |t,d| logic(t,d)); } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index f18e45109..969d7ad0f 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -48,6 +48,8 @@ pub trait Cursor { type Val<'a>: Copy + Clone + Ord; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. @@ -81,7 +83,7 @@ pub trait Cursor { /// Applies `logic` to each pair of time and difference. Intended for mutation of the /// closure's scope. - fn map_times)>(&mut self, storage: &Self::Storage, logic: L); + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L); /// Advances the cursor to the next key. fn step_key(&mut self, storage: &Self::Storage); @@ -111,7 +113,7 @@ pub trait Cursor { while self.val_valid(storage) { let mut kv_out = Vec::new(); self.map_times(storage, |ts, r| { - kv_out.push((ts.clone(), r.into_owned())); + kv_out.push((ts.into_owned(), r.into_owned())); }); out.push(((self.key(storage).into_owned(), self.val(storage).into_owned()), kv_out)); self.step_val(storage); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 4a7ce8598..66396d8c3 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -145,6 +145,7 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -462,6 +463,7 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -469,12 +471,12 @@ mod val_batch { fn key<'a>(&self, storage: &'a OrdValBatch) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a OrdValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } - fn map_times)>(&mut self, storage: &OrdValBatch, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { - let time = storage.storage.times.index(index).into_owned(); + let time = storage.storage.times.index(index); let diff = storage.storage.diffs.index(index); - logic(&time, diff); + logic(time, diff); } } fn key_valid(&self, storage: &OrdValBatch) -> bool { self.key_cursor < storage.storage.keys.len() } @@ -705,6 +707,7 @@ mod key_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = &'a (); type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -932,6 +935,7 @@ mod key_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = &'a (); type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -939,12 +943,12 @@ mod key_batch { fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_key(self.key_cursor); for index in lower .. upper { - let time = storage.storage.times.index(index).into_owned(); + let time = storage.storage.times.index(index); let diff = storage.storage.diffs.index(index); - logic(&time, diff); + logic(time, diff); } } fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 6f5ad1877..352e3941e 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -274,6 +274,7 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -631,6 +632,7 @@ mod val_batch { type Key<'a> = ::ReadItem<'a>; type Val<'a> = ::ReadItem<'a>; type Time = ::Time; + type TimeGat<'a> = ::ReadItem<'a>; type Diff = ::Diff; type DiffGat<'a> = ::ReadItem<'a>; @@ -640,12 +642,12 @@ mod val_batch { storage.storage.keys.index(self.key_cursor) } fn val<'a>(&self, storage: &'a RhhValBatch) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) } - fn map_times)>(&mut self, storage: &RhhValBatch, mut logic: L2) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch, mut logic: L2) { let (lower, upper) = storage.storage.updates_for_value(self.val_cursor); for index in lower .. upper { - let time = storage.storage.times.index(index).into_owned(); + let time = storage.storage.times.index(index); let diff = storage.storage.diffs.index(index); - logic(&time, diff); + logic(time, diff); } } fn key_valid(&self, storage: &RhhValBatch) -> bool { self.key_cursor < storage.storage.keys.len() } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 52fc396c4..eb6b44585 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -112,6 +112,7 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 1021c9abb..7085e7d12 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -57,6 +57,8 @@ pub trait TraceReader { type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. @@ -263,6 +265,8 @@ where type Val<'a>: Copy + Clone; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; + /// Borrowed form of timestamp. + type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. @@ -375,6 +379,7 @@ pub mod rc_blanket_impls { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -409,6 +414,7 @@ pub mod rc_blanket_impls { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -421,7 +427,7 @@ pub mod rc_blanket_impls { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } @@ -478,6 +484,7 @@ pub mod abomonated_blanket_impls { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -512,6 +519,7 @@ pub mod abomonated_blanket_impls { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -524,7 +532,7 @@ pub mod abomonated_blanket_impls { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { self.cursor.map_times(storage, logic) } diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 071e7d185..4f8280c70 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -34,6 +34,7 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; @@ -117,6 +118,7 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -170,6 +172,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -183,8 +186,9 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + use crate::trace::cursor::IntoOwned; self.cursor.map_times(storage, |time, diff| { - logic(&TInner::to_inner(time.clone()), diff) + logic(&TInner::to_inner(time.into_owned()), diff) }) } @@ -222,6 +226,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -235,8 +240,9 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + use crate::trace::cursor::IntoOwned; self.cursor.map_times(&storage.batch, |time, diff| { - logic(&TInner::to_inner(time.clone()), diff) + logic(&TInner::to_inner(time.into_owned()), diff) }) } diff --git a/src/trace/wrappers/enter_at.rs b/src/trace/wrappers/enter_at.rs index 692764bc7..c177b473b 100644 --- a/src/trace/wrappers/enter_at.rs +++ b/src/trace/wrappers/enter_at.rs @@ -6,6 +6,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use crate::lattice::Lattice; use crate::trace::{TraceReader, BatchReader, Description}; use crate::trace::cursor::Cursor; +use crate::trace::cursor::IntoOwned; /// Wrapper to provide trace to nested scope. /// @@ -52,6 +53,7 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; @@ -140,6 +142,7 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -197,6 +200,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -214,7 +218,7 @@ where let val = self.val(storage); let logic2 = &mut self.logic; self.cursor.map_times(storage, |time, diff| { - logic(&logic2(key, val, time), diff) + logic(&logic2(key, val, &time.into_owned()), diff) }) } @@ -255,6 +259,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = TInner; + type TimeGat<'a> = &'a TInner; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -272,7 +277,7 @@ where let val = self.val(storage); let logic2 = &mut self.logic; self.cursor.map_times(&storage.batch, |time, diff| { - logic(&logic2(key, val, time), diff) + logic(&logic2(key, val, &time.into_owned()), diff) }) } diff --git a/src/trace/wrappers/filter.rs b/src/trace/wrappers/filter.rs index f68efe751..f43ff0dc1 100644 --- a/src/trace/wrappers/filter.rs +++ b/src/trace/wrappers/filter.rs @@ -33,6 +33,7 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; @@ -86,6 +87,7 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -134,6 +136,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -146,7 +149,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { @@ -188,6 +191,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -200,7 +204,7 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, logic: L) { let key = self.key(storage); let val = self.val(storage); if (self.logic)(key, val) { diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 52304c766..3f8843460 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -26,6 +26,7 @@ use timely::progress::frontier::AntichainRef; use crate::operators::arrange::Arranged; use crate::trace::{TraceReader, BatchReader, Description}; use crate::trace::cursor::Cursor; +use crate::trace::cursor::IntoOwned; /// Freezes updates to an arrangement using a supplied function. /// @@ -78,6 +79,7 @@ where type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; @@ -141,6 +143,7 @@ where type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -184,6 +187,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -195,11 +199,11 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(storage) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } - #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(storage, |time, diff| { - if let Some(time) = func(time) { - logic(&time, diff); + if let Some(time) = func(&time.into_owned()) { + logic( as IntoOwned>::borrow_as(&time), diff); } }) } @@ -235,6 +239,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -246,11 +251,11 @@ where #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { self.cursor.key(&storage.batch) } #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } - #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + #[inline] fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let func = &self.func; self.cursor.map_times(&storage.batch, |time, diff| { - if let Some(time) = func(time) { - logic(&time, diff); + if let Some(time) = func(&time.into_owned()) { + logic( as IntoOwned>::borrow_as(&time), diff); } }) } diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index c5e2926a4..8379b8bf6 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -35,6 +35,7 @@ impl TraceReader for TraceFrontier { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>; @@ -85,6 +86,7 @@ impl BatchReader for BatchFrontier { type Key<'a> = B::Key<'a>; type Val<'a> = B::Val<'a>; type Time = B::Time; + type TimeGat<'a> = B::TimeGat<'a>; type Diff = B::Diff; type DiffGat<'a> = B::DiffGat<'a>; @@ -129,6 +131,7 @@ impl Cursor for CursorFrontier { type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -141,15 +144,16 @@ impl Cursor for CursorFrontier { #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(storage) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { - temp.clone_from(time); + use crate::trace::cursor::IntoOwned; + time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { - logic(&temp, diff); + logic( as IntoOwned>::borrow_as(&temp), diff); } }) } @@ -190,6 +194,7 @@ where type Key<'a> = C::Key<'a>; type Val<'a> = C::Val<'a>; type Time = C::Time; + type TimeGat<'a> = C::TimeGat<'a>; type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; @@ -202,15 +207,16 @@ where #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { self.cursor.val(&storage.batch) } #[inline] - fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L) { let since = self.since.borrow(); let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { - temp.clone_from(time); + use crate::trace::cursor::IntoOwned; + time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { - logic(&temp, diff); + logic( as IntoOwned>::borrow_as(&temp), diff); } }) } diff --git a/src/trace/wrappers/rc.rs b/src/trace/wrappers/rc.rs index ef4a30e1a..4a877031a 100644 --- a/src/trace/wrappers/rc.rs +++ b/src/trace/wrappers/rc.rs @@ -82,6 +82,7 @@ impl TraceReader for TraceRc { type Key<'a> = Tr::Key<'a>; type Val<'a> = Tr::Val<'a>; type Time = Tr::Time; + type TimeGat<'a> = Tr::TimeGat<'a>; type Diff = Tr::Diff; type DiffGat<'a> = Tr::DiffGat<'a>;