Skip to content

Commit 580ddb1

Browse files
authored
Reclaim memory where possible, dedup while iterating over remap_trace (#33367)
Shrinks heap allocations for some of the containers used in reclock. Replaces a vec with a chained iterator and in-line dedup. ### Motivation fixes MaterializeInc/database-issues#9587
1 parent efd6961 commit 580ddb1

File tree

4 files changed

+219
-8
lines changed

4 files changed

+219
-8
lines changed

Cargo.lock

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/timely-util/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ publish = false
99
[lints]
1010
workspace = true
1111

12+
[[bench]]
13+
name = "reclock"
14+
harness = false
15+
1216
[dependencies]
1317
ahash = { version = "0.8.12", default-features = false }
1418
bincode = "1.3.3"
@@ -29,8 +33,17 @@ tracing = "0.1.37"
2933
uuid = { version = "1.17.0", features = ["serde", "v4"] }
3034
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
3135

36+
# This crate is used for capturing memory usage in a unit test. It is feature
37+
# flagged because it replaces the global memory allocator. Unfortunately,
38+
# optional dependencies are not allowed in `dev-dependencies`, so it lives here.
39+
allocation-counter = { version = "0", optional = true }
40+
41+
[dev-dependencies]
42+
criterion = { version = "0.6.0" }
43+
3244
[features]
3345
default = ["mz-ore/default", "workspace-hack"]
46+
count-allocations = ["allocation-counter"]
3447

3548
[package.metadata.cargo-udeps.ignore]
3649
normal = ["workspace-hack"]

src/timely-util/benches/reclock.rs

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
// Copyright Materialize, Inc. and contributors. All rights reserved.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the LICENSE file.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0.
9+
10+
use std::sync::mpsc::Receiver;
11+
12+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
13+
use differential_dataflow::{
14+
ExchangeData,
15+
input::{Input, InputSession},
16+
};
17+
18+
use mz_ore::Overflowing;
19+
use mz_timely_util::capture::PusherCapture;
20+
use mz_timely_util::order::Partitioned;
21+
use mz_timely_util::reclock::reclock;
22+
use timely::communication::allocator::Thread;
23+
use timely::dataflow::Scope;
24+
use timely::dataflow::operators::capture::Event;
25+
use timely::dataflow::operators::unordered_input::UnorderedHandle;
26+
use timely::dataflow::operators::{ActivateCapability, Capture, UnorderedInput};
27+
use timely::progress::timestamp::Refines;
28+
use timely::progress::{Antichain, Timestamp};
29+
use timely::worker::Worker;
30+
31+
type Diff = Overflowing<i64>;
32+
type FromTime = Partitioned<u64, u64>;
33+
type IntoTime = u64;
34+
type BindingHandle<FromTime> = InputSession<IntoTime, FromTime, Diff>;
35+
type DataHandle<D, FromTime> = (
36+
UnorderedHandle<FromTime, (D, FromTime, Diff)>,
37+
ActivateCapability<FromTime>,
38+
);
39+
type ReclockedStream<D> = Receiver<Event<IntoTime, Vec<(D, IntoTime, Diff)>>>;
40+
41+
fn harness<FromTime, D, F, R>(as_of: Antichain<IntoTime>, test_logic: F) -> R
42+
where
43+
FromTime: Timestamp + Refines<()>,
44+
D: ExchangeData,
45+
F: FnOnce(
46+
&mut Worker<Thread>,
47+
BindingHandle<FromTime>,
48+
DataHandle<D, FromTime>,
49+
ReclockedStream<D>,
50+
) -> R
51+
+ Send
52+
+ Sync
53+
+ 'static,
54+
R: Send + 'static,
55+
{
56+
timely::execute_directly(move |worker| {
57+
let (bindings, data, data_cap, reclocked) = worker.dataflow::<(), _, _>(|scope| {
58+
let (bindings, data_pusher, reclocked) =
59+
scope.scoped::<IntoTime, _, _>("IntoScope", move |scope| {
60+
let (binding_handle, binding_collection) = scope.new_collection();
61+
let (data_pusher, reclocked_collection) = reclock(&binding_collection, as_of);
62+
let reclocked_capture = reclocked_collection.inner.capture();
63+
(binding_handle, data_pusher, reclocked_capture)
64+
});
65+
66+
let (data, data_cap) = scope.scoped::<FromTime, _, _>("FromScope", move |scope| {
67+
let ((handle, cap), data) = scope.new_unordered_input();
68+
data.capture_into(PusherCapture(data_pusher));
69+
(handle, cap)
70+
});
71+
72+
(bindings, data, data_cap, reclocked)
73+
});
74+
75+
test_logic(worker, bindings, (data, data_cap), reclocked)
76+
})
77+
}
78+
79+
fn step(worker: &mut Worker<Thread>) {
80+
for _ in 0..4 {
81+
worker.step();
82+
}
83+
}
84+
85+
fn bench_reclock_simple(c: &mut Criterion) {
86+
let mut group = c.benchmark_group("bench_reclock_simple_group");
87+
88+
for n in [100_u64, 1_000_u64, 10_000_u64] {
89+
group.bench_with_input(BenchmarkId::new("bench_reclock_simple", n), &n, |b, d| {
90+
b.iter(|| {
91+
let upto = *d;
92+
let downgrade_interval = upto / 3;
93+
let data_interval = upto / 10;
94+
harness::<FromTime, u64, _, _>(
95+
Antichain::from_elem(0),
96+
move |worker, mut bindings, (mut data, mut data_cap), _| {
97+
for ts in 0..upto {
98+
if ts > 0 {
99+
bindings.update_at(
100+
Partitioned::new_singleton(0, ts - 1),
101+
ts,
102+
Diff::MINUS_ONE,
103+
);
104+
if ts.is_multiple_of(downgrade_interval) {
105+
data_cap.downgrade(&Partitioned::new_singleton(0, ts - 1));
106+
}
107+
if ts.is_multiple_of(data_interval) {
108+
data.session(data_cap.clone()).give((
109+
ts,
110+
Partitioned::new_singleton(0, ts),
111+
Diff::ONE,
112+
));
113+
}
114+
}
115+
bindings.update_at(Partitioned::new_singleton(0, ts), ts, Diff::ONE);
116+
bindings.advance_to(ts + 1);
117+
bindings.flush();
118+
step(worker);
119+
}
120+
},
121+
)
122+
})
123+
});
124+
}
125+
}
126+
127+
criterion_group!(benches, bench_reclock_simple);
128+
criterion_main!(benches);

src/timely-util/src/reclock.rs

Lines changed: 70 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ where
248248
let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());
249249

