
Commit 1f86133

Merge pull request #25298 from danhhz/persist_ts_rewrite

persist: add efficient rewrite of timestamp in staged batches

2 parents 2f879a6 + fbb44b4

22 files changed: +757 -161 lines

src/persist-client/build.rs

Lines changed: 8 additions & 0 deletions

```diff
@@ -51,6 +51,14 @@ fn main() {
         ".mz_persist_client.internal.service.ProtoPushDiff",
     ]);
 
+    // Setting `emit_rerun_if_changed(false)` below causes tonic to entirely
+    // skip emitting a "rerun-if-changed", which results in us getting the
+    // default behavior for a build script: to invalidate when any file in the
+    // crate changes. This breaks the fast iteration cycle of datadriven tests
+    // (i.e. touching a tests/ file results in an unnecessary recompile). Fix by
+    // only rerunning this build script if something changes in src/ because all
+    // mz_persist_client protos are in there.
+    println!("cargo:rerun-if-changed=src/");
     tonic_build::configure()
         // Enabling `emit_rerun_if_changed` will rerun the build script when
         // anything in the include directory (..) changes. This causes quite a
```
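
The comment leans on standard Cargo behavior: a build script that emits no `rerun-if-changed` directive is re-run whenever any file in the package changes, while an explicit directive scopes invalidation to the named path. A minimal sketch of just that mechanism (illustrative only; the real build script above also runs tonic/prost codegen):

```rust
// build.rs (sketch)
fn main() {
    // Re-run this build script only when something under src/ changes,
    // not on every edit anywhere in the crate (e.g. tests/ files).
    println!("cargo:rerun-if-changed=src/");
}
```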

src/persist-client/src/batch.rs

Lines changed: 95 additions & 5 deletions

```diff
@@ -144,6 +144,37 @@ where
         self.batch.desc.lower()
     }
 
+    /// Efficiently rewrites the timestamps in this not-yet-committed batch.
+    ///
+    /// This [Batch] represents potentially large amounts of data, which may
+    /// have partly or entirely been spilled to s3. This call bulk edits the
+    /// timestamps of all data in this batch in a metadata-only operation (i.e.
+    /// without network calls).
+    ///
+    /// Specifically, every timestamp in the batch is logically advanced_by the
+    /// provided `frontier`.
+    ///
+    /// This method may be called multiple times, with later calls overriding
+    /// previous ones, but the rewrite frontier may not regress across calls.
+    ///
+    /// When this batch was created, it was given an `upper`, which bounds the
+    /// staged data it represents. To allow rewrite past this original `upper`,
+    /// this call accepts a new `upper` which replaces the previous one. Like
+    /// the rewrite frontier, the upper may not regress across calls.
+    ///
+    /// Multiple batches with various rewrite frontiers may be used in a single
+    /// [crate::write::WriteHandle::compare_and_append_batch] call. This is an
+    /// expected usage.
+    pub fn rewrite_ts(
+        &mut self,
+        frontier: &Antichain<T>,
+        new_upper: Antichain<T>,
+    ) -> Result<(), InvalidUsage<T>> {
+        self.batch
+            .rewrite_ts(frontier, new_upper)
+            .map_err(InvalidUsage::InvalidRewrite)
+    }
+
     /// Marks the blobs that this batch handle points to as consumed, likely
     /// because they were appended to a shard.
     ///
```
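
For intuition about "logically advanced_by": with the totally ordered `u64` timestamps used in this PR's example test, advancing a time `t` by a single-element frontier `{f}` is simply `max(t, f)`. A standalone sketch (plain Rust, not the persist internals):

```rust
fn main() {
    // Advancing by frontier {f}: updates before f move forward to f;
    // anything already at or past f is unchanged.
    fn advance_by(t: u64, f: u64) -> u64 {
        t.max(f)
    }
    assert_eq!(advance_by(0, 2), 2); // data staged at time 0 reads back at 2
    assert_eq!(advance_by(5, 2), 5); // data already past the frontier stays put
}
```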
```diff
@@ -771,8 +802,8 @@ pub(crate) struct BatchParts<T> {
     lower: Antichain<T>,
     blob: Arc<dyn Blob + Send + Sync>,
     isolated_runtime: Arc<IsolatedRuntime>,
-    writing_parts: VecDeque<JoinHandle<HollowBatchPart>>,
-    finished_parts: Vec<HollowBatchPart>,
+    writing_parts: VecDeque<JoinHandle<HollowBatchPart<T>>>,
+    finished_parts: Vec<HollowBatchPart<T>>,
     batch_metrics: BatchWriteMetrics,
 }
 
@@ -906,6 +937,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
                             encoded_size_bytes: payload_len,
                             key_lower,
                             stats,
+                            ts_rewrite: None,
                         }
                     }
                     .instrument(write_span),
@@ -927,7 +959,7 @@
     }
 
     #[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
-    pub(crate) async fn finish(self) -> Vec<HollowBatchPart> {
+    pub(crate) async fn finish(self) -> Vec<HollowBatchPart<T>> {
         let mut parts = self.finished_parts;
         for handle in self.writing_parts {
             let part = handle.wait_and_assert_finished().await;
```
```diff
@@ -938,9 +970,39 @@
 }
 
 pub(crate) fn validate_truncate_batch<T: Timestamp>(
-    batch: &Description<T>,
+    batch: &HollowBatch<T>,
     truncate: &Description<T>,
+    any_batch_rewrite: bool,
 ) -> Result<(), InvalidUsage<T>> {
+    // If rewrite_ts is used, we don't allow truncation, to keep things simpler
+    // to reason about.
+    if any_batch_rewrite {
+        // We allow a new upper to be specified at rewrite time, so that's easy:
+        // it must match exactly. This is both consistent with the upper
+        // requirement below and proves that there is no data to truncate past
+        // the upper.
+        if truncate.upper() != batch.desc.upper() {
+            return Err(InvalidUsage::InvalidRewrite(format!(
+                "rewritten batch might have data past {:?} up to {:?}",
+                truncate.upper().elements(),
+                batch.desc.upper().elements(),
+            )));
+        }
+        // To prove that there is no data to truncate below the lower, require
+        // that the lower is <= the rewrite ts.
+        for part in batch.parts.iter() {
+            let part_lower_bound = part.ts_rewrite.as_ref().unwrap_or(batch.desc.lower());
+            if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
+                return Err(InvalidUsage::InvalidRewrite(format!(
+                    "rewritten batch might have data below {:?} at {:?}",
+                    truncate.lower().elements(),
+                    part_lower_bound.elements(),
+                )));
+            }
+        }
+    }
+
+    let batch = &batch.desc;
     if !PartialOrder::less_equal(batch.lower(), truncate.lower())
         || PartialOrder::less_than(batch.upper(), truncate.upper())
     {
```
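
The lower-bound check above is a frontier comparison. A tiny standalone illustration of the rule "the truncation lower must be <= the rewrite ts", using timely's `Antichain` and `PartialOrder` (the same types the diff uses; assumes the `timely` crate as a dependency):

```rust
use timely::progress::Antichain;
use timely::PartialOrder;

fn main() {
    let rewrite_ts = Antichain::from_elem(3u64);
    // A part rewritten to frontier {3} provably has no data below time 3,
    // so truncating with lower {2} is safe...
    assert!(PartialOrder::less_equal(&Antichain::from_elem(2u64), &rewrite_ts));
    // ...but truncating with lower {4} is rejected: the part may still carry
    // data at time 3, below the requested lower bound.
    assert!(!PartialOrder::less_equal(&Antichain::from_elem(4u64), &rewrite_ts));
}
```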
```diff
@@ -958,7 +1020,7 @@ pub(crate) fn validate_truncate_batch<T: Timestamp>(
 mod tests {
     use crate::cache::PersistClientCache;
     use crate::internal::paths::{BlobKey, PartialBlobKey};
-    use crate::tests::{all_ok, CodecProduct};
+    use crate::tests::{all_ok, new_test_client, CodecProduct};
     use crate::PersistLocation;
 
     use super::*;
@@ -1167,4 +1229,32 @@ mod tests {
         assert!(untrimmable.should_retain("ww-XYZ"));
         assert!(!untrimmable.should_retain("xya"));
     }
+
+    // NB: Most edge cases are exercised in datadriven tests.
+    #[mz_ore::test(tokio::test)]
+    #[cfg_attr(miri, ignore)] // too slow
+    async fn rewrite_ts_example() {
+        let client = new_test_client().await;
+        let (mut write, read) = client
+            .expect_open::<String, (), u64, i64>(ShardId::new())
+            .await;
+
+        let mut batch = write.builder(Antichain::from_elem(0));
+        batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
+        let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
+
+        // Roundtrip through a transmittable batch.
+        let batch = batch.into_transmittable_batch();
+        let mut batch = write.batch_from_transmittable_batch(batch);
+        batch
+            .rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
+            .unwrap();
+        write
+            .expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
+            .await;
+
+        let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
+        let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)];
+        assert_eq!(actual, expected);
+    }
 }
```

src/persist-client/src/cli/inspect.rs

Lines changed: 44 additions & 31 deletions

```diff
@@ -39,7 +39,7 @@ use crate::internal::encoding::{Rollup, UntypedState};
 use crate::internal::paths::{
     BlobKey, BlobKeyPrefix, PartialBatchKey, PartialBlobKey, PartialRollupKey, WriterKey,
 };
-use crate::internal::state::{ProtoRollup, ProtoStateDiff, State};
+use crate::internal::state::{HollowBatchPart, ProtoRollup, ProtoStateDiff, State};
 use crate::rpc::NoopPubSubSender;
 use crate::usage::{HumanBytes, StorageUsageClient};
 use crate::{Metrics, PersistClient, PersistConfig, ShardId};
@@ -338,16 +338,28 @@ pub async fn blob_batch_part(
     let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
     let blob = make_blob(&cfg, blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
 
-    let key = PartialBatchKey(partial_key).complete(&shard_id);
-    let part = blob
-        .get(&*key)
+    let key = PartialBatchKey(partial_key);
+    let buf = blob
+        .get(&*key.complete(&shard_id))
         .await
         .expect("blob exists")
         .expect("part exists");
-    let part = BlobTraceBatchPart::<u64>::decode(&part, &metrics.columnar).expect("decodable");
-    let desc = part.desc.clone();
-
-    let encoded_part = EncodedPart::new(&*key, part.desc.clone(), part);
+    let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &metrics.columnar).expect("decodable");
+    let desc = parsed.desc.clone();
+
+    let part = HollowBatchPart {
+        key,
+        encoded_size_bytes: 0,
+        key_lower: vec![],
+        stats: None,
+        ts_rewrite: None,
+    };
+    let encoded_part = EncodedPart::new(
+        metrics.read.snapshot.clone(),
+        parsed.desc.clone(),
+        &part,
+        parsed,
+    );
     let mut out = BatchPartOutput {
         desc,
         updates: Vec::new(),
@@ -382,29 +394,30 @@ async fn consolidated_size(args: &StateArgs) -> Result<(), anyhow::Error> {
     let as_of = state.upper().borrow();
 
     let mut updates = Vec::new();
-    for part in state
-        .collections
-        .trace
-        .batches()
-        .into_iter()
-        .flat_map(|x| x.parts.iter())
-    {
-        let key = part.key.complete(&shard_id);
-        tracing::info!("fetching {}", key);
-        let part = state_versions
-            .blob
-            .get(&*key)
-            .await
-            .expect("blob exists")
-            .expect("part exists");
-        let part = BlobTraceBatchPart::<u64>::decode(&part, &state_versions.metrics.columnar)
-            .expect("decodable");
-        let encoded_part = EncodedPart::new(&*key, part.desc.clone(), part);
-        let mut cursor = Cursor::default();
-        while let Some((k, v, mut t, d)) = cursor.pop(&encoded_part) {
-            t.advance_by(as_of);
-            let d = <i64 as Codec64>::decode(d);
-            updates.push(((k.to_owned(), v.to_owned()), t, d));
+    for batch in state.collections.trace.batches() {
+        for part in batch.parts.iter() {
+            let key = part.key.complete(&shard_id);
+            tracing::info!("fetching {}", key);
+            let buf = state_versions
+                .blob
+                .get(&*key)
+                .await
+                .expect("blob exists")
+                .expect("part exists");
+            let parsed = BlobTraceBatchPart::<u64>::decode(&buf, &state_versions.metrics.columnar)
+                .expect("decodable");
+            let encoded_part = EncodedPart::new(
+                state_versions.metrics.read.snapshot.clone(),
+                batch.desc.clone(),
+                part,
+                parsed,
+            );
+            let mut cursor = Cursor::default();
+            while let Some((k, v, mut t, d)) = cursor.pop(&encoded_part) {
+                t.advance_by(as_of);
+                let d = <i64 as Codec64>::decode(d);
+                updates.push(((k.to_owned(), v.to_owned()), t, d));
+            }
         }
     }
```

src/persist-client/src/error.rs

Lines changed: 3 additions & 1 deletion

```diff
@@ -78,6 +78,8 @@ pub enum InvalidUsage<T> {
     },
     /// The requested codecs don't match the actual ones in durable storage.
     CodecMismatch(Box<CodecMismatch>),
+    /// An invalid usage of [crate::batch::Batch::rewrite_ts].
+    InvalidRewrite(String),
 }
 
 impl<T: Debug> std::fmt::Display for InvalidUsage<T> {
@@ -123,8 +125,8 @@ impl<T: Debug> std::fmt::Display for InvalidUsage<T> {
                     "finalized without fully advancing since {since:?} and upper {upper:?}"
                 )
             }
-
             InvalidUsage::CodecMismatch(err) => std::fmt::Display::fmt(err, f),
+            InvalidUsage::InvalidRewrite(err) => write!(f, "invalid rewrite: {err}"),
         }
     }
 }
```
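
The new variant carries a human-readable reason that `Display` prefixes with "invalid rewrite:". A quick sketch of what a caller would see (assumes `mz-persist-client` as a dependency; the path `error::InvalidUsage` matches the file above):

```rust
use mz_persist_client::error::InvalidUsage;

fn main() {
    // Matches the Display arm added above.
    let err = InvalidUsage::<u64>::InvalidRewrite("rewrite frontier regressed".into());
    assert_eq!(err.to_string(), "invalid rewrite: rewrite frontier regressed");
}
```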
