
Commit 275d6f2

Update DataFusion architecture documentation
1 parent 9798fbc commit 275d6f2

File tree

4 files changed: 158 additions, 85 deletions


datafusion/core/src/datasource/empty.rs

Lines changed: 3 additions & 2 deletions
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-//! An empty plan that is usefull for testing and generating plans without mapping them to actual data.
+//! [`EmptyTable`] useful for testing.

 use std::any::Any;
 use std::sync::Arc;

@@ -30,7 +30,8 @@ use crate::logical_expr::Expr;
 use crate::physical_plan::project_schema;
 use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};

-/// A table with a schema but no data.
+/// An empty plan that is useful for testing and generating plans
+/// without mapping them to actual data.
 pub struct EmptyTable {
     schema: SchemaRef,
     partitions: usize,
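To illustrate, a minimal sketch of using `EmptyTable` as the doc comment above describes (assumes a `tokio` runtime; the table name `t` and column `a` are illustrative):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::empty::EmptyTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // A table with a known schema but no data: useful for exercising
    // the planner without wiring up real storage.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(EmptyTable::new(schema)))?;

    // Planning proceeds normally; execution simply yields no rows.
    let batches = ctx.sql("SELECT a FROM t").await?.collect().await?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 0);
    Ok(())
}
```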

datafusion/core/src/datasource/memory.rs

Lines changed: 5 additions & 4 deletions
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

-//! In-memory data source for presenting a `Vec<RecordBatch>` as a data source that can be
-//! queried by DataFusion. This allows data to be pre-loaded into memory and then
-//! repeatedly queried without incurring additional file I/O overhead.
+//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.

 use futures::{StreamExt, TryStreamExt};
 use std::any::Any;

@@ -41,7 +39,10 @@ use crate::physical_plan::memory::MemoryExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};

-/// In-memory table
+/// In-memory data source for presenting a `Vec<RecordBatch>` as a
+/// data source that can be queried by DataFusion. This allows data to
+/// be pre-loaded into memory and then repeatedly queried without
+/// incurring additional file I/O overhead.
 #[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
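A minimal sketch of the `MemTable` workflow this doc comment describes (assumes a `tokio` runtime; table and column names are illustrative):

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Build a single in-memory batch with one Int32 column `a`.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // One partition containing one batch; repeated queries never touch disk.
    let table = MemTable::try_new(schema, vec![vec![batch]])?;

    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    ctx.sql("SELECT a FROM t WHERE a > 1").await?.show().await?;
    Ok(())
}
```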

datafusion/core/src/lib.rs

Lines changed: 149 additions & 78 deletions
@@ -26,15 +26,16 @@
 //! multi-threaded, vectorized execution engine, and partitioned data
 //! sources (Parquet, CSV, JSON, and Avro).
 //!
-//! DataFusion can also be easily customized to support additional
-//! data sources, query languages, functions, custom operators and
-//! more.
+//! DataFusion is designed for easy customization such as supporting
+//! additional data sources, query languages, functions, custom
+//! operators and more. See the [Architecture] section for more details.
 //!
 //! [DataFusion]: https://arrow.apache.org/datafusion/
 //! [Apache Arrow]: https://arrow.apache.org
 //! [use cases]: https://arrow.apache.org/datafusion/user-guide/introduction.html#use-cases
 //! [SQL]: https://arrow.apache.org/datafusion/user-guide/sql/index.html
 //! [`DataFrame`]: dataframe::DataFrame
+//! [Architecture]: #architecture
 //!
 //! # Examples
 //!

@@ -150,9 +151,13 @@
 //! [`AggregateUDF`]: physical_plan::udaf::AggregateUDF
 //! [`QueryPlanner`]: execution::context::QueryPlanner
 //! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
-//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
+//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule
+//!
+//! # Architecture
 //!
-//! # Code Organization
+//! <!-- NOTE: The goal of this section is to provide a high level
+//! overview of how DataFusion is organized and then link to other
+//! sections of the docs with more details -->
 //!
 //! ## Overview Presentations
 //!
@@ -168,104 +173,170 @@
 //! - [March 2021]: The DataFusion architecture is described in _Query Engine Design and the Rust-Based DataFusion in Apache Arrow_: [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) (DataFusion content starts [~ 15 minutes in](https://www.youtube.com/watch?v=K6eCAVEk4kU&t=875s)) and [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934)
 //! - [February 2021]: How DataFusion is used within the Ballista Project is described in _Ballista: Distributed Compute with Rust and Apache Arrow_: [recording](https://www.youtube.com/watch?v=ZZHQaOap9pQ)
 //!
-//! ## Architecture
+//! ## Query Planning and Execution Overview
 //!
-//! DataFusion is a fully fledged query engine capable of performing complex operations.
-//! Specifically, when DataFusion receives an SQL query, there are different steps
-//! that it passes through until a result is obtained. Broadly, they are:
+//! ### SQL
 //!
-//! 1. The string is parsed to an Abstract syntax tree (AST) using [sqlparser].
-//! 2. The planner [`SqlToRel`] converts logical expressions on the AST to logical expressions [`Expr`]s.
-//! 3. The planner [`SqlToRel`] converts logical nodes on the AST to a [`LogicalPlan`].
-//! 4. [`OptimizerRule`]s are applied to the [`LogicalPlan`] to optimize it.
-//! 5. The [`LogicalPlan`] is converted to an [`ExecutionPlan`] by a [`PhysicalPlanner`]
-//! 6. The [`ExecutionPlan`]is executed against data through the [`SessionContext`]
+//! 1. The query string is parsed to an Abstract Syntax Tree (AST)
+//!    using [sqlparser].
 //!
-//! With the [`DataFrame`] API, steps 1-3 are not used as the DataFrame builds the [`LogicalPlan`] directly.
+//! 2. The AST is converted by the [`SqlToRel`] planner to a
+//!    [`LogicalPlan`] and logical expressions [`Expr`]s that compute
+//!    the desired result.
 //!
-//! Phases 1-5 are typically cheap when compared to phase 6, and thus DataFusion puts a
-//! lot of effort to ensure that phase 6 runs efficiently and without errors.
+//! 3. The [`LogicalPlan`] is checked and rewritten to enforce
+//!    semantic rules, such as type coercion, by [`AnalyzerRule`]s.
 //!
-//! DataFusion's planning is divided in two main parts: logical planning and physical planning.
+//! 4. The [`LogicalPlan`] is rewritten by [`OptimizerRule`]s, such as
+//!    projection and filter pushdown, to improve its efficiency.
 //!
-//! ### Logical planning
+//! 5. The [`LogicalPlan`] is converted to an [`ExecutionPlan`] by a
+//!    [`PhysicalPlanner`].
 //!
-//! Logical planning yields [`LogicalPlan`]s and logical [`Expr`]
-//! expressions which are [`Schema`]aware and represent statements
-//! whose result is independent of how it should physically be
-//! executed.
+//! 6. The [`ExecutionPlan`] is rewritten by
+//!    [`PhysicalOptimizerRule`]s, such as sort and join selection, to
+//!    improve its efficiency.
 //!
-//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
-//! [`LogicalPlan`]s, and each node contains [`Expr`]s. All of these
-//! are located in [`datafusion_expr`] module.
+//! 7. The [`ExecutionPlan`] is executed, producing one or more
+//!    [`RecordBatchStream`]s which yield arrow [`RecordBatch`]es.
+//!
+//! To process large datasets with many rows as efficiently as
+//! possible, DataFusion expends significant effort in steps 1-6,
+//! which are done once per plan.
+//!
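All seven steps surface behind two calls in the public API. A minimal sketch (assuming a `tokio` runtime; `example.csv` and the column `a` are illustrative), where steps 1-6 happen inside `sql` and step 7 inside `collect`:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Illustrative input file; any CSV with a column `a` works here.
    ctx.register_csv("example", "example.csv", CsvReadOptions::new()).await?;

    // Steps 1-6: parse, plan, analyze, optimize, physical planning.
    let df = ctx.sql("SELECT a, COUNT(*) FROM example GROUP BY a").await?;

    // Step 7: execute, draining the RecordBatchStreams into memory.
    let batches = df.collect().await?; // Vec<RecordBatch>
    println!("{batches:?}");
    Ok(())
}
```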
+//! ### DataFrame
+//!
+//! When executing plans using the [`DataFrame`] API, the process is
+//! identical to that of SQL, except that steps 1 and 2 are
+//! omitted. Instead, the DataFrame API builds the [`LogicalPlan`]
+//! directly using [`LogicalPlanBuilder`]. Systems that have their own
+//! custom query languages also typically build the desired
+//! [`LogicalPlan`] directly.
+//!
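A minimal sketch of building such a plan directly, without SQL, using `LogicalPlanBuilder` with a schema-only `LogicalTableSource` (names are illustrative; no I/O is involved):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::logical_expr::logical_plan::builder::LogicalTableSource;
use datafusion::logical_expr::{col, lit, LogicalPlanBuilder};

fn main() -> Result<()> {
    // A table source that carries only a schema: enough for planning.
    let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    let source = Arc::new(LogicalTableSource::new(Arc::new(schema)));

    // Equivalent of `SELECT id FROM t WHERE id < 10`, built directly.
    let plan = LogicalPlanBuilder::scan("t", source, None)?
        .filter(col("id").lt(lit(10)))?
        .project(vec![col("id")])?
        .build()?;

    println!("{}", plan.display_indent());
    Ok(())
}
```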
+//! ## Data Sources
+//!
+//! DataFusion includes several built-in data sources for common use
+//! cases, and can be extended by implementing the [`TableProvider`]
+//! trait, which provides a stream of [`RecordBatch`]es.
+//!
+//! 1. [`ListingTable`]: Reads data from Parquet, JSON, CSV, or AVRO
+//!    files. Supports single files or multiple files with HIVE style
+//!    partitioning, optional compression, directly reading from remote
+//!    object store and more.
+//!
+//! 2. [`MemTable`]: Reads data from in-memory [`RecordBatch`]es.
+//!
+//! 3. [`StreamingTable`]: Reads data from potentially unbounded inputs.
 //!
-//! ### Physical planning
+//! [`ListingTable`]: crate::datasource::listing::ListingTable
+//! [`MemTable`]: crate::datasource::memory::MemTable
+//! [`StreamingTable`]: crate::datasource::streaming::StreamingTable
+//!
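For example, a sketch of using `ListingTable` through the convenience registration API (assumes a `tokio` runtime; `data/events/` is a hypothetical local path, and remote object stores such as s3:// would first need an `ObjectStore` registered):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // `register_parquet` builds a ListingTable over the given path;
    // a directory of files (possibly hive-partitioned) works too.
    ctx.register_parquet("events", "data/events/", ParquetReadOptions::default())
        .await?;

    ctx.sql("SELECT COUNT(*) FROM events").await?.show().await?;
    Ok(())
}
```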
+//! ## Plans
+//!
+//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
+//! expressions, which are [`Schema`] aware and represent statements
+//! independent of how they are physically executed.
+//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
+//! [`LogicalPlan`]s, each potentially containing [`Expr`]s.
 //!
 //! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
-//! is a plan that can be executed against data. Compared to a
-//! logical plan, the physical plan has concrete information about how
-//! calculations should be performed (e.g. what Rust functions are
-//! used) and how data should be loaded into memory.
-//!
-//! [`ExecutionPlan`]s uses the [Apache Arrow] format as its in-memory
-//! representation of data, through the [arrow] crate. The [arrow]
-//! crate documents how the memory is physically represented.
-//!
-//! A [`ExecutionPlan`] is composed by nodes (which each implement the
-//! [`ExecutionPlan`] trait). Each node can contain physical
-//! expressions ([`PhysicalExpr`]) or aggreagate expressions
-//! ([`AggregateExpr`]). All of these are located in the
-//! [`physical_plan`] module.
-//!
-//! Broadly speaking,
-//!
-//! * an [`ExecutionPlan`] receives a partition number and
-//! asynchronously returns an iterator over [`RecordBatch`] (a
-//! node-specific struct that implements [`RecordBatchReader`])
-//! * a [`PhysicalExpr`] receives a [`RecordBatch`]
-//! and returns an [`Array`]
-//! * an [`AggregateExpr`] receives a series of [`RecordBatch`]es
-//! and returns a [`RecordBatch`] of a single row(*)
-//!
-//! (*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
-//!
-//! The following physical nodes are currently implemented:
-//!
-//! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
-//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
-//! * Grouped and non-grouped aggregations: [`AggregateExec`](physical_plan::aggregates::AggregateExec)
-//! * Hash Join: [`HashJoinExec`](physical_plan::joins::HashJoinExec)
-//! * Cross Join: [`CrossJoinExec`](physical_plan::joins::CrossJoinExec)
-//! * Sort Merge Join: [`SortMergeJoinExec`](physical_plan::joins::SortMergeJoinExec)
-//! * Union: [`UnionExec`](physical_plan::union::UnionExec)
-//! * Sort: [`SortExec`](physical_plan::sorts::sort::SortExec)
-//! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
-//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
-//! * Scan CSV: [`CsvExec`](physical_plan::file_format::CsvExec)
-//! * Scan Parquet: [`ParquetExec`](physical_plan::file_format::ParquetExec)
-//! * Scan Avro: [`AvroExec`](physical_plan::file_format::AvroExec)
-//! * Scan newline-delimited JSON: [`NdJsonExec`](physical_plan::file_format::NdJsonExec)
-//! * Scan from memory: [`MemoryExec`](physical_plan::memory::MemoryExec)
-//! * Explain the plan: [`ExplainExec`](physical_plan::explain::ExplainExec)
-//!
-//! Future topics (coming soon):
-//! * Analyzer Rules
-//! * Resource management (memory and disk)
+//! is a plan that can be executed against data. It is a DAG of
+//! other [`ExecutionPlan`]s, each potentially containing:
+//!
+//! 1. [`PhysicalExpr`]: Scalar functions
+//!
+//! 2. [`AggregateExpr`]: Aggregate functions
+//!
+//! 3. [`WindowExpr`]: Window functions
+//!
+//! Compared to a [`LogicalPlan`], an [`ExecutionPlan`] has concrete
+//! information about how to perform calculations (e.g. hash vs merge
+//! join), and how data flows during execution (e.g. partitioning and
+//! sortedness).
+//!
+//! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
+//! [`AggregateExpr`]: crate::physical_plan::AggregateExpr
+//! [`WindowExpr`]: crate::physical_plan::WindowExpr
+//!
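A sketch of inspecting both plan types for one query, assuming the `DataFrame` API of this era (`into_optimized_plan`, `create_physical_plan`, and `displayable(...).indent()` taking no arguments; `example.csv` is illustrative):

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "example.csv", CsvReadOptions::new()).await?;
    let df = ctx.sql("SELECT a FROM example WHERE a > 5").await?;

    // The optimized LogicalPlan DAG...
    let logical_plan = df.clone().into_optimized_plan()?;
    println!("{}", logical_plan.display_indent());

    // ...and the ExecutionPlan it is lowered to, with physical details
    // such as partitioning filled in.
    let physical_plan = df.create_physical_plan().await?;
    println!("{}", displayable(physical_plan.as_ref()).indent());
    Ok(())
}
```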
+//! ## Execution
+//!
+//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
+//! format, largely with functions from the [arrow] crate. Values are
+//! represented with [`ColumnarValue`], which are either single
+//! constant values ([`ScalarValue`]) or Arrow Arrays ([`ArrayRef`]).
+//!
+//! [`ColumnarValue`]: datafusion_expr::ColumnarValue
+//! [`ScalarValue`]: crate::scalar::ScalarValue
+//! [`ArrayRef`]: arrow::array::ArrayRef
+//!
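A sketch of how the two `ColumnarValue` variants are handled uniformly, assuming `ScalarValue::to_array_of_size` returns an `ArrayRef` directly as it did at this time (newer releases return a `Result`):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int32Array};
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

/// Broadcast a scalar to the batch's row count so both variants can be
/// processed with the same array kernels.
fn to_array(value: ColumnarValue, num_rows: usize) -> ArrayRef {
    match value {
        ColumnarValue::Array(array) => array,
        // `to_array_of_size` repeats the constant `num_rows` times.
        ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
    }
}

fn main() {
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    assert_eq!(to_array(ColumnarValue::Array(array), 3).len(), 3);

    let scalar = ColumnarValue::Scalar(ScalarValue::Int32(Some(7)));
    assert_eq!(to_array(scalar, 3).len(), 3);
}
```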
+//! See the [implementors of `ExecutionPlan`] for a list of physical
+//! nodes that are implemented.
+//!
+//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
+//!
+//! ## State Management and Configuration
+//!
+//! [`ConfigOptions`] contains options to control DataFusion's
+//! execution.
+//!
+//! [`ConfigOptions`]: datafusion_common::config::ConfigOptions
+//!
+//! The state required to execute queries is managed by the following
+//! structures:
+//!
+//! 1. [`SessionContext`]: State needed to create [`LogicalPlan`]s such
+//!    as the table definitions, and the function registries.
+//!
+//! 2. [`TaskContext`]: State needed for execution such as the
+//!    [`MemoryPool`], [`DiskManager`], and [`ObjectStoreRegistry`].
+//!
+//! 3. [`ExecutionProps`]: Per-execution properties and data (such as
+//!    starting timestamps, etc.).
+//!
+//! [`SessionContext`]: crate::execution::context::SessionContext
+//! [`TaskContext`]: crate::execution::context::TaskContext
+//! [`ExecutionProps`]: crate::execution::context::ExecutionProps
+//!
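For example, a sketch of adjusting a couple of these options, assuming the constructor was still named `SessionContext::with_config` at this point (newer releases use `new_with_config`):

```rust
use datafusion::execution::context::SessionContext;
use datafusion::prelude::SessionConfig;

fn make_context() -> SessionContext {
    let config = SessionConfig::new()
        .with_target_partitions(8) // parallelism used when repartitioning
        .with_batch_size(4096);    // rows per RecordBatch during execution
    SessionContext::with_config(config)
}
```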
+//! ### Resource Management
+//!
+//! The amount of memory and temporary local disk space used by
+//! DataFusion when running a plan can be controlled using the
+//! [`MemoryPool`] and [`DiskManager`].
+//!
+//! [`DiskManager`]: crate::execution::DiskManager
+//! [`MemoryPool`]: crate::execution::memory_pool::MemoryPool
+//! [`ObjectStoreRegistry`]: crate::datasource::object_store::ObjectStoreRegistry
+//!
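A sketch of capping memory with a `MemoryPool`, assuming the `RuntimeConfig::with_memory_limit` builder of this era (a byte limit plus the fraction of it the pool may hand out to operators):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionConfig;

fn limited_context() -> Result<SessionContext> {
    // Cap the memory pool at 2 GiB, all of which operators may use.
    let runtime = RuntimeEnv::new(
        RuntimeConfig::new().with_memory_limit(2 * 1024 * 1024 * 1024, 1.0),
    )?;
    Ok(SessionContext::with_config_rt(
        SessionConfig::new(),
        Arc::new(runtime),
    ))
}
```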
+//! ## Crate Organization
+//!
+//! DataFusion is organized into multiple crates to enforce modularity
+//! and improve compilation times. The crates are:
+//!
+//! * [datafusion_common]: Common traits and types
+//! * [datafusion_execution]: State needed for execution
+//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structures
+//! * [datafusion_optimizer]: [`OptimizerRule`]s and [`AnalyzerRule`]s
+//! * [datafusion_physical_expr]: [`PhysicalExpr`] and related expressions
+//! * [datafusion_sql]: [`SqlToRel`] SQL planner
 //!
 //! [sqlparser]: https://docs.rs/sqlparser/latest/sqlparser
 //! [`SqlToRel`]: sql::planner::SqlToRel
 //! [`Expr`]: datafusion_expr::Expr
 //! [`LogicalPlan`]: datafusion_expr::LogicalPlan
+//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
 //! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
 //! [`ExecutionPlan`]: physical_plan::ExecutionPlan
 //! [`PhysicalPlanner`]: physical_plan::PhysicalPlanner
+//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
 //! [`Schema`]: arrow::datatypes::Schema
-//! [`datafusion_expr`]: datafusion_expr
 //! [`PhysicalExpr`]: physical_plan::PhysicalExpr
 //! [`AggregateExpr`]: physical_plan::AggregateExpr
 //! [`RecordBatch`]: arrow::record_batch::RecordBatch
 //! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader
 //! [`Array`]: arrow::array::Array
+//! [`RecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream

 /// DataFusion crate version
 pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");

docs/source/contributor-guide/architecture.md

Lines changed: 1 addition & 1 deletion
@@ -23,5 +23,5 @@ DataFusion's code structure and organization is described in the
 [crates.io documentation], to keep it as close to the source as
 possible. You can find the most up to date version in the [source code].

-[crates.io documentation]: https://docs.rs/datafusion/latest/datafusion/index.html#code-organization
+[crates.io documentation]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture

 [source code]: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/lib.rs
