Skip to content

Commit

Permalink
feat: Add PyDataType and PySchema (#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Aug 6, 2024
1 parent f779a8f commit 7a08452
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 217 deletions.
8 changes: 7 additions & 1 deletion pyo3-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "Expression plugins and PyO3 types for polars"

[dependencies]
ciborium = { version = "0.2.1", optional = true }
libc = "0.2" # pyo3 depends on libc already, so this does not introduce an extra dependence.
libc = "0.2" # pyo3 depends on libc already, so this does not introduce an extra dependence.
once_cell = "1"
polars = { workspace = true, default-features = false }
polars-core = { workspace = true, default-features = false }
Expand All @@ -27,3 +27,9 @@ thiserror = "1"
[features]
lazy = ["polars/serde-lazy", "polars-plan", "polars-lazy/serde", "ciborium"]
derive = ["pyo3-polars-derive", "polars-plan", "polars-ffi", "serde-pickle", "serde"]
dtype-full = ["polars/dtype-full", "dtype-decimal", "dtype-array", "dtype-categorical"]
object = ["polars/object"]
dtype-decimal = ["polars/dtype-decimal"]
dtype-struct = ["polars/dtype-struct"]
dtype-array = ["polars/dtype-array"]
dtype-categorical = ["polars/dtype-categorical"]
223 changes: 8 additions & 215 deletions pyo3-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,223 +48,16 @@ pub mod error;
#[cfg(feature = "derive")]
pub mod export;
mod ffi;
mod types;

pub use crate::alloc::PolarsAllocator;
use crate::error::PyPolarsErr;
use crate::ffi::to_py::to_py_array;
use polars::export::arrow;
use polars::prelude::*;
use pyo3::ffi::Py_uintptr_t;
use once_cell::sync::Lazy;
use pyo3::prelude::*;
use pyo3::types::PyDict;
#[cfg(feature = "lazy")]
use {polars_lazy::frame::LazyFrame, polars_plan::plans::DslPlan};
pub use types::*;

#[repr(transparent)]
#[derive(Debug, Clone)]
/// A wrapper around a [`Series`] that can be converted to and from python with `pyo3`.
pub struct PySeries(pub Series);
pub(crate) static POLARS: Lazy<PyObject> = Lazy::new(|| {
Python::with_gil(|py| PyModule::import_bound(py, "polars").unwrap().to_object(py))
});

#[repr(transparent)]
#[derive(Debug, Clone)]
/// A wrapper around a [`DataFrame`] that can be converted to and from python with `pyo3`.
pub struct PyDataFrame(pub DataFrame);

#[cfg(feature = "lazy")]
#[repr(transparent)]
#[derive(Clone)]
/// A wrapper around a [`DataFrame`] that can be converted to and from python with `pyo3`.
/// # Warning
/// If the [`LazyFrame`] contains in memory data,
/// such as a [`DataFrame`] this will be serialized/deserialized.
///
/// It is recommended to only have `LazyFrame`s that scan data
/// from disk
pub struct PyLazyFrame(pub LazyFrame);

impl From<PyDataFrame> for DataFrame {
fn from(value: PyDataFrame) -> Self {
value.0
}
}

impl From<PySeries> for Series {
fn from(value: PySeries) -> Self {
value.0
}
}

#[cfg(feature = "lazy")]
impl From<PyLazyFrame> for LazyFrame {
fn from(value: PyLazyFrame) -> Self {
value.0
}
}

impl AsRef<Series> for PySeries {
fn as_ref(&self) -> &Series {
&self.0
}
}

impl AsRef<DataFrame> for PyDataFrame {
fn as_ref(&self) -> &DataFrame {
&self.0
}
}

#[cfg(feature = "lazy")]
impl AsRef<LazyFrame> for PyLazyFrame {
fn as_ref(&self) -> &LazyFrame {
&self.0
}
}

impl<'a> FromPyObject<'a> for PySeries {
fn extract_bound(ob: &Bound<'a, PyAny>) -> PyResult<Self> {
let ob = ob.call_method0("rechunk")?;

let name = ob.getattr("name")?;
let py_name = name.str()?;
let name = py_name.to_cow()?;

let kwargs = PyDict::new_bound(ob.py());
if let Ok(compat_level) = ob.call_method0("_newest_compat_level") {
let compat_level = compat_level.extract().unwrap();
let compat_level =
CompatLevel::with_level(compat_level).unwrap_or(CompatLevel::newest());
kwargs.set_item("compat_level", compat_level.get_level())?;
}
let arr = ob.call_method("to_arrow", (), Some(&kwargs))?;
let arr = ffi::to_rust::array_to_rust(&arr)?;
Ok(PySeries(
Series::try_from((&*name, arr)).map_err(PyPolarsErr::from)?,
))
}
}

impl<'a> FromPyObject<'a> for PyDataFrame {
fn extract_bound(ob: &Bound<'a, PyAny>) -> PyResult<Self> {
let series = ob.call_method0("get_columns")?;
let n = ob.getattr("width")?.extract::<usize>()?;
let mut columns = Vec::with_capacity(n);
for pyseries in series.iter()? {
let pyseries = pyseries?;
let s = pyseries.extract::<PySeries>()?.0;
columns.push(s);
}
unsafe { Ok(PyDataFrame(DataFrame::new_no_checks(columns))) }
}
}

#[cfg(feature = "lazy")]
impl<'a> FromPyObject<'a> for PyLazyFrame {
fn extract_bound(ob: &Bound<'a, PyAny>) -> PyResult<Self> {
let s = ob.call_method0("__getstate__")?.extract::<Vec<u8>>()?;
let lp: DslPlan = ciborium::de::from_reader(&*s).map_err(
|e| PyPolarsErr::Other(
format!("Error when deserializing LazyFrame. This may be due to mismatched polars versions. {}", e)
)
)?;
Ok(PyLazyFrame(LazyFrame::from(lp)))
}
}

impl IntoPy<PyObject> for PySeries {
fn into_py(self, py: Python<'_>) -> PyObject {
let polars = py.import_bound("polars").expect("polars not installed");
let s = polars.getattr("Series").unwrap();
match s
.getattr("_import_arrow_from_c")
.or_else(|_| s.getattr("_import_from_c"))
{
// Go via polars
Ok(import_arrow_from_c) => {
// Get supported compatibility level
let compat_level = CompatLevel::with_level(
s.getattr("_newest_compat_level")
.map_or(1, |newest_compat_level| {
newest_compat_level.call0().unwrap().extract().unwrap()
}),
)
.unwrap_or(CompatLevel::newest());
// Prepare pointers on the heap.
let mut chunk_ptrs = Vec::with_capacity(self.0.n_chunks());
for i in 0..self.0.n_chunks() {
let array = self.0.to_arrow(i, compat_level);
let schema = Box::leak(Box::new(arrow::ffi::export_field_to_c(
&ArrowField::new("", array.data_type().clone(), true),
)));
let array = Box::leak(Box::new(arrow::ffi::export_array_to_c(array.clone())));

let schema_ptr: *const arrow::ffi::ArrowSchema = &*schema;
let array_ptr: *const arrow::ffi::ArrowArray = &*array;
chunk_ptrs.push((schema_ptr as Py_uintptr_t, array_ptr as Py_uintptr_t))
}
// Somehow we need to clone the Vec, because pyo3 doesn't accept a slice here.
let pyseries = import_arrow_from_c
.call1((self.0.name(), chunk_ptrs.clone()))
.unwrap();
// Deallocate boxes
for (schema_ptr, array_ptr) in chunk_ptrs {
let schema_ptr = schema_ptr as *mut arrow::ffi::ArrowSchema;
let array_ptr = array_ptr as *mut arrow::ffi::ArrowArray;
unsafe {
// We can drop both because the `schema` isn't read in an owned matter on the other side.
let _ = Box::from_raw(schema_ptr);

// The array is `ptr::read_unaligned` so there are two owners.
// We drop the box, and forget the content so the other process is the owner.
let array = Box::from_raw(array_ptr);
// We must forget because the other process will call the release callback.
let array = *array;
std::mem::forget(array);
}
}

pyseries.to_object(py)
}
// Go via pyarrow
Err(_) => {
let s = self.0.rechunk();
let name = s.name();
let arr = s.to_arrow(0, CompatLevel::oldest());
let pyarrow = py.import_bound("pyarrow").expect("pyarrow not installed");

let arg = to_py_array(arr, py, pyarrow).unwrap();
let s = polars.call_method1("from_arrow", (arg,)).unwrap();
let s = s.call_method1("rename", (name,)).unwrap();
s.to_object(py)
}
}
}
}

impl IntoPy<PyObject> for PyDataFrame {
fn into_py(self, py: Python<'_>) -> PyObject {
let pyseries = self
.0
.get_columns()
.iter()
.map(|s| PySeries(s.clone()).into_py(py))
.collect::<Vec<_>>();

let polars = py.import_bound("polars").expect("polars not installed");
let df_object = polars.call_method1("DataFrame", (pyseries,)).unwrap();
df_object.into_py(py)
}
}

#[cfg(feature = "lazy")]
impl IntoPy<PyObject> for PyLazyFrame {
fn into_py(self, py: Python<'_>) -> PyObject {
let polars = py.import_bound("polars").expect("polars not installed");
let cls = polars.getattr("LazyFrame").unwrap();
let instance = cls.call_method1("__new__", (&cls,)).unwrap();
let mut writer: Vec<u8> = vec![];
ciborium::ser::into_writer(&self.0.logical_plan, &mut writer).unwrap();

instance.call_method1("__setstate__", (&*writer,)).unwrap();
instance.into_py(py)
}
}
pub(crate) static SERIES: Lazy<PyObject> =
Lazy::new(|| Python::with_gil(|py| POLARS.getattr(py, "Series").unwrap()));
Loading

0 comments on commit 7a08452

Please sign in to comment.