Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 63 additions & 8 deletions vortex-python/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use arrow_array::RecordBatchReader;
use arrow_array::ffi_stream::ArrowArrayStreamReader;
use arrow_pyarrow::FromPyArrow;
use pyo3::exceptions::PyTypeError;
use pyo3::prelude::*;
use pyo3::pyfunction;
use tokio::fs::File;
use vortex::arrow::FromArrowArray;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::{VortexError, VortexResult};
use vortex::file::VortexWriteOptions;
use vortex::iter::{ArrayIterator, ArrayIteratorExt};
use vortex::{Canonical, IntoArray};
use vortex::iter::{ArrayIterator, ArrayIteratorAdapter, ArrayIteratorExt};
use vortex::{ArrayRef, Canonical, IntoArray};

use crate::arrays::{PyArray, PyArrayRef};
use crate::dataset::PyVortexDataset;
Expand Down Expand Up @@ -81,16 +88,17 @@ pub fn read_url<'py>(
///
/// Parameters
/// ----------
/// iter : :class:`~vortex.Array` | :class:`~vortex.ArrayIterator` | :class:`pyarrow.Table` | :class:`pyarrow.RecordBatchReader`
/// The data to write. Can be a single array, an array iterator, or a PyArrow object that supports streaming.
/// When using PyArrow objects, data is streamed directly without loading the entire dataset into memory.
///
/// path : :class:`str`
/// The file path.
///
/// Examples
/// --------
///
/// Write a single Vortex array `a` to the local file `a.vortex`.
///
/// >>> import vortex as vx
/// >>> a = vx.array([
Expand All @@ -100,7 +108,21 @@ pub fn read_url<'py>(
/// ... {'x': 11},
/// ... {'x': None},
/// ... ])
/// >>> vx.io.write(a, "a.vortex") # doctest: +SKIP
///
/// Stream a PyArrow Table directly to Vortex, one record batch at a time:
///
/// >>> import pyarrow as pa
/// >>> import vortex as vx
/// >>> table = pa.table({'x': [1, 2, 3, 4, 5]})
/// >>> vx.io.write(table, "streamed.vortex") # doctest: +SKIP
///
/// Stream from a PyArrow RecordBatchReader:
///
/// >>> import pyarrow as pa
/// >>> import vortex as vx
/// >>> reader = pa.RecordBatchReader.from_batches(schema, batches) # doctest: +SKIP
/// >>> vx.io.write(reader, "streamed.vortex") # doctest: +SKIP
///
#[pyfunction]
#[pyo3(signature = (iter, path))]
Expand Down Expand Up @@ -141,8 +163,41 @@ impl<'py> FromPyObject<'py> for PyIntoArrayIterator {
)));
}

// Try to convert from Arrow objects (Table, RecordBatchReader, etc.)
if let Ok(arrow_iter) = try_arrow_stream_to_iterator(ob) {
return Ok(PyVortex(arrow_iter));
}

Err(PyTypeError::new_err(
"Expected an object that can be converted to an ArrayIterator (PyArrayIterator, PyArray, or PyArrow object with streaming support)",
))
}
}

/// Try to convert a PyArrow object to a Vortex ArrayIterator using Arrow FFI streams.
///
/// Only instances of `pyarrow.Table` or `pyarrow.RecordBatchReader` are accepted. The object
/// is imported as an [`ArrowArrayStreamReader`], its Arrow schema is converted into the
/// [`DType`] of the resulting iterator, and every record batch yielded by the stream is
/// converted into a Vortex array.
///
/// # Errors
///
/// Returns a `TypeError` when `ob` is neither a `pyarrow.Table` nor a
/// `pyarrow.RecordBatchReader`. Python errors raised while importing `pyarrow`, looking up
/// its classes, performing the instance checks, or building the Arrow stream are propagated.
/// Errors produced by the stream for an individual batch are not raised here; they become
/// `Err` items of the returned iterator.
fn try_arrow_stream_to_iterator(ob: &Bound<'_, PyAny>) -> PyResult<Box<dyn ArrayIterator + Send>> {
    let py = ob.py();
    let pa = py.import("pyarrow")?;
    let pa_table = pa.getattr("Table")?;
    let pa_record_batch_reader = pa.getattr("RecordBatchReader")?;

    // Guard clause: reject anything that is not one of the supported streaming types.
    if !(ob.is_instance(&pa_table)? || ob.is_instance(&pa_record_batch_reader)?) {
        return Err(PyTypeError::new_err(
            "Object is not a supported Arrow streaming type",
        ));
    }

    // Convert to Arrow stream using FFI
    let arrow_stream = ArrowArrayStreamReader::from_pyarrow_bound(ob)?;
    // The schema must be read before the reader is consumed by the iterator chain below.
    let dtype = DType::from_arrow(arrow_stream.schema());

    // Convert Arrow RecordBatch stream to Vortex ArrayIterator
    let vortex_iter = arrow_stream
        .into_iter()
        .map(|batch_result| -> VortexResult<ArrayRef> {
            // `?` converts the Arrow error into a `VortexError` via `From`.
            Ok(ArrayRef::from_arrow(batch_result?, false))
        });

    Ok(Box::new(ArrayIteratorAdapter::new(dtype, vortex_iter)))
}
16 changes: 16 additions & 0 deletions vortex-python/test/test_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,19 @@ def test_empty_file(tmpdir_factory):
# writing file should succeed
empty_file = tmpdir_factory.mktemp("data") / "empty.vortex"
vortex.io.write(empty, str(empty_file))


def test_stream_pyarrow(tmpdir_factory):
    """Write a PyArrow table (read back from Parquet) to a Vortex file and check the output."""
    import os

    import pyarrow.parquet as pq

    data_dir = tmpdir_factory.mktemp("data")
    parquet_path = str(data_dir / "names.parquet")
    vortex_path = str(data_dir / "names.vortex")

    table = pa.Table.from_pydict(
        {
            "names": ["Alice", "Bob", "Carol"],
            "ages": [21, 22, 23],
        }
    )
    pq.write_table(table, parquet_path)

    # Read the table back from Parquet and hand the PyArrow Table to the Vortex writer.
    loaded = pq.read_table(parquet_path)
    vortex.io.write(loaded, vortex_path)

    # The write must actually produce a non-empty file, not merely return without raising.
    assert os.path.getsize(vortex_path) > 0
Loading