Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion vortex-cxx/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl VortexScanBuilder {
}

pub(crate) fn with_limit(&mut self, limit: usize) {
take_mut::take(&mut self.inner, |inner| inner.with_limit(limit));
take_mut::take(&mut self.inner, |inner| inner.with_limit(limit as u64));
}

pub(crate) unsafe fn with_output_schema(&mut self, output_schema: *mut u8) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub(crate) struct VortexOpener {
/// A hint for the desired row count of record batches returned from the scan.
pub batch_size: usize,
/// If provided, the scan will not return more than this many rows.
pub limit: Option<usize>,
pub limit: Option<u64>,
/// A metrics object for tracking performance of the scan.
pub metrics: VortexMetrics,
/// A shared cache of file readers.
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl FileSource for VortexSource {
expr_adapter_factory,
table_schema: self.table_schema.clone(),
batch_size,
limit: base_config.limit,
limit: base_config.limit.map(|l| l as u64),
metrics: partition_metrics,
layout_readers: self.layout_readers.clone(),
has_output_ordering: !base_config.output_ordering.is_empty(),
Expand Down
3 changes: 3 additions & 0 deletions vortex-python/python/vortex/_lib/file.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class VortexFile:
projection: IntoProjection = None,
*,
expr: Expr | None = None,
limit: int | None = None,
indices: Array | None = None,
batch_size: int | None = None,
) -> ArrayIterator: ...
Expand All @@ -34,6 +35,7 @@ class VortexFile:
projection: IntoProjection = None,
*,
expr: Expr | None = None,
limit: int | None = None,
indices: Array | None = None,
batch_size: int | None = None,
) -> RepeatedScan: ...
Expand All @@ -42,6 +44,7 @@ class VortexFile:
projection: IntoProjection = None,
*,
expr: Expr | None = None,
limit: int | None = None,
batch_size: int | None = None,
) -> pa.RecordBatchReader: ...
def to_dataset(self) -> VortexDataset: ...
Expand Down
17 changes: 12 additions & 5 deletions vortex-python/python/vortex/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def scan(
projection: IntoProjection = None,
*,
expr: Expr | None = None,
limit: int | None = None,
indices: Array | None = None,
batch_size: int | None = None,
) -> ArrayIterator:
Expand All @@ -94,6 +95,8 @@ def scan(
The projection expression to read, or else read all columns.
expr : :class:`vortex.Expr` | None
The predicate used to filter rows. The filter columns do not need to be in the projection.
limit : :class:`int` | None
The maximum number of rows to read after filtering. If None, read all rows.
indices : :class:`vortex.Array` | None
The indices of the rows to read. Must be sorted and non-null.
batch_size : :class:`int` | None
Expand Down Expand Up @@ -164,13 +167,14 @@ def scan(
"Mikhail"
]
"""
return self._file.scan(projection, expr=expr, indices=indices, batch_size=batch_size)
return self._file.scan(projection, expr=expr, limit=limit, indices=indices, batch_size=batch_size)

def to_repeated_scan(
self,
projection: IntoProjection = None,
*,
expr: Expr | None = None,
limit: int | None = None,
indices: Array | None = None,
batch_size: int | None = None,
) -> RepeatedScan:
Expand All @@ -187,12 +191,15 @@ def to_repeated_scan(
batch_size : :class:`int` | None
The number of rows to read per chunk.
"""
return RepeatedScan(self._file.prepare(projection, expr=expr, indices=indices, batch_size=batch_size))
return RepeatedScan(
self._file.prepare(projection, expr=expr, limit=limit, indices=indices, batch_size=batch_size)
)

def to_arrow(
self,
projection: IntoProjection = None,
*,
limit: int | None = None,
expr: Expr | None = None,
batch_size: int | None = None,
) -> RecordBatchReader:
Expand All @@ -209,7 +216,7 @@ def to_arrow(
The number of rows to read per chunk.

"""
return self._file.to_arrow(projection, expr=expr, batch_size=batch_size)
return self._file.to_arrow(projection, expr=expr, limit=limit, batch_size=batch_size)

def to_dataset(self) -> VortexDataset:
"""Scan the Vortex file using the :class:`pyarrow.dataset.Dataset` API."""
Expand All @@ -227,12 +234,12 @@ def to_polars(self) -> polars.LazyFrame:
def _io_source(
with_columns: list[str] | None,
predicate: pl.Expr | None,
_n_rows: int | None,
n_rows: int | None,
_batch_size: int | None,
) -> Iterator[pl.DataFrame]:
vx_predicate: Expr | None = None if predicate is None else polars_to_vortex(predicate)

reader = self.to_arrow(projection=with_columns, expr=vx_predicate)
reader = self.to_arrow(projection=with_columns, expr=vx_predicate, limit=n_rows)

for batch in reader:
batch = pl.DataFrame._from_arrow(batch, rechunk=False) # pyright: ignore[reportPrivateUsage]
Expand Down
20 changes: 17 additions & 3 deletions vortex-python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,19 @@ impl PyVortexFile {
PyDType::init(slf.py(), slf.get().vxf.dtype().clone())
}

#[pyo3(signature = (projection = None, *, expr = None, indices = None, batch_size = None))]
#[pyo3(signature = (projection = None, *, expr = None, limit = None, indices = None, batch_size = None))]
fn scan(
slf: Bound<Self>,
projection: Option<PyIntoProjection>,
expr: Option<PyExpr>,
limit: Option<u64>,
indices: Option<PyArrayRef>,
batch_size: Option<usize>,
) -> PyVortexResult<PyArrayIterator> {
let builder = slf.get().scan_builder(
projection.map(|p| p.0),
expr.map(|e| e.into_inner()),
limit,
indices.map(|i| i.into_inner()),
batch_size,
)?;
Expand All @@ -121,17 +123,19 @@ impl PyVortexFile {
)))
}

#[pyo3(signature = (projection = None, *, expr = None, indices = None, batch_size = None))]
#[pyo3(signature = (projection = None, *, expr = None, limit = None, indices = None, batch_size = None))]
fn prepare(
slf: Bound<Self>,
projection: Option<PyIntoProjection>,
expr: Option<PyExpr>,
limit: Option<u64>,
indices: Option<PyArrayRef>,
batch_size: Option<usize>,
) -> PyVortexResult<PyRepeatedScan> {
let builder = slf.get().scan_builder(
projection.map(|p| p.0),
expr.map(|e| e.into_inner()),
limit,
indices.map(|i| i.into_inner()),
batch_size,
)?;
Expand All @@ -144,11 +148,12 @@ impl PyVortexFile {
})
}

#[pyo3(signature = (projection = None, *, expr = None, batch_size = None))]
#[pyo3(signature = (projection = None, *, expr = None, limit = None, batch_size = None))]
fn to_arrow(
slf: Bound<Self>,
projection: Option<PyIntoProjection>,
expr: Option<PyExpr>,
limit: Option<u64>,
batch_size: Option<usize>,
) -> PyVortexResult<Py<PyAny>> {
let vxf = slf.get().vxf.clone();
Expand All @@ -159,6 +164,10 @@ impl PyVortexFile {
.with_some_filter(expr.map(|e| e.into_inner()))
.with_projection(projection.map(|p| p.0).unwrap_or_else(root));

if let Some(limit) = limit {
builder = builder.with_limit(limit);
}

if let Some(batch_size) = batch_size {
builder = builder.with_split_by(SplitBy::RowCount(batch_size));
}
Expand Down Expand Up @@ -191,6 +200,7 @@ impl PyVortexFile {
&self,
projection: Option<Expression>,
expr: Option<Expression>,
limit: Option<u64>,
indices: Option<ArrayRef>,
batch_size: Option<usize>,
) -> VortexResult<ScanBuilder<ArrayRef>> {
Expand All @@ -200,6 +210,10 @@ impl PyVortexFile {
.with_some_filter(expr)
.with_projection(projection.unwrap_or_else(root));

if let Some(limit) = limit {
builder = builder.with_limit(limit);
}

if let Some(indices) = indices {
let indices = cast(indices.as_ref(), &DType::Primitive(PType::U64, NonNullable))?
.to_primitive()
Expand Down
38 changes: 38 additions & 0 deletions vortex-python/test/test_polars_.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright the Vortex contributors

import math
import os

import polars as pl
import pyarrow as pa
import pytest

import vortex as vx
import vortex.expr as ve
from vortex.polars_ import polars_to_vortex

Expand Down Expand Up @@ -34,3 +39,36 @@
def test_exprs(polars: pl.Expr, vortex: ve.Expr):
# Dump the clickbench filters
assert polars_to_vortex(polars) == vortex


@pytest.fixture(scope="module")
def vxf(tmpdir_factory):  # pyright: ignore[reportUnknownParameterType, reportMissingParameterType]
    """Write a 1,000,000-row Vortex file into a temp directory and return it opened.

    Each row is a struct with an integer ``index`` (0..999_999) and a float
    ``value`` equal to ``sqrt(index)``. The fixture is module-scoped, so the
    tests in this module share one opened file.
    """
    fname = tmpdir_factory.mktemp("data") / "polars_test.vortex"  # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType]

    # NOTE(review): mktemp presumably returns a fresh directory on every call, in which
    # case this guard is always true — confirm before relying on any file reuse here.
    if not os.path.exists(fname):  # pyright: ignore[reportUnknownArgumentType]
        a = pa.array([{"index": x, "value": math.sqrt(x)} for x in range(1_000_000)])
        # Compress the Arrow-backed array before writing it out as a Vortex file.
        vx.io.write(vx.compress(vx.array(a)), str(fname))  # pyright: ignore[reportUnknownArgumentType]
    # Opened with the segment cache disabled via `without_segment_cache=True`.
    return vx.open(str(fname), without_segment_cache=True)  # pyright: ignore[reportUnknownArgumentType]


def test_to_polars_with_limit(vxf: vx.VortexFile):
    """A ``limit(100)`` on the lazy Polars frame collects to exactly 100 rows."""
    limited = vxf.to_polars().limit(100)
    frame = limited.collect()
    assert len(frame) == 100


def test_to_polars_with_filter(vxf: vx.VortexFile):
    """Filtering on ``index < 500`` returns rows 0..499, in order."""
    predicate = pl.col("index") < 500
    frame = vxf.to_polars().filter(predicate).collect()
    assert len(frame) == 500
    assert frame["index"].to_list() == list(range(500))


def test_to_polars_with_projection(vxf: vx.VortexFile):
    """Selecting only ``index`` with a limit of 10 yields that one column and 10 rows."""
    projected = vxf.to_polars().select("index")
    frame = projected.limit(10).collect()
    assert frame.columns == ["index"]
    assert len(frame) == 10


def test_to_polars_with_projection_and_filter(vxf: vx.VortexFile):
    """Projection to both columns combined with ``index < 100`` yields 100 two-column rows."""
    predicate = pl.col("index") < 100
    query = vxf.to_polars().select("index", "value").filter(predicate)
    frame = query.collect()
    assert frame.columns == ["index", "value"]
    assert len(frame) == 100
2 changes: 0 additions & 2 deletions vortex-scan/src/layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ impl DataSource for LayoutReaderDataSource {
}

if let Some(limit) = scan_request.limit {
// TODO(ngates): ScanBuilder limit should be u64
let limit = usize::try_from(limit)?;
builder = builder.with_limit(limit);
}

Expand Down
4 changes: 2 additions & 2 deletions vortex-scan/src/repeated_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct RepeatedScan<A: 'static + Send> {
/// Function to apply to each [`ArrayRef`] within the spawned split tasks.
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
/// Maximal number of rows to read (after filtering)
limit: Option<usize>,
limit: Option<u64>,
/// The dtype of the projected arrays.
dtype: DType,
}
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<A: 'static + Send> RepeatedScan<A> {
splits: Splits,
concurrency: usize,
map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
limit: Option<usize>,
limit: Option<u64>,
dtype: DType,
) -> Self {
Self {
Expand Down
4 changes: 2 additions & 2 deletions vortex-scan/src/scan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct ScanBuilder<A> {
/// Should we try to prune the file (using stats) on open.
file_stats: Option<Arc<[StatsSet]>>,
/// Maximal number of rows to read (after filtering)
limit: Option<usize>,
limit: Option<u64>,
/// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
/// but not by the scan [`Selection`] which remains relative.
row_offset: u64,
Expand Down Expand Up @@ -180,7 +180,7 @@ impl<A: 'static + Send> ScanBuilder<A> {
self
}

pub fn with_limit(mut self, limit: usize) -> Self {
pub fn with_limit(mut self, limit: u64) -> Self {
self.limit = Some(limit);
self
}
Expand Down
9 changes: 6 additions & 3 deletions vortex-scan/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub type TaskFuture<A> = BoxFuture<'static, VortexResult<A>>;
pub(super) fn split_exec<A: 'static + Send>(
ctx: Arc<TaskContext<A>>,
split: Range<u64>,
limit: Option<&mut usize>,
limit: Option<&mut u64>,
) -> VortexResult<TaskFuture<Option<A>>> {
// Apply the selection to calculate a read mask
let read_mask = ctx.selection.row_mask(&split);
Expand All @@ -55,8 +55,11 @@ pub(super) fn split_exec<A: 'static + Send>(
Some(l) if *l == 0 => Mask::new_false(row_mask.len()),
Some(l) => {
let true_count = row_mask.true_count();
let row_mask = row_mask.limit(*l);
*l -= usize::min(*l, true_count);
let mask_limit = usize::try_from(*l)
.map(|l| l.min(true_count))
.unwrap_or(true_count);
let row_mask = row_mask.limit(mask_limit);
*l -= mask_limit as u64;
row_mask
}
None => row_mask,
Expand Down
Loading