Skip to content

Commit

Permalink
feat: add an iter to prune by time range
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Oct 30, 2024
1 parent 9712295 commit 9b01654
Show file tree
Hide file tree
Showing 2 changed files with 217 additions and 10 deletions.
217 changes: 217 additions & 0 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_time::Timestamp;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::BooleanVectorBuilder;

use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};

Expand Down Expand Up @@ -112,3 +118,214 @@ impl PruneReader {
}
}
}

/// An iterator that prunes batches by time range.
///
/// It wraps another batch iterator, removes the rows whose timestamps fall
/// outside `time_range` and only yields the batches that still contain rows.
pub(crate) struct PruneTimeIterator {
/// Source iterator that provides the batches to prune.
iter: BoxedBatchIterator,
/// Time range to keep: `.0` is the lower bound and `.1` is the upper bound,
/// both inclusive.
time_range: FileTimeRange,
}

impl PruneTimeIterator {
/// Creates a new `PruneTimeIterator` with the given iterator and time range.
pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self {
Self { iter, time_range }
}

/// Prune batch by time range.
fn prune(&self, mut batch: Batch) -> Result<Batch> {
if batch.is_empty() {
return Ok(batch);
}

// fast path, the batch is within the time range.
// Note that the time range is inclusive.
if self.time_range.0 <= batch.first_timestamp().unwrap()
&& batch.last_timestamp().unwrap() <= self.time_range.1
{
return Ok(batch);
}

// slow path, prune the batch by time range.
// Note that the timestamp precision may be different from the time range.
// Safety: We know this is the timestamp type.
let unit = batch
.timestamps()
.data_type()
.as_timestamp()
.unwrap()
.unit();
let mut filter_builder = BooleanVectorBuilder::with_capacity(batch.timestamps().len());
let timestamps = batch.timestamps_native().unwrap();
for ts in timestamps {
let ts = Timestamp::new(*ts, unit);
if self.time_range.0 <= ts && ts <= self.time_range.1 {
filter_builder.push(Some(true));
} else {
filter_builder.push(Some(false));
}
}
let filter = filter_builder.finish();

batch.filter(&filter)?;
Ok(batch)
}

// Prune and return the next non-empty batch.
fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
while let Some(batch) = self.iter.next() {
let batch = batch?;
let pruned_batch = self.prune(batch)?;
if !pruned_batch.is_empty() {
return Ok(Some(pruned_batch));
}
}
Ok(None)
}
}

impl Iterator for PruneTimeIterator {
    type Item = Result<Batch>;

    /// Yields the next batch that is non-empty after pruning, or the error hit
    /// while looking for it. Returns `None` when no such batch is left.
    fn next(&mut self) -> Option<Self::Item> {
        match self.next_non_empty_batch() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;

    use super::*;
    use crate::test_util::new_batch;

    /// Runs copies of `batches` through a [`PruneTimeIterator`] limited to the
    /// inclusive millisecond range `[start_ms, end_ms]` and collects the output.
    fn prune_all(batches: &[Batch], start_ms: i64, end_ms: i64) -> Vec<Batch> {
        let source = batches.to_vec().into_iter().map(Ok);
        let range = (
            Timestamp::new_millisecond(start_ms),
            Timestamp::new_millisecond(end_ms),
        );
        PruneTimeIterator::new(Box::new(source), range)
            .map(|batch| batch.unwrap())
            .collect()
    }

    #[test]
    fn test_prune_time_iter_empty() {
        // No input batches means no output batches.
        let actual = prune_all(&[], 0, 1000);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        let first = new_batch(
            b"k1",
            &[10, 11],
            &[20, 20],
            &[OpType::Put, OpType::Put],
            &[110, 111],
        );
        let second = new_batch(
            b"k1",
            &[15, 16],
            &[20, 20],
            &[OpType::Put, OpType::Put],
            &[115, 116],
        );
        let third = new_batch(
            b"k1",
            &[17, 18],
            &[20, 20],
            &[OpType::Put, OpType::Put],
            &[117, 118],
        );
        let input = [first.clone(), second.clone(), third.clone()];

        // The upper bound cuts the second batch and drops the third one entirely.
        assert_eq!(
            prune_all(&input, 10, 15),
            [
                first.clone(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // The lower bound cuts the first batch; the other batches pass through.
        assert_eq!(
            prune_all(&input, 11, 20),
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                second.clone(),
                third.clone(),
            ]
        );

        // The range covers every timestamp exactly, so nothing is removed.
        assert_eq!(prune_all(&input, 10, 18), input);
    }
}
10 changes: 0 additions & 10 deletions src/mito2/src/test_util/memtable_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,6 @@ impl MemtableBuilder for EmptyMemtableBuilder {
}
}

/// Empty iterator builder.
///
/// Its `build` always succeeds and returns an iterator that yields no batches.
#[derive(Default)]
pub(crate) struct EmptyIterBuilder {}

impl IterBuilder for EmptyIterBuilder {
    /// Returns an iterator that is already exhausted; this never fails.
    fn build(&self) -> Result<BoxedBatchIterator> {
        let no_batches: BoxedBatchIterator = Box::new(std::iter::empty());
        Ok(no_batches)
    }
}

/// Creates a region metadata to test memtable with default pk.
///
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
Expand Down

0 comments on commit 9b01654

Please sign in to comment.