Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion vortex-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use std::ops::Range;
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::ArrayRef;
use vortex_array::stats::StatsSet;
use vortex_dtype::{DType, Field, FieldMask, FieldPath, FieldPathSet};
Expand Down Expand Up @@ -136,6 +137,11 @@ impl VortexFile {
}

/// Returns the splits of this file as a list of half-open row ranges.
///
/// The split points are obtained from `SplitBy::Layout.splits(..)`, called with this file's
/// layout reader and `FieldMask::All`. Each adjacent pair of points `(start, end)` is then
/// turned into the range `start..end`.
///
/// # Errors
///
/// Propagates any error returned by `layout_reader()` or by `SplitBy::splits`.
pub fn splits(&self) -> VortexResult<Vec<Range<u64>>> {
// NOTE(review): this text comes from a diff view without +/- markers; the next line looks
// like the pre-change body that the `Ok(..)` expression below replaces — confirm before
// treating both as live code.
SplitBy::Layout.splits(self.layout_reader()?.as_ref(), &[FieldMask::All])
Ok(SplitBy::Layout
.splits(self.layout_reader()?.as_ref(), &[FieldMask::All])?
.into_iter()
// Pair every split point with its successor so each pair becomes one range.
.tuple_windows()
.map(|(start, end)| start..end)
.collect())
}
}
34 changes: 23 additions & 11 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::cmp;
use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use std::{cmp, iter};

use futures::future::BoxFuture;
use itertools::Itertools;
Expand Down Expand Up @@ -326,7 +327,7 @@ pub struct RepeatedScan<A: 'static + Send> {
/// The selection mask to apply to the selected row range.
selection: Selection,
/// The natural splits of the file.
splits: Vec<Range<u64>>,
splits: BTreeSet<u64>,
/// The number of splits to make progress on concurrently **per-thread**.
concurrency: usize,
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
Expand All @@ -342,27 +343,38 @@ impl<A: 'static + Send> RepeatedScan<A> {
&self,
row_range: Option<Range<u64>>,
) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
let row_range = intersect_ranges(self.row_range.as_ref(), row_range);

let ctx = Arc::new(TaskContext {
row_range,
selection: self.selection.clone(),
filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
reader: self.layout_reader.clone(),
projection: self.projection.clone(),
mapper: self.map_fn.clone(),
});

let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
let splits_iter: Box<dyn Iterator<Item = _>> = match row_range {
None => Box::new(self.splits.iter().copied()),
Some(range) => {
if range.start > range.end {
return Ok(Vec::new());
}
Box::new(
iter::once(range.start)
.chain(self.splits.range(range.clone()).copied())
.chain(iter::once(range.end)),
)
}
};

// Create a task that executes the full scan pipeline for each split.
let mut limit = self.limit;
let split_tasks = self
.splits
.iter()
.filter_map(|split_range| {
if limit.is_some_and(|l| l == 0) {
let split_tasks = splits_iter
.tuple_windows()
.filter_map(|(start, end)| {
if limit.is_some_and(|l| l == 0) || start >= end {
None
} else {
Some(split_exec(ctx.clone(), split_range.clone(), limit.as_mut()))
Some(split_exec(ctx.clone(), start..end, limit.as_mut()))
}
})
.try_collect()?;
Expand Down
22 changes: 5 additions & 17 deletions vortex-scan/src/split_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::collections::BTreeSet;
use std::ops::Range;
use std::iter::once;

use itertools::Itertools;
use vortex_array::stats::StatBound;
use vortex_dtype::FieldMask;
use vortex_error::{VortexResult, vortex_err};
Expand All @@ -31,32 +30,21 @@ impl SplitBy {
&self,
layout_reader: &dyn LayoutReader,
field_mask: &[FieldMask],
) -> VortexResult<Vec<Range<u64>>> {
) -> VortexResult<BTreeSet<u64>> {
Ok(match *self {
SplitBy::Layout => {
let mut row_splits = BTreeSet::<u64>::new();
row_splits.insert(0);

// Register the splits for all the layouts.
layout_reader.register_splits(field_mask, 0, &mut row_splits)?;

row_splits
.into_iter()
.tuple_windows()
.map(|(start, end)| start..end)
.collect()
}
SplitBy::RowCount(n) => {
let row_count = *layout_reader.row_count().to_exact().ok_or_else(|| {
vortex_err!("Cannot split layout by row count, row count is not exact")
})?;
let mut splits =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3 I always found this a weird location for this logic

Vec::with_capacity(usize::try_from((row_count + n as u64) / n as u64)?);
for start in (0..row_count).step_by(n) {
let end = (start + n as u64).min(row_count);
splits.push(start..end);
}
splits
(0..row_count).step_by(n).chain(once(row_count)).collect()
}
})
}
Expand Down Expand Up @@ -103,7 +91,7 @@ mod test {
let splits = SplitBy::Layout
.splits(reader.as_ref(), &[FieldMask::Exact(FieldPath::root())])
.unwrap();
assert_eq!(splits, vec![0..10]);
assert_eq!(splits, [0, 10].into_iter().collect());
}

#[test]
Expand Down Expand Up @@ -132,6 +120,6 @@ mod test {
let splits = SplitBy::RowCount(3)
.splits(reader.as_ref(), &[FieldMask::Exact(FieldPath::root())])
.unwrap();
assert_eq!(splits, vec![0..3, 3..6, 6..9, 9..10]);
assert_eq!(splits, [0, 3, 6, 9, 10].into_iter().collect());
}
}
20 changes: 1 addition & 19 deletions vortex-scan/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,8 @@ pub(super) fn split_exec<A: 'static + Send>(
split: Range<u64>,
limit: Option<&mut usize>,
) -> VortexResult<TaskFuture<Option<A>>> {
// Step 1: using the caller-provided row range and selection, attempt to disregard this split.
let read_range = match &ctx.row_range {
None => split,
Some(row_range) => {
if row_range.start >= split.end || row_range.end < split.start {
// No overlap for this task
return Ok(ok(None).boxed());
}

let intersect_start = row_range.start.max(split.start);
let intersect_end = row_range.end.min(split.end);
intersect_start..intersect_end
}
};

// Apply the selection to calculate a read mask
let read_mask = ctx.selection.row_mask(&read_range);
let read_mask = ctx.selection.row_mask(&split);
let row_range = read_mask.row_range();
let row_mask = read_mask.mask().clone();
if row_mask.all_false() {
Expand Down Expand Up @@ -167,9 +152,6 @@ pub(super) fn split_exec<A: 'static + Send>(

/// Information needed to execute a single split task.
pub(super) struct TaskContext<A> {
/// A caller-provided range of the file to read. All tasks should intersect their reads
/// with this range to ensure that they are split as well.
pub(super) row_range: Option<Range<u64>>,
/// A row selection to apply.
pub(super) selection: Selection,
/// The shared filter expression.
Expand Down
Loading