Implement a Vec<RecordBatch> wrapper for pyarrow.Table convenience #8790
Conversation
kylebarron left a comment
Historically the attitude of this crate has been to avoid "Table" constructs to push users towards streaming approaches.
I don't know what the stance of maintainers is towards including a Table construct for python integration.
FWIW, if you wanted to look at external crates, PyTable exists and probably does what you want (disclosure: it's my project). That might alternatively give you ideas for how to handle the Table here if you still want to do that. (It's a separate crate for these reasons.)
Thanks @kylebarron for your very quick review! ❤️
Yes, I'm also not too sure about it, that's why I just sketched out a rough implementation without tests so far. A reason why I think this potentially could be nice to have in `arrow-pyarrow` itself: at least I personally think having such a wrapper could be nice, since it simplifies stuff a bit when you already have a `Vec<RecordBatch>` anyway.

**Slightly nicer Python workflow**

In our very specific example, we have a Python class with a method such as this one:

```python
class ParquetFile:
    def read_row_group(self, index: int) -> pyarrow.RecordBatch: ...
```

In the issue I linked, this unfortunately breaks down for a specific Parquet file, since a particular row group isn't expressible as a single `RecordBatch`, so we have to return multiple batches instead. The latter comes with a bit of syntactic shortcomings in contexts where you want to convert the result to Python objects:

```python
rg: pyarrow.RecordBatch | Iterator[pyarrow.RecordBatch] = ParquetFile(...).read_row_group(0)
python_objs: list[dict[str, Any]]
if isinstance(rg, pyarrow.RecordBatch):
    python_objs = rg.to_pylist()
else:
    python_objs = list(itertools.chain.from_iterable(batch.to_pylist() for batch in rg))
```

With a `pyarrow.Table` return type instead, this collapses to:

```python
rg: pyarrow.RecordBatch | pyarrow.Table = ParquetFile(...).read_row_group(0)
python_objs: list[dict[str, Any]] = rg.to_pylist()
```

And just for clarity, we unfortunately need to have the entire row group deserialized as Python objects, because the data ingestion pipelines that consume this expect access to the entire row group in bulk, so streaming approaches are sadly not usable.
Yes, in general, I much prefer the streaming approach as well.
I think that's outdated for Python -> Rust. I haven't tried, but you should be able to pass a `pyarrow.Table` directly to a function accepting `PyArrowType<ArrowArrayStreamReader>`. But I assume there's no way today to easily return a `pyarrow.Table` to Python.
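To illustrate the Python -> Rust direction, here is a minimal sketch (assuming the `arrow-pyarrow` crate; the function name `count_rows` is made up for illustration and is not code from this PR):

```rust
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow_pyarrow::PyArrowType;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

// Because `pyarrow.Table` implements the ArrowStream PyCapsule interface,
// Python callers can pass a Table, a RecordBatchReader, or any other
// stream-capable object to this one parameter unchanged.
#[pyfunction]
fn count_rows(stream: PyArrowType<ArrowArrayStreamReader>) -> PyResult<usize> {
    let mut rows = 0;
    // ArrowArrayStreamReader iterates over `Result<RecordBatch, ArrowError>`.
    for batch in stream.0 {
        let batch = batch.map_err(|e| PyValueError::new_err(e.to_string()))?;
        rows += batch.num_rows();
    }
    Ok(rows)
}
```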
I'm fine with that; and I think other maintainers would probably be fine with that too, since it's only a concept that exists in the Python integration. I'm not sure I totally get your example. Seems bad to be returning a union of multiple types to Python. But it seems reasonable to return a `pyarrow.Table`.
Well, there's nothing stopping you from materializing the stream by passing it to `pyarrow.table(...)` on the Python side.
You can use `Box<dyn RecordBatchReader>` for that.
Yes, exactly, that's what I even mentioned here in this PR (https://github.com/apache/arrow-rs/pull/8790/files#diff-2cc622072ff5fa80cf1a32a161da31ac058336ebedfeadbc8532fa52ea4224faR491-R492 + https://github.com/apache/arrow-rs/pull/8790/files#diff-2cc622072ff5fa80cf1a32a161da31ac058336ebedfeadbc8532fa52ea4224faR545-R549): this is even used to convert a `pyarrow.Table` into Rust. As you said, the opposite direction, namely easily returning a `pyarrow.Table` directly, is what's missing today.
My example wasn't entirely complete for simplicity (and still isn't); it would be more something like this:

```python
class ParquetFile:
    @overload
    def read_row_group(self, index: int, as_table: Literal[True]) -> pyarrow.Table: ...
    @overload
    def read_row_group(self, index: int, as_table: Literal[False]) -> pyarrow.RecordBatch: ...
    def read_row_group(self, index: int, as_table: bool = False) -> pyarrow.RecordBatch | pyarrow.Table: ...
```

The advantage of that would be that both `pyarrow.Table` and `pyarrow.RecordBatch` offer the same `to_pylist()` interface:

```python
class ToListCapable(Protocol):
    def to_pylist(self) -> list[dict[str, Any]]: ...

class ParquetFile:
    def read_row_group(self, index: int, as_table: bool = False) -> ToListCapable: ...
```
Yes, sure! We also do that in other places, or have entirely streamable pipelines elsewhere that use the PyCapsule ArrowStream interface. It's just that for this very specific use case, a `pyarrow.Table` is the most convenient fit.
I am not a python expert nor have I fully understood all the discussion on this ticket, but this would be my preferred approach -- make it easy to go from Rust <> Python, while trying to encourage good practices (e.g. streaming). There is no reason to be pedantic and force someone through hoops to make a PyTable if that is what they want.
I now added a bunch of tests in `arrow-pyarrow-integration-tests`.
arrow-pyarrow/src/lib.rs (outdated):

```rust
//! For example, a `pyarrow.Table` can be imported to Rust through `PyArrowType<ArrowArrayStreamReader>`
//! instead (since `pyarrow.Table` implements the ArrayStream PyCapsule interface).
```
I think it would be good to note here that another advantage of using ArrowArrayStreamReader is that it works with tables and stream input out of the box. It doesn't matter which type the user passes in.
Also reading through the docs again, I'd suggest making a reference to Box<dyn RecordBatchReader> rather than ArrowArrayStreamReader. The former is a higher level API and much easier to use.
> I think it would be good to note here that another advantage of using ArrowArrayStreamReader is that it works with tables and stream input out of the box.

I added that in the docs.

> Also reading through the docs again, I'd suggest making a reference to `Box<dyn RecordBatchReader>` rather than `ArrowArrayStreamReader`. The former is a higher level API and much easier to use.
I'm not exactly sure what you mean here. `Box<dyn RecordBatchReader>` only implements `IntoPyArrow`, but not `FromPyArrow`. So for the example I state in the new documentation (that a `pyarrow.Table` can also be consumed in Rust via a streaming approach), `Box<dyn RecordBatchReader>` sadly doesn't help. One has to use `ArrowArrayStreamReader`, since that properly implements `FromPyArrow`.
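To make this asymmetry concrete, here is a minimal sketch (the function name `roundtrip_stream` is made up for illustration): the input side must be `ArrowArrayStreamReader` because only it implements `FromPyArrow`, while the output side can be `Box<dyn RecordBatchReader + Send>`, which implements `IntoPyArrow`:

```rust
use arrow::error::ArrowError;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::record_batch::{RecordBatchIterator, RecordBatchReader};
use arrow_pyarrow::PyArrowType;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

#[pyfunction]
fn roundtrip_stream(
    input: PyArrowType<ArrowArrayStreamReader>,
) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
    let reader = input.0;
    let schema = reader.schema();
    // Materialized here only for brevity; a real implementation could
    // forward the stream lazily instead.
    let batches: Vec<_> = reader
        .collect::<Result<_, ArrowError>>()
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    let out = RecordBatchIterator::new(batches.into_iter().map(Ok), schema);
    Ok(PyArrowType(Box::new(out)))
}
```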
Hey @kylebarron, thanks for the review! I implemented everything as you suggested. As you can see, the CI is now broken because of a subtle problem that was uncovered. Maybe you can help me, as I'm not too familiar with the FFI logic and all my sanity checks did not help me: in the failing test, the schema stored on the `Table` and the schemas of the individual `RecordBatch`es no longer compare equal; the batches coming back through the FFI appear to have lost their schema metadata. This previously worked with the earlier conversion approach I used.

EDIT: More importantly, the failure disappears when the schema check in the `Table` constructor is omitted.
@kylebarron for now I stole the `schema_equals` helper used for the comparison below.
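For reference, a sketch of what a metadata-insensitive comparison could look like (this is an assumption for illustration; the actual borrowed helper may differ):

```rust
use arrow::datatypes::Schema;

// Compare only the fields and deliberately ignore the schema-level metadata,
// which is what appears to get lost during the FFI round trip described above.
fn schema_equals(a: &Schema, b: &Schema) -> bool {
    a.fields() == b.fields()
}
```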
arrow-pyarrow/src/lib.rs (outdated):

```rust
for record_batch in &record_batches {
    if !schema_equals(&schema, &record_batch.schema()) {
        return Err(ArrowError::SchemaError(
            //"All record batches must have the same schema.".to_owned(),
```
| //"All record batches must have the same schema.".to_owned(), |
I only have the more verbose error message here right now to understand what's going on in the schema mismatch. The short one is currently commented out to signal that this is not intended to be merged as-is; the schema mismatch issue shall be understood first.
In general I'm opinionless about how verbose the error message shall be; I'd happily remove whichever variant you dislike eventually.
Commits:
- CQ fixes
- CQ fix
- CQ fix
- Let `Table` be a combination of `Vec<RecordBatch>` and `SchemaRef` instead
- `cargo fmt`
- Overhauled `Table` definition, added tests
- Add empty `Table` integration test
- Update `arrow-pyarrow`'s crate documentation
- Overhaul documentation even more
- Typo fix
Hey, I pushed a version that does not use the PyCapsule ArrayStream interface for converting a `pyarrow.Table` to Rust. With that, I got RecordBatches with preserved metadata. Since I also checked the behaviour on the Python side, potentially there is a slight misuse of the PyCapsule interface somewhere, as it definitely seems to return RecordBatches without metadata. I'm not too familiar with the low-level stuff there, but I'll try to investigate; help is appreciated!
Okay, I think I found where the error was. With that fixed, consuming a `pyarrow.Table` works with the schema metadata preserved. @alamb @kylebarron let me know whether the change is acceptable from your side.

Rationale for this change
When dealing with Parquet files that have an exceedingly large amount of Binary or UTF8 data in one row group, there can be issues when returning a single RecordBatch because of index overflows (#7973).
In `pyarrow` this is usually solved by representing data as a `pyarrow.Table` object whose columns are `ChunkedArray`s, which basically are just lists of Arrow arrays; viewed alternatively, the `pyarrow.Table` is just a representation of a list of `RecordBatch`es.

I'd like to build a function in PyO3 that returns a `pyarrow.Table`, very similar to pyarrow's `read_row_group` method. With that, we could have feature parity with `pyarrow` in circumstances of potential index overflows without resorting to type changes (such as reading the data as `LargeString` or `StringView` columns).

Currently, AFAICS, there is no way in `arrow-pyarrow` to export a `pyarrow.Table` directly. Especially convenience methods from `Vec<RecordBatch>` seem to be missing. This PR implements a convenience wrapper that allows directly exporting a `pyarrow.Table`.

What changes are included in this PR?
A new struct `Table` is added in the crate `arrow-pyarrow`, which can be constructed from `Vec<RecordBatch>` or from `ArrowArrayStreamReader`.

It implements `FromPyArrow` and `IntoPyArrow`. `FromPyArrow` will support anything that either implements the ArrowStream PyCapsule protocol or is a `RecordBatchReader`, or has a `to_reader()` method which returns one; `pyarrow.Table` does both of these things. `IntoPyArrow` will result in a `pyarrow.Table` on the Python side, constructed through `pyarrow.Table.from_batches(...)`.
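A rough sketch of how the wrapper could be used from PyO3 (the constructor name `Table::try_new`, its signature, and the `arrow_pyarrow::Table` import are assumptions based on the description above, not necessarily the final API):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_pyarrow::{PyArrowType, Table};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;

#[pyfunction]
fn two_chunk_table() -> PyResult<PyArrowType<Table>> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    // Two batches standing in for the chunks of one oversized row group.
    let chunk = |values: Vec<i32>| {
        RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(values)) as ArrayRef],
        )
        .map_err(|e| PyValueError::new_err(e.to_string()))
    };
    let batches = vec![chunk(vec![1, 2])?, chunk(vec![3])?];
    // Hypothetical constructor combining Vec<RecordBatch> and SchemaRef,
    // validating that all batches share the schema.
    let table = Table::try_new(batches, schema)
        .map_err(|e| PyValueError::new_err(e.to_string()))?;
    // Arrives on the Python side as a pyarrow.Table, built via
    // pyarrow.Table.from_batches(...).
    Ok(PyArrowType(table))
}
```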
Are these changes tested?

Yes, in `arrow-pyarrow-integration-tests`.

Are there any user-facing changes?

A new `Table` convenience wrapper is added!