
Commit c316bfd

Martin authored and committed

Add gentle introduction to Arrow and RecordBatches

This adds a new user guide page addressing issue apache#11336 to provide a gentle introduction to Apache Arrow and RecordBatches for DataFusion users. The guide includes:

- Explanation of Arrow as a columnar specification
- Visual comparison of row vs columnar storage (with ASCII diagrams)
- Rationale for RecordBatch-based streaming (memory + vectorization)
- Practical examples: reading files, building batches, querying with MemTable
- Clear guidance on when Arrow knowledge is needed (extension points)
- Links back to DataFrame API and library user guide
- Link to DataFusion Invariants for contributors who want to go deeper

This helps users understand the foundation without getting overwhelmed, addressing feedback from PR apache#11290 that DataFrame examples 'throw people into the deep end of Arrow.'

1 parent 556eb9b commit c316bfd

File tree

6 files changed (+259, -6 lines changed)


docs/source/index.rst

Lines changed: 1 addition & 0 deletions
@@ -113,6 +113,7 @@ To get started, see

    user-guide/crate-configuration
    user-guide/cli/index
    user-guide/dataframe
+   user-guide/arrow-introduction
    user-guide/expressions
    user-guide/sql/index
    user-guide/configs

docs/source/library-user-guide/query-optimizer.md

Lines changed: 0 additions & 2 deletions
@@ -440,13 +440,11 @@ When analyzing expressions, DataFusion runs boundary analysis using interval arithmetic

 Consider a simple predicate like age > 18 AND age <= 25. The analysis flows as follows:

 1. Context Initialization
-
    - Begin with known column statistics
    - Set up initial boundaries based on column constraints
    - Initialize the shared analysis context

 2. Expression Tree Walk
-
    - Analyze each node in the expression tree
    - Propagate boundary information upward
    - Allow child nodes to influence parent boundaries
docs/source/user-guide/arrow-introduction.md

Lines changed: 252 additions & 0 deletions

@@ -0,0 +1,252 @@

<!---
  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing,
  software distributed under the License is distributed on an
  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  KIND, either express or implied.  See the License for the
  specific language governing permissions and limitations
  under the License.
-->

# A Gentle Introduction to Arrow & RecordBatches (for DataFusion users)

```{contents}
:local:
:depth: 2
```

This guide helps DataFusion users understand Arrow and its RecordBatch format. While you may never need to work with Arrow directly, this knowledge becomes valuable when using DataFusion's extension points or debugging performance issues.

## Why Columnar? The Arrow Advantage

Apache Arrow is an open **specification** that defines how analytical data should be organized in memory. Think of it as a blueprint that different systems agree to follow, not a database or programming language.

### Row-oriented vs Columnar Layout

Traditional databases often store data row-by-row:

```
Row 1: [id: 1, name: "Alice", age: 30]
Row 2: [id: 2, name: "Bob", age: 25]
Row 3: [id: 3, name: "Carol", age: 35]
```

Arrow organizes the same data by column:

```
Column "id":   [1, 2, 3]
Column "name": ["Alice", "Bob", "Carol"]
Column "age":  [30, 25, 35]
```

Visual comparison:

```
Traditional Row Storage:          Arrow Columnar Storage:
┌────┬──────┬──────┐              ┌─────────┬─────────┬──────────┐
│ id │ name │ age  │              │   id    │  name   │   age    │
├────┼──────┼──────┤              ├─────────┼─────────┼──────────┤
│ 1  │ A    │ 30   │              │ [1,2,3] │ [A,B,C] │[30,25,35]│
│ 2  │ B    │ 25   │              └─────────┴─────────┴──────────┘
│ 3  │ C    │ 35   │                   ↑         ↑         ↑
└────┴──────┴──────┘              Int32Array StringArray Int32Array
(read entire rows)                (process entire columns at once)
```

### Why This Matters

- **Vectorized Execution**: Process entire columns at once using SIMD instructions
- **Better Compression**: Similar values stored together compress more efficiently
- **Cache Efficiency**: Scanning specific columns doesn't load unnecessary data
- **Zero-Copy Data Sharing**: Systems can share Arrow data without conversion overhead

DataFusion, DuckDB, and Polars speak Arrow natively, and pandas can exchange Arrow data through PyArrow, so these systems can share data without expensive serialization/deserialization steps.

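To make "process entire columns at once" concrete, here is a small, hedged sketch that calls Arrow's compute kernels directly through DataFusion's `datafusion::arrow` re-export. The values are made up for illustration, and DataFusion normally runs kernels like these for you; the point is only that `sum` and `filter` each operate on a whole column in a single call:

```rust
use datafusion::arrow::array::{Array, BooleanArray, Int32Array};
use datafusion::arrow::compute::{filter, sum};
use datafusion::error::Result;

fn main() -> Result<()> {
    // A whole column of ages, not one value at a time.
    let ages = Int32Array::from(vec![30, 25, 35]);

    // One kernel call aggregates the entire column.
    let total: Option<i32> = sum(&ages);
    assert_eq!(total, Some(90));

    // Kernels also select column-at-a-time: keep rows 0 and 2 via a boolean mask.
    let mask = BooleanArray::from(vec![true, false, true]);
    let kept = filter(&ages, &mask)?; // returns a new array; the input stays immutable
    assert_eq!(kept.len(), 2);

    Ok(())
}
```

Each call touches one contiguous column buffer, which is exactly the kind of work that SIMD instructions and CPU caches reward.
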
## What is a RecordBatch? (And Why Batch?)

A **RecordBatch** represents a horizontal slice of a table—a collection of equal-length columnar arrays sharing the same schema.

### Why Not Process Entire Tables?

- **Memory Constraints**: A billion-row table might not fit in RAM
- **Pipeline Processing**: Start producing results before reading all data
- **Parallel Execution**: Different threads can process different batches

### Why Not Process Single Rows?

- **Lost Vectorization**: Can't use SIMD instructions on single values
- **Poor Cache Utilization**: Jumping between rows defeats CPU cache optimization
- **High Overhead**: Managing individual rows has significant bookkeeping costs

### RecordBatches: The Sweet Spot

RecordBatches typically contain thousands of rows—enough to benefit from vectorization but small enough to fit in memory. DataFusion streams these batches through operators, achieving both efficiency and scalability.

**Key Properties**:

- Arrays are immutable (create new batches to modify data)
- NULL values tracked via efficient validity bitmaps
- Variable-length data (strings, lists) use offset arrays for efficient access (see the short example below)

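To see these properties in action, the sketch below builds a nullable string column and inspects it. It assumes the `datafusion::arrow` re-export, and the values are purely illustrative:

```rust
use datafusion::arrow::array::{Array, StringArray};

fn main() {
    // A nullable string column: the None is tracked in the validity bitmap,
    // not stored as a sentinel value inside the string data.
    let names = StringArray::from(vec![Some("alice"), None, Some("carol")]);

    assert_eq!(names.len(), 3);
    assert_eq!(names.null_count(), 1);
    assert!(names.is_null(1));
    assert_eq!(names.value(0), "alice");

    // Arrays are immutable: "modifying" data means building a new array.
    // Slicing is zero-copy and reuses the same underlying buffers.
    let tail = names.slice(1, 2);
    assert_eq!(tail.len(), 2);
}
```
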
## From files to Arrow

When you call `read_csv`, `read_parquet`, `read_json` or `read_avro`, DataFusion decodes those formats into Arrow arrays and streams them to operators as RecordBatches.

The example below shows how to read data from different file formats. Each `read_*` method returns a `DataFrame` that represents a query plan. When you call `.collect()`, DataFusion executes the plan and returns results as a `Vec<RecordBatch>`—the actual columnar data in Arrow format.

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Pick ONE of these per run (each returns a new DataFrame):
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;
    // let df = ctx.read_parquet("data.parquet", ParquetReadOptions::default()).await?;
    // let df = ctx.read_json("data.ndjson", NdJsonReadOptions::default()).await?; // requires "json" feature
    // let df = ctx.read_avro("data.avro", AvroReadOptions::default()).await?; // requires "avro" feature

    let batches = df
        .select(vec![col("id")])?
        .filter(col("id").gt(lit(10)))?
        .collect()
        .await?; // Vec<RecordBatch>

    Ok(())
}
```

## Streaming Through the Engine

DataFusion processes queries as pull-based pipelines where operators request batches from their inputs. This streaming approach enables early result production, bounds memory usage (spilling to disk only when necessary), and naturally supports parallel execution across multiple CPU cores.

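If you want to observe this stream yourself instead of collecting everything at once, one possible sketch uses `DataFrame::execute_stream` together with `StreamExt` from the `futures` crate (an extra dependency); the CSV path here is a placeholder:

```rust
use datafusion::prelude::*;
use futures::StreamExt;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // Instead of collect(), ask for a stream and pull one RecordBatch at a time.
    let mut stream = df.execute_stream().await?; // SendableRecordBatchStream

    while let Some(batch) = stream.next().await {
        let batch = batch?; // each item is a Result<RecordBatch>
        println!("got a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```

This mirrors what operators do internally: pull a batch, process it, and hand a new batch downstream.
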
## Minimal: build a RecordBatch in Rust

Sometimes you need to create Arrow data programmatically rather than reading from files. This example shows the core building blocks: creating typed arrays (like `Int32Array` for numbers), defining a `Schema` that describes your columns, and assembling them into a `RecordBatch`. Notice how nullable columns can contain `None` values, tracked efficiently by Arrow's internal validity bitmap.

```rust
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;

fn make_batch() -> Result<RecordBatch, ArrowError> {
    let ids = Int32Array::from(vec![1, 2, 3]);
    let names = StringArray::from(vec![Some("alice"), None, Some("carol")]);

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    let cols: Vec<ArrayRef> = vec![Arc::new(ids), Arc::new(names)];
    RecordBatch::try_new(schema, cols)
}
```

## Query an in-memory batch with DataFusion

Once you have a `RecordBatch`, you can query it with DataFusion using a `MemTable`. This is useful for testing, processing data from external systems, or combining in-memory data with other sources. The example below creates a batch, wraps it in a `MemTable`, registers it as a named table, and queries it using SQL—demonstrating how Arrow serves as the bridge between your data and DataFusion's query engine.

```rust
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // build a batch
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
            Arc::new(StringArray::from(vec![Some("foo"), Some("bar"), None])) as ArrayRef,
        ],
    )?;

    // expose it as a table
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table("people", Arc::new(table))?;

    // query it
    let df = ctx
        .sql("SELECT id, upper(name) AS name FROM people WHERE id >= 2")
        .await?;
    df.show().await?;
    Ok(())
}
```

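As a shorter alternative for quick experiments, `SessionContext::read_batch` wraps a single `RecordBatch` in a `DataFrame` directly, without creating and registering a `MemTable` first. A minimal sketch (the column values are illustrative):

```rust
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // try_from_iter builds the schema from the (name, array) pairs.
    let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("id", ids)])?;

    // No table registration needed: the batch is queried as a DataFrame.
    let df = ctx.read_batch(batch)?;
    df.filter(col("id").gt(lit(1)))?.show().await?;
    Ok(())
}
```
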
## Common Pitfalls

When working with Arrow and RecordBatches, watch out for these common issues:

- **Schema consistency**: All batches in a stream must share the exact same `Schema` (names, types, nullability, metadata)
- **Immutability**: Arrays are immutable; to modify data, build new arrays or new RecordBatches
- **Buffer management**: Variable-length types (UTF-8, binary, lists) use offsets + values; avoid manual buffer slicing unless you understand Arrow's internal invariants
- **Type mismatches**: Mixed input types across files may require explicit casts before joins/unions (see the sketch after this list)
- **Batch size assumptions**: Don't assume a particular batch size; always iterate until the stream ends

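For the type-mismatch pitfall, an explicit cast with Arrow's `cast` kernel is usually enough. A minimal sketch, assuming the `datafusion::arrow` re-export and illustrative values:

```rust
use datafusion::arrow::array::{Array, Int32Array, Int64Array};
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Suppose one file produced Int32 ids and another produced Int64 ids.
    let ids32 = Int32Array::from(vec![1, 2, 3]);

    // Cast to a common type before unioning/joining the two sources.
    let ids64 = cast(&ids32, &DataType::Int64)?;
    let ids64 = ids64
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("cast to Int64 produces an Int64Array");

    assert_eq!(ids64.value(0), 1i64);
    Ok(())
}
```
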
## When Arrow knowledge is needed (Extension Points)

For many use cases, you don't need to know about Arrow. DataFusion handles the conversion from formats like CSV and Parquet for you. However, Arrow becomes important when you use DataFusion's **extension points** to add your own custom functionality.

These APIs are where you can plug your own code into the engine, and they often operate directly on Arrow `RecordBatch` streams.

- **`TableProvider` (Custom Data Sources)**: This is the most common extension point. You can teach DataFusion how to read from any source—a custom file format, a network API, a different database—by implementing the `TableProvider` trait. Your implementation will be responsible for creating `RecordBatch`es to stream data into the engine.

- **User-Defined Functions (UDFs)**: If you need to perform a custom transformation on your data that isn't built into DataFusion, you can write a UDF. Your function will receive data as Arrow arrays (inside a `RecordBatch`) and must produce an Arrow array as its output.

- **Custom Optimizer Rules and Operators**: For advanced use cases, you can even add your own rules to the query optimizer or implement entirely new physical operators (like a special type of join). These also operate on the Arrow-based query plans.

In short, knowing Arrow is key to unlocking the full power of DataFusion's modular and extensible architecture.

## Next Steps: Working with DataFrames

Now that you understand Arrow's RecordBatch format, you're ready to work with DataFusion's high-level APIs. The [DataFrame API](dataframe.md) provides a familiar, ergonomic interface for building queries without needing to think about Arrow internals most of the time.

The DataFrame API handles all the Arrow details under the hood: reading files into RecordBatches, applying transformations, and producing results. You only need to drop down to the Arrow level when implementing custom data sources, UDFs, or other extension points.

**Recommended reading order:**

1. [DataFrame API](dataframe.md) - High-level query building interface
2. [Library User Guide: DataFrame API](../library-user-guide/using-the-dataframe-api.md) - Detailed examples and patterns
3. [Custom Table Providers](../library-user-guide/custom-table-providers.md) - When you need Arrow knowledge

## Further reading

- [Arrow introduction](https://arrow.apache.org/docs/format/Intro.html)
- [Arrow columnar format (overview)](https://arrow.apache.org/docs/format/Columnar.html)
- [Arrow IPC format (files and streams)](https://arrow.apache.org/docs/format/IPC.html)
- [arrow_array::RecordBatch (docs.rs)](https://docs.rs/arrow-array/latest/arrow_array/struct.RecordBatch.html)
- [Apache Arrow DataFusion: A Fast, Embeddable, Modular Analytic Query Engine](https://dl.acm.org/doi/10.1145/3626246.3653368)

- DataFusion + Arrow integration (docs.rs):
  - [datafusion::common::arrow](https://docs.rs/datafusion/latest/datafusion/common/arrow/index.html)
  - [datafusion::common::arrow::array](https://docs.rs/datafusion/latest/datafusion/common/arrow/array/index.html)
  - [datafusion::common::arrow::compute](https://docs.rs/datafusion/latest/datafusion/common/arrow/compute/index.html)
  - [SessionContext::read_csv](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.read_csv)
  - [read_parquet](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.read_parquet)
  - [read_json](https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.read_json)
  - [DataFrame::collect](https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.collect)
  - [SendableRecordBatchStream](https://docs.rs/datafusion/latest/datafusion/physical_plan/type.SendableRecordBatchStream.html)
  - [TableProvider](https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html)
  - [MemTable](https://docs.rs/datafusion/latest/datafusion/datasource/struct.MemTable.html)
- Deep dive (memory layout internals): [ArrayData on docs.rs](https://docs.rs/datafusion/latest/datafusion/common/arrow/array/struct.ArrayData.html)
- Parquet format and pushdown: [Parquet format](https://parquet.apache.org/docs/file-format/), [Row group filtering / predicate pushdown](https://arrow.apache.org/docs/cpp/parquet.html#row-group-filtering)
- For DataFusion contributors: [DataFusion Invariants](../contributor-guide/specification/invariants.md) - How DataFusion maintains type safety and consistency with Arrow's dynamic type system

docs/source/user-guide/concepts-readings-events.md

Lines changed: 0 additions & 1 deletion
@@ -70,7 +70,6 @@ This is a list of DataFusion related blog posts, articles, and other resources.

 - **2024-10-16** [Blog: Candle Image Segmentation](https://www.letsql.com/posts/candle-image-segmentation/)

 - **2024-09-23 → 2024-12-02** [Talks: Carnegie Mellon University: Database Building Blocks Seminar Series - Fall 2024](https://db.cs.cmu.edu/seminar2024/)
-
 - **2024-11-12** [Video: Building InfluxDB 3.0 with the FDAP Stack: Apache Flight, DataFusion, Arrow and Parquet (Paul Dix)](https://www.youtube.com/watch?v=AGS4GNGDK_4)

 - **2024-11-04** [Video: Synnada: Towards “Unified” Compute Engines: Opportunities and Challenges (Mehmet Ozan Kabak)](https://www.youtube.com/watch?v=z38WY9uZtt4)

docs/source/user-guide/dataframe.md

Lines changed: 6 additions & 0 deletions
@@ -19,6 +19,8 @@

 # DataFrame API

+## DataFrame overview
+
 A DataFrame represents a logical set of rows with the same named columns,
 similar to a [Pandas DataFrame] or [Spark DataFrame].

@@ -109,6 +111,10 @@ async fn main() -> Result<()> {
 }
 ```

+---
+
+# REFERENCES
+
 [pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
 [spark dataframe]: https://spark.apache.org/docs/latest/sql-programming-guide.html
 [`sessioncontext`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html

docs/source/user-guide/sql/scalar_functions.md

Lines changed: 0 additions & 3 deletions
@@ -2444,7 +2444,6 @@ date_bin(interval, expression, origin-timestamp)
 - **interval**: Bin interval.
 - **expression**: Time expression to operate on. Can be a constant, column, or function.
 - **origin-timestamp**: Optional. Starting point used to determine bin boundaries. If not specified defaults 1970-01-01T00:00:00Z (the UNIX epoch in UTC). The following intervals are supported:
-
   - nanoseconds
   - microseconds
   - milliseconds

@@ -2498,7 +2497,6 @@ date_part(part, expression)
 #### Arguments

 - **part**: Part of the date to return. The following date parts are supported:
-
   - year
   - quarter (emits value in inclusive range [1, 4] based on which quartile of the year the date is in)
   - month

@@ -2538,7 +2536,6 @@ date_trunc(precision, expression)
 #### Arguments

 - **precision**: Time precision to truncate to. The following precisions are supported:
-
   - year / YEAR
   - quarter / QUARTER
   - month / MONTH
