
Feature requests #195

Open
martindurant opened this issue Sep 26, 2024 · 18 comments

@martindurant

It would be great if:

  • you could select columns for reading from parquet, or, even better, select from the schema hierarchy in general for deeply structured datasets
  • you allowed reading row-group X from a parquet dataset; this would allow distributing the work to threads or even a cluster. Of course, the reader would need to reveal how many row-groups it contains
  • a to_buffers kind of method existed to expose the internal buffers of an arrow structure, in the order defined in the arrow docs, along with a corresponding from_buffers

Doing all of this would essentially answer what is envisaged in dask/fastparquet#931: getting what we really need out of arrow without the cruft. It would interoperate nicely with awkward, for example.

Other nice-to-haves (and I realise you wish to keep the scope as small as possible):

  • parquet filter
  • str and dt compute functions
@kylebarron
Owner

kylebarron commented Sep 26, 2024

  • you could select columns for reading from parquet, or, even better, select from the schema hierarchy in general for deeply structured datasets

This is relatively easy to do; you just have to map input into a ProjectionMask.
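
As a minimal sketch (assuming the Rust parquet crate's ProjectionMask::roots API; the helper name and the restriction to top-level columns are illustrative, not arro3's API):

use parquet::arrow::ProjectionMask;
use parquet::schema::types::SchemaDescriptor;

// Resolve top-level column names to root indices, then build the mask.
// Nested selection would use ProjectionMask::leaves with leaf indices instead.
fn mask_from_columns(schema: &SchemaDescriptor, names: &[&str]) -> ProjectionMask {
    let indices = schema
        .root_schema()
        .get_fields()
        .iter()
        .enumerate()
        .filter(|(_, field)| names.contains(&field.name()))
        .map(|(i, _)| i);
    ProjectionMask::roots(schema, indices)
}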

  • you allowed reading row-group X from a parquet dataset; this would allow distributing the work to threads or even a cluster. Of course, the reader would need to reveal how many row-groups it contains

I've done this for other bindings (e.g. https://geoarrow.org/geoarrow-rs/python/latest/api/io/functions/#geoarrow.rust.io.ParquetFile and https://kylebarron.dev/parquet-wasm/classes/bundler_parquet_wasm.ParquetFile.html). I'd say it's in scope but I'm not sure of a good way to expose both sync and async methods from a single class.
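
For the mechanics underneath (plain Rust with the parquet crate, not the eventual Python API; the function name is illustrative), the metadata reveals the row-group count, and the reader can be restricted to a single row group:

use std::fs::File;

use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Read only row group `i`; a caller could fan these calls out across threads.
fn read_row_group(path: &str, i: usize) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // This is the count that would be exposed to callers for distributing work.
    let num_row_groups = builder.metadata().num_row_groups();
    assert!(i < num_row_groups, "row group index out of range");
    let reader = builder.with_row_groups(vec![i]).build()?;
    Ok(reader.collect::<Result<Vec<_>, _>>()?)
}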

  • a to_buffers kind of method existed to expose the internal buffers of an arrow structure, in the order defined in the arrow docs, along with a corresponding from_buffers

This should be relatively easy. I already started prototyping this in #156, which adds a Buffer class that implements the Python buffer protocol. So to_buffers would export a list of arrow buffers according to the spec. I think ArrayData::buffers should export the buffers in the order expected by the C Data Interface, so we should be able to reuse that.
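
As a small illustration of those building blocks (a sketch against arrow-rs, not the proposed arro3 API):

use arrow::array::{Array, Int32Array};

fn main() {
    let arr = Int32Array::from(vec![Some(1), None, Some(3)]);
    let data = arr.to_data();
    // Non-validity buffers, in the order defined by the Arrow spec
    // (for Int32: a single values buffer).
    for (i, buf) in data.buffers().iter().enumerate() {
        println!("buffer {}: {} bytes", i, buf.len());
    }
    // The validity bitmap, held separately, which the C Data Interface lists first.
    if let Some(nulls) = data.nulls() {
        println!("validity: {} bytes", nulls.buffer().len());
    }
}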

Other nice-to-haves (and I realise you wish to keep the scope as small as possible)

The goal isn't to have the scope strictly as small as possible, but rather to have minimal new code here. E.g. if the functionality already exists as a compute function in arrow::compute, then it should be easy to bind for export here, and thus it is in scope.

The core module (arro3.core) should stay small, but e.g. the arro3.compute module can bring in more of the underlying Rust compute functions.

  • str and dt compute functions

dt compute functions are pretty easy to implement. See date_part. substring and regexp also exist.
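
For example, a quick check of the kernel that would be bound, assuming a recent arrow-rs where date_part and DatePart are exported from arrow::compute (the sample values are mine):

use arrow::array::{Array, Date32Array, Int32Array};
use arrow::compute::{date_part, DatePart};

fn main() -> Result<(), arrow::error::ArrowError> {
    // Date32 stores days since the Unix epoch; 19723 is 2024-01-01.
    let dates = Date32Array::from(vec![0, 19723]);
    let years = date_part(&dates, DatePart::Year)?;
    let years = years.as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(years.value(0), 1970);
    assert_eq!(years.value(1), 2024);
    Ok(())
}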

  • parquet filter

This is a bit more complex because we'd need more work on the binding side to be able to express the parquet filter to pass to Rust. Let's work on the other topics first and come back to this.


PRs for these are welcome if you're interested; preferably one at a time.

@martindurant
Author

This is relatively easy to do; you just have to map input into a ProjectionMask.

I mean in the Python API, of course. So input would be a list of strings.

I already started prototyping this in #156, which adds a Buffer class that implements the Python buffer protocol.

I think the latest pyo3 should be able to pass bytes-like objects back and forth between Python and Rust without copying, so you no longer need to implement Buffer.
e.g., https://docs.rs/pyo3/latest/pyo3/buffer/struct.PyBuffer.html

if the functionality already exists as a compute function in arrow::compute

There seems to be less functionality there than via pyarrow compute.

@kylebarron
Owner

This is relatively easy to do; you just have to map input into a ProjectionMask.

I mean in the Python API, of course. So input would be a list of strings.

The Python API needs to map some Python input into a Rust ProjectionMask. The input could be a list of strings, but it could also be defined in other ways.

  • A List[str] where the . character is used to define nested paths.
  • A List[str] | List[List[str]] where the inner list is used to define nested paths (sketched below).
  • Or, to more directly mirror the Rust API, it could be a List[int] whose elements map to the leaf or root column indexes.
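
For the List[str] | List[List[str]] shape, pyo3's derived FromPyObject can try each variant in order, so a single argument type covers both. A sketch with assumed names, not a settled design:

use pyo3::prelude::*;

// Order matters: the flat form is tried first; a list of lists fails to
// extract as Vec<String> and falls through to Nested.
#[derive(FromPyObject)]
enum ColumnSelector {
    Flat(Vec<String>),
    Nested(Vec<Vec<String>>),
}

// Any #[pyfunction] can then take the selector directly in its signature.
#[pyfunction]
fn select(columns: ColumnSelector) -> usize {
    match columns {
        ColumnSelector::Flat(cols) => cols.len(),
        ColumnSelector::Nested(paths) => paths.len(),
    }
}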

I already started prototyping this in #156, which adds a Buffer class that implements the Python buffer protocol.

I think the latest pyo3 should be able to pass bytes-like objects back and forth between Python and Rust without copying, so you no longer need to implement Buffer. e.g., docs.rs/pyo3/latest/pyo3/buffer/struct.PyBuffer.html

For data export (Rust to Python) we want to expose existing Arrow Buffer objects to Python. It's imperative not to copy the existing Arrow data into a new object. I believe that PyBuffer can only represent Python memory regions. The only constructor method I see on PyBuffer is copy_from_slice. Therefore I believe we really do need to implement the buffer protocol on top of Arrow-native buffers.

The PyBuffer struct is more easily used for importing data zero-copy from Python. However, PyBuffer makes you declare T statically, and doesn't let you dynamically infer the data type provided by Python. That's why we need the AnyBuffer struct in our code: so that you can pass an array of any of several types, and we can convert it to Arrow data.

if the functionality already exists as a compute function in arrow::compute

There seems to be less functionality there than via pyarrow compute.

Yes, indeed. There's relatively little functionality in core arrow and much more in DataFusion. DataFusion is async and more complex. For arro3, only the core arrow compute functions are in scope.

@martindurant
Author

The Python API needs to map some Python input into a Rust ProjectionMask

Agreed. List[str] is what pyarrow supports, but it cares about columns, not nested things. So I would say that either the dotted approach or the list[list[str]] approach should be implemented, where the former would need some way to deal with schema name parts that include ".". Integers would be fine at the Rust API border, but I don't think any end user wants to do that; they think in fields/columns.

The PyBuffer struct is more easily used for importing data zero-copy from Python.

Right, it's only one half of the solution.

It's imperative not to copy

Totally agree

That's why we need the AnyBuffer struct in our code.

A Python list would have done :). Each buffer is just a uint8 slice in the end, after all.

DataFusion is async and more complex

I believe you, but why would CPU kernels ever need async?

@kylebarron
Owner

kylebarron commented Oct 1, 2024

The Python API needs to map some Python input into a Rust ProjectionMask

Agreed. List[str] is what pyarrow supports, but it cares about columns, not nested things. So I would say that either the dotted approach or the list[list[str]] approach should be implemented, where the former would need some way to deal with schema name parts that include ".". Integers would be fine at the Rust API border, but I don't think any end user wants to do that; they think in fields/columns.

I'd probably go for List[str] | List[List[str]], where most of the time the user won't have a nested column and could pass in a bare List[str].

That's why we need the AnyBuffer struct in our code.

A Python list would have done :). Each buffer is just a uint8 slice in the end, after all.

I don't follow. There's no place here where a Python list could be zero-copy, either for import or export.

DataFusion is async and more complex

I believe you, but why would CPU kernels ever need async?

DataFusion is a pluggable query engine. It's not the kernels themselves that need async, but the fact that they're part of a query engine that might need async to access the data, and which uses an async engine in its scheduler.

It might be possible to use some of DataFusion's operations directly in a sync manner, but I doubt it. I think you'd need to use the main DataFusion APIs, which are async.

@martindurant
Author

I'd probably go for List[str] | List[List[str]], where most of the time the user won't have a nested column and could pass in a bare List[str].

+1

There's no place here where a Python list could be zero-copy, either for import or export.

A Python list containing buffers (memoryview, bytearray, etc.): you can iterate over each item and try_into to test that it has the correct type. These days, maybe it converts directly to a Vec of PyBackedBytes.

@kylebarron
Owner

kylebarron commented Oct 1, 2024

A Python list containing buffers (memoryview, bytearray, etc.): you can iterate over each item and try_into to test that it has the correct type

I still don't understand what you're saying...

use pyo3::buffer::PyBuffer;
use pyo3::prelude::*;

#[pyfunction]
fn accept_buffer_protocol(py: Python, buf: PyBuffer<u8>) {
    dbg!(buf.to_vec(py).unwrap());
}

Running that works for uint8 input but errors for any other dtype:

import numpy as np
from arro3.compute import accept_buffer_protocol

arr = np.array([1, 2, 3], dtype=np.uint8)
accept_buffer_protocol(arr)
# [arro3-compute/src/lib.rs:54:5] buf.to_vec(py).unwrap() = [
#     1,
#     2,
#     3,
# ]


arr = np.array([1, 2, 3], dtype=np.uint64)
accept_buffer_protocol(arr)
# ---------------------------------------------------------------------------
# BufferError                               Traceback (most recent call last)
# File /Users/kyle/github/kylebarron/arro3/tests/tmp.py:1
# ----> 1 accept_buffer_protocol(arr)

# BufferError: buffer contents are not compatible with u8

And if I change the function to take a PyList input, it doesn't work:

import numpy as np
from arro3.compute import accept_buffer_protocol

arr = np.array([1, 2, 3], dtype=np.uint8)
accept_buffer_protocol(arr)
# ---------------------------------------------------------------------------
# TypeError                                 Traceback (most recent call last)
# File /Users/kyle/github/kylebarron/arro3/tests/tmp.py:1
# ----> 1 accept_buffer_protocol(arr)

# TypeError: argument 'buf': 'ndarray' object cannot be converted to 'PyList'

@martindurant
Author

I mean something like

// given x: Bound<'_, PyList>
let bufs: Vec<PyBackedBytes> = x.iter().map(|buf| buf.extract()).collect::<PyResult<_>>()?;

but as I say, this may "just work" with Vec<PyBackedBytes> as an input argument to a Rust function.

But yes, I imagine those buffers are always u8, and that this is a requirement, i.e., the calling code would need to use np.array([1, 2, 3], dtype=np.uint64).view('uint8') (or .tobytes()).

@martindurant
Author

An interesting implementation of cross-language buffers: https://github.com/milesgranger/cramjam/blob/master/src/io.rs; functions can generally take Python objects or Rust-side RustyBuffers. This has the additional feature of "read/seek" file-like methods, which you don't need.

@kylebarron
Owner

kylebarron commented Oct 1, 2024

// given x: Bound<'_, PyList>
let bufs: Vec<PyBackedBytes> = x.iter().map(|buf| buf.extract()).collect::<PyResult<_>>()?;

PyBackedBytes does not support buffer protocol objects; it only supports bytes and bytearray objects. And for bytearray objects it will copy the input. You must use a PyBuffer object to ensure you don't copy the input.

I think this is for safety reasons, because technically Rust needs the external buffers to be immutable. bytes objects are immutable, but there's no guarantee that buffer protocol objects will be immutable. So it's technically unsafe in the Rust world to interpret a buffer protocol object as a Vec<u8>. (In practice I think it's OK as long as you tell the user they must not mutate the input object during your Rust function.) See https://alexgaynor.net/2022/oct/23/buffers-on-the-edge/


I figured out another approach from your suggestion, which is probably cleaner than vendoring upstream pyo3 code and internals from PyBuffer.

use pyo3::buffer::PyBuffer;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

pub enum AnyBufferProtocol {
    UInt8(PyBuffer<u8>),
    UInt16(PyBuffer<u16>),
    UInt32(PyBuffer<u32>),
    UInt64(PyBuffer<u64>),
    Int8(PyBuffer<i8>),
    Int16(PyBuffer<i16>),
    Int32(PyBuffer<i32>),
    Int64(PyBuffer<i64>),
    Float32(PyBuffer<f32>),
    Float64(PyBuffer<f64>),
}

impl<'py> FromPyObject<'py> for AnyBufferProtocol {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        if let Ok(buf) = ob.extract::<PyBuffer<u8>>() {
            Ok(Self::UInt8(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<u16>>() {
            Ok(Self::UInt16(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<u32>>() {
            Ok(Self::UInt32(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<u64>>() {
            Ok(Self::UInt64(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<i8>>() {
            Ok(Self::Int8(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<i16>>() {
            Ok(Self::Int16(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<i32>>() {
            Ok(Self::Int32(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<i64>>() {
            Ok(Self::Int64(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<f32>>() {
            Ok(Self::Float32(buf))
        } else if let Ok(buf) = ob.extract::<PyBuffer<f64>>() {
            Ok(Self::Float64(buf))
        } else {
            Err(PyValueError::new_err("Not a buffer protocol object"))
        }
    }
}

This will have a slight amount of overhead to try to extract all of those different buffer types, but that's probably on the order of microseconds.

Note that these are physical types, so we still need to check for the logical type (e.g. a boolean-typed buffer protocol object will still export as a u8).

@martindurant
Author

Are the buffers required by the arrow internal implementation really typed? I would have assumed they are all always u8.

Note that buffers-on-the-edge was written two years ago, and pyo3 has done a lot since.

@kylebarron
Owner

Are the buffers required by the arrow internal implementation really typed?

At the core, the arrow Buffer is not typed. But we need the typing so that we can accept any buffer protocol input from the user and know which Arrow array type it should be used for. E.g. passing in a numpy array with float64 data type should be converted into an Arrow Float64Array and it shouldn't require the user to do any special work.
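
Concretely, the AnyBufferProtocol enum above lets us dispatch on the physical type to the matching Arrow array type. A sketch of two arms (this version copies via to_vec for brevity, whereas the real goal is zero-copy):

use std::sync::Arc;

use arrow::array::{ArrayRef, Float64Array, UInt8Array};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

// Builds on the AnyBufferProtocol enum shown earlier: each physical buffer
// type maps to the Arrow array type a user would expect.
fn to_arrow(py: Python<'_>, buf: AnyBufferProtocol) -> PyResult<ArrayRef> {
    Ok(match buf {
        AnyBufferProtocol::UInt8(b) => Arc::new(UInt8Array::from(b.to_vec(py)?)) as ArrayRef,
        AnyBufferProtocol::Float64(b) => Arc::new(Float64Array::from(b.to_vec(py)?)) as ArrayRef,
        // ... remaining variants are analogous ...
        _ => return Err(PyValueError::new_err("variant not shown in this sketch")),
    })
}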

Note that buffers-on-the-edge was written two years ago, and pyo3 has done a lot since.

I don't think this in particular has changed: PyO3/pyo3#2824 (comment)

@martindurant
Author

passing in a numpy array with float64 data type should be converted into an Arrow Float64Array

Then we are at cross purposes - I would expect that to be defined in the schema, and indeed pass byte buffers.

I don't think this in particular has changed

Except that we always have immutable buffers like you say, so we can make some stronger guarantees.

I also made one (incomplete) of these, by the way, and thought at the time that there must be a better way!

@kylebarron
Owner

passing in a numpy array with float64 data type should be converted into an Arrow Float64Array

Then we are at cross purposes - I would expect that to be defined in the schema, and indeed pass byte buffers.

I think it's very important for end-user usability to allow this, because it makes this just work in many cases with raw numeric data.

If you know what you're doing and want to do something different, you can view the numpy array as np.uint8, then convert that to an Arrow UInt8Array, then cast that to whatever data type you want.

This conversation did get me to look at the existing buffer protocol implementation in arro3 and I revamped it in #204. So now we can interpret buffer protocol objects zero-copy as Arrow arrays!

This is an improvement over before, because previously we would always copy buffer protocol input into newly allocated Arrow memory.

@martindurant
Author

I think it's very important for end-user usability to allow this, because it makes this just work in many cases with raw numeric data.

Agreed, but isn't it easier to handle that on the Python side?

@kylebarron
Owner

Doesn't that require the user to do extra steps? We don't have a Python side to arro3.

Additionally, the nice part of making this a pure-Rust implementation is that it's not just in arro3... it's also in pyo3-arrow. So anyone else creating a Python library and using pyo3-arrow will automatically support numpy arrays in all their primitive-type arrow APIs.

It's not that much code on the Rust side, either.

@martindurant
Author

I don't think I grokked that there was no Python at all!
Do you anticipate that anyone will be calling this library not from Python?

Unfortunately, my attempts at Rust-Python coding are not idiomatic Rust at all. In that specific case, there is a good reason to have quite a lot on the Python side. I wonder if you happen to know of a good resource for learning my way around a high-level Rust codebase (i.e., one with many layers of traits and generics)?

@kylebarron
Owner

I don't think I grokked that there was no Python at all!

There is virtually no Python. The only Python is a couple of user-facing enums and types, plus stub files for static type hinting.

Do you anticipate that anyone will be calling this library not from Python?

There are two parts to the arro3 repo. There's arro3-*, a collection of namespaced Python packages for end-users in Python. But there's also pyo3-arrow, a Rust helper library for other Rust developers building Python libraries in Rust.

If I implement Python code in arro3, it can't be reused by other Rust developers. Meanwhile, the entire core of pyo3-arrow is able to be reused by other Rust developers.

So if you create a function in your own library with

use pyo3::prelude::*;
use pyo3_arrow::PyArray;

#[pyfunction]
pub fn my_python_function(array: PyArray) {}

Then that function will automatically accept any Arrow array (from any library implementing the Arrow PyCapsule Interface) or any buffer protocol object.

Unfortunately, my attempts at Rust-Python coding are not idiomatic Rust at all.

The best way to have idiomatic Python-Rust code is to implement the FromPyObject trait on your custom objects. So in the Arrow case, I manage the data translation from Python to Rust once in the FromPyObject implementation on PyArray, and then any function can just put array: PyArray in the signature and it'll just work.
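
The same pattern in miniature, with a toy type rather than PyArray (all names here are illustrative):

use pyo3::prelude::*;

// A toy custom type: the Python-to-Rust conversion lives in one place.
struct Degrees(f64);

impl<'py> FromPyObject<'py> for Degrees {
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        // Accept anything that extracts to f64 (int, float, ...).
        Ok(Degrees(ob.extract::<f64>()?))
    }
}

// Every #[pyfunction] can now take Degrees directly in its signature.
#[pyfunction]
fn normalize(angle: Degrees) -> f64 {
    angle.0.rem_euclid(360.0)
}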
