|
| 1 | +//! `RecordedUpdates<U>`: the columnar stream container. |
| 2 | +//! |
| 3 | +//! This is the CONTAINER tier of the columnar chunk: the type that flows on |
| 4 | +//! dataflow edges (it is `Accountable` and `ContainerBytes`), built input-side |
| 5 | +//! by the collection [`Builder`](super::Builder), shuffled by [`Pact`](super::Pact), |
| 6 | +//! and melded into [`ColChunk`](crate::columnar::trace::ColChunk) batches by the |
| 7 | +//! trace face's [`Chunker`](crate::columnar::trace::Chunker). It wraps the |
| 8 | +//! columnar [`Updates`](crate::columnar::updates::Updates) trie with the |
| 9 | +//! pre-consolidation record count timely's exchange accounting needs. |
| 10 | +
|
| 11 | +use crate::columnar::{layout, updates}; |
| 12 | + |
| 13 | +/// A thin wrapper around `Updates` that tracks the pre-consolidation record count |
| 14 | +/// for timely's exchange accounting. This wrapper is the stream container type; |
| 15 | +/// the `TrieChunker` strips it, passing bare `UpdatesTyped` into the merge batcher. |
| 16 | +pub struct RecordedUpdates<U: layout::ColumnarUpdate> { |
| 17 | + /// The trie of `(key, val, time, diff)` updates. |
| 18 | + pub updates: updates::Updates<U>, |
| 19 | + /// Number of records in `updates` before consolidation. |
| 20 | + pub records: usize, |
| 21 | + /// Whether `updates` is known to be sorted and consolidated |
| 22 | + /// (no duplicate (key, val, time) triples, no zero diffs). |
| 23 | + pub consolidated: bool, |
| 24 | +} |
| 25 | + |
| 26 | +impl<U: layout::ColumnarUpdate> Default for RecordedUpdates<U> { |
| 27 | + fn default() -> Self { Self { updates: Default::default(), records: 0, consolidated: true } } |
| 28 | +} |
| 29 | + |
| 30 | +impl<U: layout::ColumnarUpdate> Clone for RecordedUpdates<U> { |
| 31 | + fn clone(&self) -> Self { Self { updates: self.updates.clone(), records: self.records, consolidated: self.consolidated } } |
| 32 | +} |
| 33 | + |
| 34 | +impl<U: layout::ColumnarUpdate> timely::Accountable for RecordedUpdates<U> { |
| 35 | + #[inline] fn record_count(&self) -> i64 { self.records as i64 } |
| 36 | +} |
| 37 | + |
| 38 | +impl<U: layout::ColumnarUpdate> timely::dataflow::channels::ContainerBytes for RecordedUpdates<U> { |
| 39 | + // Wire format: a 16-byte prefix (`records`, `consolidated`) then the bare |
| 40 | + // four-column `Updates` codec — so the wire and spill paths share one encoder |
| 41 | + // ([`Updates::write_to`](crate::columnar::updates::Updates::write_to)). |
| 42 | + fn from_bytes(mut bytes: timely::bytes::arc::Bytes) -> Self { |
| 43 | + let prefix = bytes.extract_to(16); |
| 44 | + let records = u64::from_le_bytes(prefix[0..8].try_into().unwrap()) as usize; |
| 45 | + let consolidated = u64::from_le_bytes(prefix[8..16].try_into().unwrap()) != 0; |
| 46 | + RecordedUpdates { updates: updates::Updates::read_from(bytes), records, consolidated } |
| 47 | + } |
| 48 | + |
| 49 | + fn length_in_bytes(&self) -> usize { |
| 50 | + 16 + self.updates.length_in_bytes() |
| 51 | + } |
| 52 | + |
| 53 | + fn into_bytes<W: std::io::Write>(&self, writer: &mut W) { |
| 54 | + writer.write_all(&(self.records as u64).to_le_bytes()).unwrap(); |
| 55 | + writer.write_all(&(self.consolidated as u64).to_le_bytes()).unwrap(); |
| 56 | + self.updates.write_to(writer); |
| 57 | + } |
| 58 | +} |
| 59 | + |
| 60 | +// Container trait impls for RecordedUpdates, enabling iterative scopes. |
| 61 | +mod container_impls { |
| 62 | + use columnar::{Columnar, Index, Len, Push}; |
| 63 | + use timely::progress::{Timestamp, timestamp::Refines}; |
| 64 | + use crate::difference::Abelian; |
| 65 | + use crate::collection::containers::{Negate, Enter, Leave, ResultsIn}; |
| 66 | + |
| 67 | + use crate::columnar::layout::ColumnarUpdate as Update; |
| 68 | + use crate::columnar::updates::{self, UpdatesTyped}; |
| 69 | + use super::RecordedUpdates; |
| 70 | + |
| 71 | + impl<U: Update<Diff: Abelian>> Negate for RecordedUpdates<U> { |
| 72 | + fn negate(self) -> Self { |
| 73 | + use columnar::Container; |
| 74 | + let RecordedUpdates { mut updates, records, consolidated } = self; |
| 75 | + let view = updates.view(); |
| 76 | + let old_diffs = view.diffs.values; |
| 77 | + let mut new_diffs = <<U::Diff as Columnar>::Container as Container>::with_capacity_for([old_diffs].into_iter()); |
| 78 | + let mut owned = U::Diff::default(); |
| 79 | + for i in 0..old_diffs.len() { |
| 80 | + columnar::Columnar::copy_from(&mut owned, old_diffs.get(i)); |
| 81 | + owned.negate(); |
| 82 | + new_diffs.push(&owned); |
| 83 | + } |
| 84 | + // TODO: avoid make_typed() call as we are overwriting. |
| 85 | + updates.diffs.make_typed().values = new_diffs; |
| 86 | + RecordedUpdates { updates, records, consolidated } |
| 87 | + } |
| 88 | + } |
| 89 | + |
| 90 | + impl<K, V, T1, T2, R> Enter<T1, T2> for RecordedUpdates<(K, V, T1, R)> |
| 91 | + where |
| 92 | + (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>, |
| 93 | + (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>, |
| 94 | + T1: Timestamp + Columnar + Default + Clone, |
| 95 | + T2: Refines<T1> + Columnar + Default + Clone, |
| 96 | + K: Columnar, V: Columnar, R: Columnar, |
| 97 | + { |
| 98 | + type InnerContainer = RecordedUpdates<(K, V, T2, R)>; |
| 99 | + fn enter(self) -> Self::InnerContainer { |
| 100 | + // Rebuild the time column from a borrowed view; keys/vals/diffs |
| 101 | + // move untouched, preserving any Stash::Bytes backing. |
| 102 | + use columnar::bytes::stash::Stash; |
| 103 | + let RecordedUpdates { updates, records, consolidated } = self; |
| 104 | + let times = updates.times.borrow(); |
| 105 | + let times_values = times.values; |
| 106 | + let mut new_times = <<T2 as Columnar>::Container as Default>::default(); |
| 107 | + let mut t1_owned = T1::default(); |
| 108 | + for i in 0..times_values.len() { |
| 109 | + Columnar::copy_from(&mut t1_owned, times_values.get(i)); |
| 110 | + let t2 = T2::to_inner(t1_owned.clone()); |
| 111 | + new_times.push(&t2); |
| 112 | + } |
| 113 | + // TODO: Assumes Enter (to_inner) is order-preserving on times. |
| 114 | + // Deconstruct `updates` to reform with same parts but different time type. |
| 115 | + let updates::Updates { keys, vals, mut times, diffs } = updates; |
| 116 | + // TODO: Avoid make_typed() call, as we are overwriting. |
| 117 | + times.make_typed(); |
| 118 | + let Stash::Typed(times_lists) = times else { unreachable!() }; |
| 119 | + let times = Stash::Typed(updates::Lists { |
| 120 | + values: new_times, |
| 121 | + bounds: times_lists.bounds, |
| 122 | + }); |
| 123 | + RecordedUpdates { |
| 124 | + updates: updates::Updates { keys, vals, times, diffs }, |
| 125 | + records, |
| 126 | + consolidated, |
| 127 | + } |
| 128 | + } |
| 129 | + } |
| 130 | + |
| 131 | + impl<K, V, T1, T2, R> Leave<T1, T2> for RecordedUpdates<(K, V, T1, R)> |
| 132 | + where |
| 133 | + (K, V, T1, R): Update<Key=K, Val=V, Time=T1, Diff=R>, |
| 134 | + (K, V, T2, R): Update<Key=K, Val=V, Time=T2, Diff=R>, |
| 135 | + T1: Refines<T2> + Columnar + Default + Clone, |
| 136 | + T2: Timestamp + Columnar + Default + Clone, |
| 137 | + K: Columnar, V: Columnar, R: Columnar, |
| 138 | + { |
| 139 | + type OuterContainer = RecordedUpdates<(K, V, T2, R)>; |
| 140 | + fn leave(self) -> Self::OuterContainer { |
| 141 | + // Rebuild the time column from a borrowed view; keys/vals/diffs |
| 142 | + // move untouched. Distinct T1 times can collapse to the same T2 |
| 143 | + // time, so the result is consolidated. |
| 144 | + use columnar::bytes::stash::Stash; |
| 145 | + let RecordedUpdates { updates, records, consolidated: _ } = self; |
| 146 | + let times = updates.times.borrow(); |
| 147 | + let times_values = times.values; |
| 148 | + let mut new_times = <<T2 as Columnar>::Container as Default>::default(); |
| 149 | + let mut t1_owned = T1::default(); |
| 150 | + for i in 0..times_values.len() { |
| 151 | + Columnar::copy_from(&mut t1_owned, times_values.get(i)); |
| 152 | + let t2: T2 = t1_owned.clone().to_outer(); |
| 153 | + new_times.push(&t2); |
| 154 | + } |
| 155 | + let updates::Updates { keys, vals, mut times, diffs } = updates; |
| 156 | + // Extract `times` bounds via make_typed (one-column copy if Bytes-backed). |
| 157 | + times.make_typed(); |
| 158 | + let Stash::Typed(times_lists) = times else { unreachable!() }; |
| 159 | + let times = Stash::Typed(updates::Lists { |
| 160 | + values: new_times, |
| 161 | + bounds: times_lists.bounds, |
| 162 | + }); |
| 163 | + let mid = updates::Updates { keys, vals, times, diffs }; |
| 164 | + // Collapse adjacent (k,v,t2) duplicates created by `to_outer`. |
| 165 | + RecordedUpdates { |
| 166 | + updates: mid.into_typed().consolidate().into(), |
| 167 | + records, |
| 168 | + consolidated: true, |
| 169 | + } |
| 170 | + } |
| 171 | + } |
| 172 | + |
| 173 | + impl<U: Update> ResultsIn<<U::Time as Timestamp>::Summary> for RecordedUpdates<U> { |
| 174 | + fn results_in(self, step: &<U::Time as Timestamp>::Summary) -> Self { |
| 175 | + use timely::progress::PathSummary; |
| 176 | + // Apply results_in to each time; drop updates whose time maps to None. |
| 177 | + // This must rebuild the trie since some entries may be removed. |
| 178 | + let mut output = UpdatesTyped::<U>::default(); |
| 179 | + let mut time_owned = U::Time::default(); |
| 180 | + // TODO: Build all times first, and if no `None` outputs, can re-use k, v, d. |
| 181 | + for (k, v, t, d) in self.updates.view().iter() { |
| 182 | + Columnar::copy_from(&mut time_owned, t); |
| 183 | + if let Some(new_time) = step.results_in(&time_owned) { |
| 184 | + output.push((k, v, &new_time, d)); |
| 185 | + } |
| 186 | + } |
| 187 | + // TODO: Time advancement may not be order preserving, but .. it could be. |
| 188 | + // TODO: Before this is consolidated the above would need to be `form`ed. |
| 189 | + RecordedUpdates { updates: output.into(), records: self.records, consolidated: false } |
| 190 | + } |
| 191 | + } |
| 192 | +} |
| 193 | + |
| 194 | +#[cfg(test)] |
| 195 | +mod test { |
| 196 | + use columnar::Push; |
| 197 | + use timely::dataflow::channels::ContainerBytes; |
| 198 | + use crate::columnar::updates::UpdatesTyped; |
| 199 | + use super::RecordedUpdates; |
| 200 | + |
| 201 | + type Upd = (u64, u64, u64, i64); |
| 202 | + |
| 203 | + // The wire codec must round-trip: `into_bytes` then `from_bytes` reproduces |
| 204 | + // the record count, the consolidated flag, and the four-column contents. The |
| 205 | + // body delegates to `Updates::write_to`/`read_from` (shared with spill); this |
| 206 | + // covers the 16-byte `records` / `consolidated` prefix the wire format adds, |
| 207 | + // and asserts `length_in_bytes` agrees with what `into_bytes` emits. |
| 208 | + #[test] |
| 209 | + fn recorded_updates_bytes_round_trip() { |
| 210 | + let rows: Vec<Upd> = vec![ |
| 211 | + (0, 0, 0, 1), (0, 1, 0, 1), (1, 0, 0, 2), (1, 0, 3, -1), (2, 5, 7, 4), |
| 212 | + ]; |
| 213 | + let mut trie = UpdatesTyped::<Upd>::default(); |
| 214 | + for (k, v, t, d) in &rows { trie.push((k, v, t, d)); } |
| 215 | + let trie = trie.consolidate(); |
| 216 | + let records = trie.len(); |
| 217 | + let original = RecordedUpdates::<Upd> { updates: trie.into(), records, consolidated: true }; |
| 218 | + let want: Vec<Upd> = original.updates.view().iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect(); |
| 219 | + |
| 220 | + let mut buf = Vec::new(); |
| 221 | + original.into_bytes(&mut buf); |
| 222 | + assert_eq!(buf.len(), original.length_in_bytes(), "length_in_bytes disagrees with into_bytes"); |
| 223 | + |
| 224 | + let bytes = timely::bytes::arc::BytesMut::from(buf).freeze(); |
| 225 | + let decoded = RecordedUpdates::<Upd>::from_bytes(bytes); |
| 226 | + |
| 227 | + assert_eq!(decoded.records, records); |
| 228 | + assert!(decoded.consolidated); |
| 229 | + let got: Vec<Upd> = decoded.updates.view().iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect(); |
| 230 | + assert_eq!(got, want); |
| 231 | + } |
| 232 | +} |
0 commit comments