Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 98 additions & 7 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use std::{fmt::Debug, io, marker::PhantomData, mem, ops::Range, vec};
use std::{
fmt::Debug,
io,
marker::PhantomData,
mem,
ops::{Range, RangeBounds},
vec,
};

use itertools::Itertools;
use log::{debug, info, trace, warn};
Expand Down Expand Up @@ -301,7 +308,22 @@ impl<R: Repo, T: Encode> Generic<R, T> {
D: Decoder,
D::Error: From<error::Traversal>,
{
fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset)
fold_transactions_internal(self.commits_from(offset).with_log_format_version(), decoder, offset..)
}

pub fn fold_transaction_range<D>(&self, range: impl RangeBounds<u64>, decoder: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal>,
{
use std::ops::Bound::*;

let start = match range.start_bound() {
Included(x) => *x,
Excluded(x) => x + 1,
Unbounded => 0,
};
fold_transactions_internal(self.commits_from(start).with_log_format_version(), decoder, range)
}
}

Expand Down Expand Up @@ -387,8 +409,29 @@ where
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
let commits = commits_from(repo, max_log_format_version, offset)?;
fold_transactions_internal(commits.with_log_format_version(), de, offset)
fold_transaction_range(repo, max_log_format_version, offset.., de)
}

pub fn fold_transaction_range<R, D>(
repo: R,
max_log_format_version: u8,
range: impl RangeBounds<u64>,
de: D,
) -> Result<(), D::Error>
where
R: Repo,
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
use std::ops::Bound::*;

let start = match range.start_bound() {
Included(x) => *x,
Excluded(x) => x + 1,
Unbounded => 0,
};
let commits = commits_from(repo, max_log_format_version, start)?;
fold_transactions_internal(commits.with_log_format_version(), de, range)
}

fn transactions_from_internal<'a, R, D, T>(
Expand All @@ -409,12 +452,38 @@ where
.map(|x| x.and_then(|y| y))
}

fn fold_transactions_internal<R, D>(mut commits: CommitsWithVersion<R>, de: D, from: u64) -> Result<(), D::Error>
fn fold_transactions_internal<R, D>(
mut commits: CommitsWithVersion<R>,
de: D,
range: impl RangeBounds<u64>,
) -> Result<(), D::Error>
where
R: Repo,
D: Decoder,
D::Error: From<error::Traversal>,
{
use std::ops::Bound::*;

// Avoid reading the first commit if it wouldn't be in the range anyway.
if range_is_empty(&range) {
return Ok(());
}

// `true` if `offset` is outside `range`, s.t. it is smaller than the start
// bound.
let before_start = |offset: &u64| match range.start_bound() {
Included(x) => offset < x,
Excluded(x) => offset <= x,
Unbounded => false,
};
// `true` if `offset` is outside `range`, s.t. it is greater than the end
// bound.
let past_end = |offset: &u64| match range.end_bound() {
Included(x) => offset > x,
Excluded(x) => offset >= x,
Unbounded => false,
};

while let Some(commit) = commits.next() {
let (version, commit) = match commit {
Ok(version_and_commit) => version_and_commit,
Expand All @@ -434,15 +503,18 @@ where
trace!("commit {} n={} version={}", commit.min_tx_offset, commit.n, version);

let max_tx_offset = commit.min_tx_offset + commit.n as u64;
if max_tx_offset <= from {
// Skip if no transaction in the commit is in range.
if before_start(&max_tx_offset) {
continue;
}

let records = &mut commit.records.as_slice();
for n in 0..commit.n {
let tx_offset = commit.min_tx_offset + n as u64;
if tx_offset < from {
if before_start(&tx_offset) {
de.skip_record(version, tx_offset, records)?;
} else if past_end(&tx_offset) {
return Ok(());
} else {
de.consume_record(version, tx_offset, records)?;
}
Expand Down Expand Up @@ -809,6 +881,25 @@ fn try_seek_using_offset_index<R: Repo>(
.map(|pos| (index, pos))
}

// `range_bounds_is_empty` https://github.com/rust-lang/rust/issues/137300
//
// This is correct for integers, but unsound for arbitrary `T`, so unlikely to
// be stabilized.
fn range_is_empty(range: &impl RangeBounds<u64>) -> bool {
use std::ops::Bound::*;

#[rustfmt::skip]
let not_empty = match (range.start_bound(), range.end_bound()) {
(Unbounded, _) | (_, Unbounded) => true,
(Included(start), Excluded(end))
| (Excluded(start), Included(end))
| (Excluded(start), Excluded(end)) => start < end,
(Included(start), Included(end)) => start <= end,
};

!not_empty
}

#[cfg(test)]
mod tests {
use std::{cell::Cell, iter::repeat};
Expand Down
9 changes: 9 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
io,
num::{NonZeroU16, NonZeroU64},
ops::RangeBounds,
sync::RwLock,
};

Expand Down Expand Up @@ -609,3 +610,11 @@ where
{
commitlog::fold_transactions_from(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, offset, de)
}

pub fn fold_transaction_range<D>(root: CommitLogDir, range: impl RangeBounds<u64>, de: D) -> Result<(), D::Error>
where
D: Decoder,
D::Error: From<error::Traversal> + From<io::Error>,
{
commitlog::fold_transaction_range(repo::Fs::new(root)?, DEFAULT_LOG_FORMAT_VERSION, range, de)
}
Loading