Skip to content

Commit

Permalink
feat: filter rows from mem range
Browse files Browse the repository at this point in the history
  • Loading branch information
evenyag committed Oct 30, 2024
1 parent 9b01654 commit 35e93cf
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 5 deletions.
53 changes: 53 additions & 0 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,56 @@ async fn test_prune_memtable_complex_expr() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_mem_range_prune() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);

engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 8),
},
)
.await;

// Starts scan and gets the memtable time range.
let stream = engine
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();

put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(10, 12),
},
)
.await;

let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
8 changes: 6 additions & 2 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ pub use crate::memtable::key_values::KeyValues;
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::prune::PruneTimeIterator;
use crate::read::Batch;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;

pub mod bulk;
pub mod key_values;
Expand Down Expand Up @@ -355,8 +357,10 @@ impl MemtableRange {
}

/// Builds an iterator to read the range.
pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
self.context.builder.build()
/// Filters the result by the specific time range.
pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
let iter = self.context.builder.build()?;
Ok(Box::new(PruneTimeIterator::new(iter, time_range)))
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/read/scan_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::ReaderMetrics;

struct PartitionMetricsInner {
Expand Down Expand Up @@ -128,13 +129,14 @@ pub(crate) fn scan_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
let build_reader_start = Instant::now();
let iter = range.build_iter()?;
let iter = range.build_iter(time_range)?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

let mut source = Source::Iter(iter);
Expand Down
7 changes: 6 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,12 @@ fn build_sources(
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
let stream = scan_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
range_meta.time_range,
);
Box::pin(stream) as _
} else {
let read_type = if compaction {
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl UnorderedScan {
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
for await batch in stream {
yield batch;
}
Expand Down

0 comments on commit 35e93cf

Please sign in to comment.