250250
let mut binding_buffer = Vec::new();
251-
let mut interesting_times = Vec::new();
252251

253252
// Accumulation buffer for `remap_input` updates.
254253
use timely::progress::ChangeBatch;
@@ -365,13 +364,22 @@ where
365364
// updates.
366365
let mut min_time = IntoTime::minimum();
367366
min_time.advance_by(remap_since.borrow());
368-
interesting_times.push(min_time);
369-
interesting_times.extend(remap_trace.iter().map(|(_, t, _)| t.clone()));
370-
interesting_times.dedup();
371-
for cur_time in interesting_times.drain(..) {
367+
let mut prev_cur_time = None;
368+
for cur_time in [min_time]
369+
.iter()
370+
.chain(remap_trace.iter().map(|(_, t, _)| t))
371+
.filter(|&v| {
372+
if prev_cur_time.is_some_and(|pv| pv == v) {
373+
false
374+
} else {
375+
prev_cur_time = Some(v);
376+
true
377+
}
378+
})
379+
{
372380
// 4.0. Load updates of `cur_time` from the trace into `cur_binding` to
373381
// construct the `[FromTime]` frontier that `cur_time` maps to.
374-
while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == &cur_time) {
382+
while let Some((t_from, _, diff)) = remap.next_if(|(_, t, _)| t == cur_time) {
375383
binding_buffer.push((t_from.clone(), *diff));
376384
}
377385
cur_binding.update_iter(binding_buffer.drain(..));
@@ -387,7 +395,7 @@ where
387395
// reclocked with the bindings until `prev_remap_upper`. For this reason
388396
// we only need to reconsider these updates when we start looking at new
389397
// bindings, i.e bindings that are beyond `prev_remap_upper`.
390-
if prev_remap_upper.less_equal(&cur_time) {
398+
if prev_remap_upper.less_equal(cur_time) {
391399
deferred_source_updates.retain_mut(|batch| {
392400
for (data, _, diff) in batch.extract(cur_binding) {
393401
session.give((data, cur_time.clone(), diff));
@@ -406,7 +414,7 @@ where
406414
.iter()
407415
.any(|t| !cur_binding.less_equal(t))
408416
{
409-
reclocked_source_frontier.insert(cur_time);
417+
reclocked_source_frontier.insert(cur_time.clone());
410418
}
411419
}
412420

@@ -419,6 +427,13 @@ where
419427
consolidation::consolidate_updates(&mut remap_trace);
420428
remap_trace
421429
.sort_unstable_by(|(_, t1, _): &(_, IntoTime, _), (_, t2, _)| t1.cmp(t2));
430+
431+
// If using less than a quarter of the capacity, shrink the container. To avoid having
432+
// to resize the container on a subsequent push, shrink to 2x the length, which is
433+
// what push would grow it to.
434+
if remap_trace.len() < remap_trace.capacity() / 4 {
435+
remap_trace.shrink_to(remap_trace.len() * 2);
436+
}
422437
}
423438

424439
// STEP 6. Tidy up deferred updates
@@ -436,6 +451,13 @@ where
436451
let b = dsu.pop().unwrap();
437452
dsu.push(a.merge_with(b));
438453
}
454+
455+
// If using less than a quarter of the capacity, shrink the container. To avoid having
456+
// to resize the container on a subsequent push, shrink to 2x the length, which is
457+
// what push would grow it to.
458+
if deferred_source_updates.len() < deferred_source_updates.capacity() / 4 {
459+
deferred_source_updates.shrink_to(deferred_source_updates.len() * 2);
460+
}
439461
}
440462
});
441463

@@ -1154,4 +1176,44 @@ mod test {
11541176
},
11551177
);
11561178
}
1179+
1180+
#[cfg(feature = "count-allocations")]
1181+
#[mz_ore::test]
1182+
#[cfg_attr(miri, ignore)] // too slow
1183+
fn test_shrinking() {
1184+
let as_of = 1000_u64;
1185+
1186+
// Test that supplying a single big batch of unconsolidated bindings gets
1187+
// consolidated after a single worker step.
1188+
1189+
harness::<FromTime, u64, _, _>(
1190+
Antichain::from_elem(0),
1191+
move |worker, mut bindings, (_data, mut data_cap), _| {
1192+
let info1 = allocation_counter::measure(|| {
1193+
step(worker);
1194+
for ts in 0..as_of {
1195+
if ts > 0 {
1196+
bindings.update_at(
1197+
Partitioned::new_singleton(0, ts - 1),
1198+
ts,
1199+
Diff::MINUS_ONE,
1200+
);
1201+
}
1202+
bindings.update_at(Partitioned::new_singleton(0, ts), ts, Diff::ONE);
1203+
bindings.advance_to(ts + 1);
1204+
bindings.flush();
1205+
step(worker);
1206+
}
1207+
});
1208+
println!("info = {info1:?}");
1209+
1210+
let info2 = allocation_counter::measure(|| {
1211+
data_cap.downgrade(&Partitioned::new_singleton(0, 900));
1212+
step(worker);
1213+
});
1214+
println!("info = {info2:?}");
1215+
assert!(info1.bytes_current + info2.bytes_current < (info1.bytes_current / 4));
1216+
},
1217+
);
1218+
}
11571219
}

0 commit comments

Comments
 (0)