Add PyCapsule support for Arrow import and export #825

Merged
merged 19 commits into from
Aug 30, 2024
73 changes: 73 additions & 0 deletions docs/source/user-guide/io/arrow.rst
@@ -0,0 +1,73 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

Arrow
=====

DataFusion implements the
`Apache Arrow PyCapsule interface <https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html>`_
for importing and exporting DataFrames with zero copy. Any Python project that implements
this interface can share data back and forth with DataFusion without copying.

We can demonstrate using `pyarrow <https://arrow.apache.org/docs/python/index.html>`_.

Importing to DataFusion
-----------------------

Here we will create an Arrow table and import it to DataFusion.

To import an Arrow source, use :py:func:`datafusion.context.SessionContext.from_arrow`.
This accepts any Python object that implements
`__arrow_c_stream__ <https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html#arrowstream-export>`_
or `__arrow_c_array__ <https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html#arrowarray-export>`_;
for the latter, the object must produce a struct array. Common pyarrow sources you can use are:

- `Array <https://arrow.apache.org/docs/python/generated/pyarrow.Array.html>`_ (but it must return a Struct Array)
- `Record Batch <https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html>`_
- `Record Batch Reader <https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatchReader.html>`_
- `Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`_

.. ipython:: python

    from datafusion import SessionContext
    import pyarrow as pa

    data = {"a": [1, 2, 3], "b": [4, 5, 6]}
    table = pa.Table.from_pydict(data)

    ctx = SessionContext()
    df = ctx.from_arrow(table)
    df
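
The same call also works for streaming sources. As a minimal sketch (reusing the ``ctx`` and
``pa`` objects from above; nothing beyond pyarrow is assumed), a ``RecordBatchReader`` exposing
``__arrow_c_stream__`` can be imported without materializing a table first:

.. ipython:: python

    batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
    reader = pa.RecordBatchReader.from_batches(batch.schema, [batch])
    df_stream = ctx.from_arrow(reader)
    df_stream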

Exporting from DataFusion
-------------------------

DataFusion DataFrames implement the ``__arrow_c_stream__`` PyCapsule interface, so any
Python library that accepts this interface can import a DataFusion DataFrame directly.

.. warning::
    Exporting a DataFrame triggers its execution, which may be a time consuming task.
    That is, it causes a :py:func:`datafusion.dataframe.DataFrame.collect` operation to occur.


.. ipython:: python

    from datafusion import col, lit

    df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
    pa.table(df)
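
Streaming consumers can read the DataFrame as well. As a sketch (this assumes pyarrow 14 or
newer, which added ``RecordBatchReader.from_stream`` for PyCapsule stream sources), the
DataFrame can be handed to a reader instead of building a full table up front:

.. ipython:: python

    reader = pa.RecordBatchReader.from_stream(df)
    reader.read_all()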

6 changes: 3 additions & 3 deletions docs/source/user-guide/io/index.rst
@@ -21,8 +21,8 @@ IO
.. toctree::
:maxdepth: 2

arrow
avro
csv
parquet
json
avro

parquet
2 changes: 1 addition & 1 deletion examples/import.py
@@ -54,5 +54,5 @@

# Convert Arrow Table to datafusion DataFrame
arrow_table = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
df = ctx.from_arrow_table(arrow_table)
df = ctx.from_arrow(arrow_table)
assert type(df) == datafusion.DataFrame
24 changes: 18 additions & 6 deletions python/datafusion/context.py
@@ -586,19 +586,31 @@ def from_pydict(
"""
return DataFrame(self.ctx.from_pydict(data, name))

def from_arrow_table(
self, data: pyarrow.Table, name: str | None = None
) -> DataFrame:
"""Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow table.
def from_arrow(self, data: Any, name: str | None = None) -> DataFrame:
"""Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow source.

The Arrow data source can be any object that implements either
``__arrow_c_stream__`` or ``__arrow_c_array__``. For the latter, it must return
a struct array. Common examples of sources from pyarrow include
Comment on lines +593 to +594
For both they must emit a struct array. Any Arrow array can be passed through an __arrow_c_stream__. Canonically, to transfer a DataFrame you have a stream of struct arrays where each one is unpacked to be the columns of a RecordBatch. But it doesn't have to be a struct array: you can also transfer a Series through an __arrow_c_stream__, where each batch in the stream iterator is just a primitive array.
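
As an illustration of that distinction (a hypothetical snippet, not part of this diff; the variable names are made up), a column-level stream such as a ``pyarrow.ChunkedArray`` can be wrapped into a table before handing it to ``from_arrow``, since this importer collects record batches:

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
chunked = pa.chunked_array([[1, 2], [3]])  # a primitive-array stream ("Series"-like)
wrapped = pa.table({"a": chunked})         # wrap it so each batch maps to a record batch
df = ctx.from_arrow(wrapped)               # now a struct/record-batch stream source
```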


Args:
data: Arrow table.
data: Arrow data source.
name: Name of the DataFrame.

Returns:
DataFrame representation of the Arrow table.
"""
return DataFrame(self.ctx.from_arrow_table(data, name))
return DataFrame(self.ctx.from_arrow(data, name))

@deprecated("Use ``from_arrow`` instead.")
def from_arrow_table(
self, data: pyarrow.Table, name: str | None = None
) -> DataFrame:
"""Create a :py:class:`~datafusion.dataframe.DataFrame` from an Arrow table.

This is an alias for :py:func:`from_arrow`.
"""
return self.from_arrow(data, name)

def from_pandas(self, data: pandas.DataFrame, name: str | None = None) -> DataFrame:
"""Create a :py:class:`~datafusion.dataframe.DataFrame` from a Pandas DataFrame.
16 changes: 16 additions & 0 deletions python/datafusion/dataframe.py
@@ -524,3 +524,19 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
"""
columns = [c for c in columns]
return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

def __arrow_c_stream__(self, requested_schema: pa.Schema) -> Any:
"""Export an Arrow PyCapsule Stream.

This will execute and collect the DataFrame. We will attempt to respect the
requested schema, but only trivial transformations will be applied such as only
returning the fields listed in the requested schema if their data types match
those in the DataFrame.

Args:
requested_schema: Attempt to provide the DataFrame using this schema.

Returns:
Arrow PyCapsule object.
"""
return self.df.__arrow_c_stream__(requested_schema)
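
For context (an illustrative usage, not part of the diff; it assumes ``df`` has an ``int64`` column named ``a``), a consumer can request a narrower schema when reading the stream, for example via pyarrow:

```python
import pyarrow as pa

# Export only column "a"; the field type matches the DataFrame's, so it is passed through.
subset = pa.table(df, schema=pa.schema([("a", pa.int64())]))
```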
37 changes: 33 additions & 4 deletions python/datafusion/tests/test_context.py
@@ -156,7 +156,7 @@ def test_from_arrow_table(ctx):
table = pa.Table.from_pydict(data)

# convert to DataFrame
df = ctx.from_arrow_table(table)
df = ctx.from_arrow(table)
tables = list(ctx.catalog().database().names())

assert df
@@ -166,13 +166,42 @@
assert df.collect()[0].num_rows == 3


def record_batch_generator(num_batches: int):
schema = pa.schema([("a", pa.int64()), ("b", pa.int64())])
for i in range(num_batches):
yield pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])], schema=schema
)


@pytest.mark.parametrize(
"source",
[
# __arrow_c_array__ sources
pa.array([{"a": 1, "b": 4}, {"a": 2, "b": 5}, {"a": 3, "b": 6}]),
# __arrow_c_stream__ sources
pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}),
pa.RecordBatchReader.from_batches(
pa.schema([("a", pa.int64()), ("b", pa.int64())]), record_batch_generator(1)
),
pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}),
],
)
def test_from_arrow_sources(ctx, source) -> None:
df = ctx.from_arrow(source)
assert df
assert isinstance(df, DataFrame)
assert df.schema().names == ["a", "b"]
assert df.count() == 3


def test_from_arrow_table_with_name(ctx):
# create a PyArrow table
data = {"a": [1, 2, 3], "b": [4, 5, 6]}
table = pa.Table.from_pydict(data)

# convert to DataFrame with optional name
df = ctx.from_arrow_table(table, name="tbl")
df = ctx.from_arrow(table, name="tbl")
tables = list(ctx.catalog().database().names())

assert df
Expand All @@ -185,7 +214,7 @@ def test_from_arrow_table_empty(ctx):
table = pa.Table.from_pydict(data, schema=schema)

# convert to DataFrame
df = ctx.from_arrow_table(table)
df = ctx.from_arrow(table)
tables = list(ctx.catalog().database().names())

assert df
Expand All @@ -200,7 +229,7 @@ def test_from_arrow_table_empty_no_schema(ctx):
table = pa.Table.from_pydict(data)

# convert to DataFrame
df = ctx.from_arrow_table(table)
df = ctx.from_arrow(table)
tables = list(ctx.catalog().database().names())

assert df
51 changes: 40 additions & 11 deletions python/datafusion/tests/test_dataframe.py
@@ -47,7 +47,7 @@ def df():
names=["a", "b", "c"],
)

return ctx.create_dataframe([[batch]])
return ctx.from_arrow(batch)


@pytest.fixture
@@ -835,13 +835,42 @@ def test_write_compressed_parquet_missing_compression_level(df, tmp_path, compre
df.write_parquet(str(path), compression=compression)


# ctx = SessionContext()

# # create a RecordBatch and a new DataFrame from it
# batch = pa.RecordBatch.from_arrays(
# [pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([8, 5, 8])],
# names=["a", "b", "c"],
# )

# df = ctx.create_dataframe([[batch]])
# test_execute_stream(df)
def test_dataframe_export(df) -> None:
# Guarantees that we have the canonical implementation
# reading our dataframe export
table = pa.table(df)
assert table.num_columns == 3
assert table.num_rows == 3

desired_schema = pa.schema([("a", pa.int64())])

# Verify we can request a schema
table = pa.table(df, schema=desired_schema)
assert table.num_columns == 1
assert table.num_rows == 3

# Expect a table of nulls if the schemas don't overlap
desired_schema = pa.schema([("g", pa.string())])
table = pa.table(df, schema=desired_schema)
assert table.num_columns == 1
assert table.num_rows == 3
for i in range(0, 3):
assert table[0][i].as_py() is None

# Expect an error when we cannot convert schema
desired_schema = pa.schema([("a", pa.float32())])
failed_convert = False
try:
table = pa.table(df, schema=desired_schema)
except Exception:
failed_convert = True
assert failed_convert

# Expect an error when a non-nullable field is requested but not present
desired_schema = pa.schema([("g", pa.string(), False)])
failed_convert = False
try:
table = pa.table(df, schema=desired_schema)
except Exception:
failed_convert = True
assert failed_convert
46 changes: 31 additions & 15 deletions src/context.rs
@@ -20,12 +20,15 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;

use arrow::array::RecordBatchReader;
use arrow::ffi_stream::ArrowArrayStreamReader;
use arrow::pyarrow::FromPyArrow;
use datafusion::execution::session_state::SessionStateBuilder;
use object_store::ObjectStore;
use url::Url;
use uuid::Uuid;

use pyo3::exceptions::{PyKeyError, PyValueError};
use pyo3::exceptions::{PyKeyError, PyTypeError, PyValueError};
use pyo3::prelude::*;

use crate::catalog::{PyCatalog, PyTable};
@@ -444,7 +447,7 @@ impl PySessionContext {
let table = table_class.call_method1("from_pylist", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
let df = self.from_arrow(table, name, py)?;
Ok(df)
}

@@ -463,29 +466,42 @@
let table = table_class.call_method1("from_pydict", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
let df = self.from_arrow(table, name, py)?;
Ok(df)
}

/// Construct datafusion dataframe from Arrow Table
pub fn from_arrow_table(
pub fn from_arrow(
&mut self,
data: Bound<'_, PyAny>,
name: Option<&str>,
py: Python,
) -> PyResult<PyDataFrame> {
// Instantiate pyarrow Table object & convert to batches
let table = data.call_method0("to_batches")?;
let (schema, batches) =
if let Ok(stream_reader) = ArrowArrayStreamReader::from_pyarrow_bound(&data) {
// Works for any object that implements __arrow_c_stream__ in pycapsule.

let schema = stream_reader.schema().as_ref().to_owned();
let batches = stream_reader
.collect::<Result<Vec<RecordBatch>, arrow::error::ArrowError>>()
.map_err(DataFusionError::from)?;

(schema, batches)
} else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
// While this says RecordBatch, it will work for any object that implements
// __arrow_c_array__ and returns a StructArray.

(array.schema().as_ref().to_owned(), vec![array])
} else {
return Err(PyTypeError::new_err(
"Expected either a Arrow Array or Arrow Stream in from_arrow().",
));
};

let schema = data.getattr("schema")?;
let schema = schema.extract::<PyArrowType<Schema>>()?;

// Cast PyAny to RecordBatch type
// Because create_dataframe() expects a vector of vectors of record batches
// here we need to wrap the vector of record batches in an additional vector
let batches = table.extract::<PyArrowType<Vec<RecordBatch>>>()?;
let list_of_batches = PyArrowType::from(vec![batches.0]);
self.create_dataframe(list_of_batches, name, Some(schema), py)
let list_of_batches = PyArrowType::from(vec![batches]);
self.create_dataframe(list_of_batches, name, Some(schema.into()), py)
}

/// Construct datafusion dataframe from pandas
Expand All @@ -504,7 +520,7 @@ impl PySessionContext {
let table = table_class.call_method1("from_pandas", args)?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, py)?;
let df = self.from_arrow(table, name, py)?;
Ok(df)
}

Expand All @@ -518,7 +534,7 @@ impl PySessionContext {
let table = data.call_method0("to_arrow")?;

// Convert Arrow Table to datafusion DataFrame
let df = self.from_arrow_table(table, name, data.py())?;
let df = self.from_arrow(table, name, data.py())?;
Ok(df)
}
