# Update README in preparation for 0.8 release #206

Merged 5 commits on Feb 22, 2023

Changes from all commits
76 changes: 34 additions & 42 deletions README.md
@@ -24,22 +24,30 @@

This is a Python library that binds to the [Apache Arrow](https://arrow.apache.org/) in-memory query engine, [DataFusion](https://github.com/apache/arrow-datafusion).

-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
-files, run it in a multi-threaded environment, and obtain the result back in Python.
+DataFusion's Python bindings can be used as an end-user tool as well as providing a foundation for building new systems.

-It also allows you to use UDFs and UDAFs for complex operations.
+## Features

-The major advantage of this library over other execution engines is that this library achieves zero-copy between
-Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
-from having to lock the GIL when running those operations.
+- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources
+- Queries are optimized using DataFusion's query optimizer
+- Execute user-defined Python code from SQL
+- Exchange data with Pandas and other DataFrame libraries that support PyArrow
+- Serialize and deserialize query plans in Substrait format
+- Experimental support for executing SQL queries against Polars, Pandas, and cuDF

-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
-about thread safety and lack of memory leaks.
+## Comparison with other projects

-There is also experimental support for executing SQL against other DataFrame libraries, such as Polars, Pandas, and any
-drop-in replacements for Pandas.
+Here is a comparison with similar projects that may help you understand when DataFusion might be suitable or unsuitable
+for your needs:

-Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
+- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports
+  very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is
+  written in C/C++ and is primarily used directly by users as a serverless database and query system rather than
+  as a library for building such database systems.
+
+- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it
+  is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full SQL
+  support or as many extension points.
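
One of the features listed above is executing user-defined Python code from SQL. As a quick illustration, here is a minimal, hedged sketch modeled on the repository's python-udf.py example; the exact `udf` signature and the `register_record_batches`/`register_udf` helpers are assumptions about the 0.8 API rather than something confirmed by this diff:

```python
import pyarrow as pa
from datafusion import SessionContext, udf


# A scalar UDF receives whole Arrow arrays, not individual Python values
def is_null(array: pa.Array) -> pa.Array:
    return array.is_null()


# Wrap the function with its input types, return type, and volatility
is_null_udf = udf(is_null, [pa.int64()], pa.bool_(), "stable")

ctx = SessionContext()
batch = pa.RecordBatch.from_arrays([pa.array([1, None, 3])], names=["a"])
ctx.register_record_batches("t", [[batch]])
ctx.register_udf(is_null_udf)

# The UDF is now callable by name from SQL
result = ctx.sql("SELECT a, is_null(a) FROM t").collect()
```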

## Example Usage

@@ -50,12 +58,8 @@

The Parquet file used in this example can be downloaded from the following page:

- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

-See the [examples](examples) directory for more examples.
-
```python
from datafusion import SessionContext
-import pandas as pd
-import pyarrow as pa

# Create a DataFusion context
ctx = SessionContext()
```

@@ -82,42 +86,30 @@

This produces the following chart:

![Chart](examples/chart.png)
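
Because the diff collapses the middle of the example above, here is a hedged, self-contained sketch of the same flow; the file name, column alias, and plotting call are assumptions, and the repository's sql-to-pandas.py example remains the authoritative version:

```python
import pandas as pd
from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")

# Run an aggregation query and collect the result as a Pandas DataFrame
df = ctx.sql(
    "select passenger_count, count(*) as cnt from taxi "
    "group by passenger_count order by passenger_count"
)
pandas_df: pd.DataFrame = df.to_pandas()

# Plot with Pandas' built-in matplotlib integration
pandas_df.plot.bar(x="passenger_count", y="cnt")
```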

-## Substrait Support
-
-`arrow-datafusion-python` has bindings for serializing a SQL query to the Substrait protobuf format, and for deserializing Substrait protobuf bytes into a DataFusion `LogicalPlan` (a `PyLogicalPlan` in a Python context), which can then be executed.
-
-### Example of Serializing/Deserializing Substrait Plans
-
-```python
-from datafusion import SessionContext
-from datafusion import substrait as ss
-
-# Create a DataFusion context
-ctx = SessionContext()
-
-# Register table with context
-ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv')
-
-substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-
-# Alternative serialization approach: serialize directly to bytes
-# type(substrait_bytes) -> <class 'list'>; at this point the bytes can safely be written to a file, sent over the
-# network, etc., and subsequently deserialized on the receiving end.
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
-
-# Imagine the bytes being read back from the network, a file, etc.; for brevity this is omitted and the variable is simply reused
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
-
-# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
-df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan)
-
-# Convert back to a Substrait plan, just for demonstration purposes
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
-```
+## More Examples
+
+See [examples](examples/README.md) for more information.
+
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./examples/substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)

## How to install (from pip)

30 changes: 20 additions & 10 deletions examples/README.md
@@ -19,21 +19,31 @@

# DataFusion Python Examples

-Some of the examples rely on data which can be downloaded from the following site:
+Some examples rely on data which can be downloaded from the following site:

- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Here is a direct link to the file used in the examples:

- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
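
If needed, the file can be fetched with just the Python standard library. This is a minimal sketch; the local filename is an assumption chosen to match what the example scripts below expect:

```python
import urllib.request

# Download the NYC taxi trip data file used by the examples
urllib.request.urlretrieve(
    "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet",
    "yellow_tripdata_2021-01.parquet",
)
```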

-## Examples
-
-- [Query a Parquet file using SQL](./sql-parquet.py)
-- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
-- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
-- [Query PyArrow Data](./query-pyarrow-data.py)
-- [Register a Python UDF with DataFusion](./python-udf.py)
-- [Register a Python UDAF with DataFusion](./python-udaf.py)
-- [Executing SQL on Polars](./sql-on-polars.py)
-- [Executing SQL on Pandas](./sql-on-pandas.py)
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./sql-on-polars.py)
+- [Executing SQL on Pandas](./sql-on-pandas.py)
6 changes: 3 additions & 3 deletions examples/dataframe-parquet.py
@@ -19,7 +19,7 @@
from datafusion import functions as f

ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
+df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate(
+    [f.col("passenger_count")], [f.count_star()]
+)
df.show()
4 changes: 1 addition & 3 deletions examples/sql-on-pandas.py
@@ -19,8 +19,6 @@


ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
df = ctx.sql("select passenger_count from taxi")
print(df)
4 changes: 1 addition & 3 deletions examples/sql-on-polars.py
@@ -19,9 +19,7 @@


ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
df = ctx.sql(
    "select passenger_count, count(*) from taxi group by passenger_count"
)
4 changes: 1 addition & 3 deletions examples/sql-parquet.py
@@ -18,9 +18,7 @@
from datafusion import SessionContext

ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
df = ctx.sql(
    "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
)
53 changes: 53 additions & 0 deletions examples/substrait.py
@@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datafusion import SessionContext
from datafusion import substrait as ss


# Create a DataFusion context
ctx = SessionContext()

# Register a table with the context (the test data file is CSV, so register it as CSV)
ctx.register_csv(
    "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv"
)

substrait_plan = ss.substrait.serde.serialize_to_plan(
    "SELECT * FROM aggregate_test_data", ctx
)
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>

# Alternative serialization approach: serialize the query directly to bytes
# type(substrait_bytes) -> <class 'list'>; at this point the bytes can safely be written to a file,
# sent over the network, etc., and subsequently deserialized on the receiving end.
substrait_bytes = ss.substrait.serde.serialize_bytes(
    "SELECT * FROM aggregate_test_data", ctx
)

# Imagine the bytes being read back from the network, a file, etc.; for brevity this is omitted and the variable is simply reused
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)

# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
df_logical_plan = ss.substrait.consumer.from_substrait_plan(
    ctx, substrait_plan
)

# Convert back to a Substrait plan, just for demonstration purposes
# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
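
As a follow-up, and going only by the comments above (which say the serialized bytes can be distributed to a file or over the network), here is a hedged sketch of a file round trip; treating the serialized list as raw bytes via `bytes()` is an assumption, not confirmed API behavior:

```python
# Persist the serialized plan to disk; bytes() accepts the list of ints
# that the comment above says serialize_bytes returns
with open("plan.substrait", "wb") as f:
    f.write(bytes(substrait_bytes))

# ... later, possibly in another process or on another machine ...
with open("plan.substrait", "rb") as f:
    received_plan = ss.substrait.serde.deserialize_bytes(f.read())
```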