
Commit d9f1f31

Update DataFusion architecture documentation
1 parent 9798fbc commit d9f1f31

File tree

4 files changed

+166
-78
lines changed


datafusion/core/src/datasource/empty.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! An empty plan that is usefull for testing and generating plans without mapping them to actual data.
18+
//! [`EmptyTable`], useful for testing.
1919
2020
use std::any::Any;
2121
use std::sync::Arc;
@@ -30,7 +30,8 @@ use crate::logical_expr::Expr;
3030
use crate::physical_plan::project_schema;
3131
use crate::physical_plan::{empty::EmptyExec, ExecutionPlan};
3232

33-
/// A table with a schema but no data.
33+
/// An empty plan that is useful for testing and generating plans
34+
/// without mapping them to actual data.
3435
pub struct EmptyTable {
3536
schema: SchemaRef,
3637
partitions: usize,

datafusion/core/src/datasource/memory.rs

Lines changed: 5 additions & 4 deletions
@@ -15,9 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! In-memory data source for presenting a `Vec<RecordBatch>` as a data source that can be
19-
//! queried by DataFusion. This allows data to be pre-loaded into memory and then
20-
//! repeatedly queried without incurring additional file I/O overhead.
18+
//! [`MemTable`] for querying `Vec<RecordBatch>` by DataFusion.
2119
2220
use futures::{StreamExt, TryStreamExt};
2321
use std::any::Any;
@@ -41,7 +39,10 @@ use crate::physical_plan::memory::MemoryExec;
4139
use crate::physical_plan::ExecutionPlan;
4240
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
4341

44-
/// In-memory table
42+
/// In-memory data source for presenting a `Vec<RecordBatch>` as a
43+
/// data source that can be queried by DataFusion. This allows data to
44+
/// be pre-loaded into memory and then repeatedly queried without
45+
/// incurring additional file I/O overhead.
4546
#[derive(Debug)]
4647
pub struct MemTable {
4748
schema: SchemaRef,

datafusion/core/src/lib.rs

Lines changed: 157 additions & 71 deletions
@@ -26,15 +26,16 @@
2626
//! multi-threaded, vectorized execution engine, and partitioned data
2727
//! sources (Parquet, CSV, JSON, and Avro).
2828
//!
29-
//! DataFusion can also be easily customized to support additional
30-
//! data sources, query languages, functions, custom operators and
31-
//! more.
29+
//! DataFusion is designed for easy customization, such as supporting
30+
//! additional data sources, query languages, functions, custom
31+
//! operators and more. See the [Architecture] section for more details.
3232
//!
3333
//! [DataFusion]: https://arrow.apache.org/datafusion/
3434
//! [Apache Arrow]: https://arrow.apache.org
3535
//! [use cases]: https://arrow.apache.org/datafusion/user-guide/introduction.html#use-cases
3636
//! [SQL]: https://arrow.apache.org/datafusion/user-guide/sql/index.html
3737
//! [`DataFrame`]: dataframe::DataFrame
38+
//! [Architecture]: #architecture
3839
//!
3940
//! # Examples
4041
//!
@@ -150,9 +151,13 @@
150151
//! [`AggregateUDF`]: physical_plan::udaf::AggregateUDF
151152
//! [`QueryPlanner`]: execution::context::QueryPlanner
152153
//! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule
153-
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
154+
//! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule
155+
//!
156+
//! # Architecture
154157
//!
155-
//! # Code Organization
158+
//! <!-- NOTE: The goal of this section is to provide a high level
159+
//! overview of how DataFusion is organized and then link to other
160+
//! sections of the docs with more details -->
156161
//!
157162
//! ## Overview Presentations
158163
//!
@@ -168,104 +173,185 @@
168173
//! - [March 2021]: The DataFusion architecture is described in _Query Engine Design and the Rust-Based DataFusion in Apache Arrow_: [recording](https://www.youtube.com/watch?v=K6eCAVEk4kU) (DataFusion content starts [~ 15 minutes in](https://www.youtube.com/watch?v=K6eCAVEk4kU&t=875s)) and [slides](https://www.slideshare.net/influxdata/influxdb-iox-tech-talks-query-engine-design-and-the-rustbased-datafusion-in-apache-arrow-244161934)
169174
//! - [February 2021]: How DataFusion is used within the Ballista Project is described in _Ballista: Distributed Compute with Rust and Apache Arrow_: [recording](https://www.youtube.com/watch?v=ZZHQaOap9pQ)
170175
//!
171-
//! ## Architecture
176+
//! ## Query Planning and Execution Overview
177+
//!
178+
//! ### SQL
179+
//!
180+
//! 1. The query string is parsed to an Abstract Syntax Tree (AST)
181+
//! using [sqlparser].
182+
//!
183+
//! 2. The AST is converted to a [`LogicalPlan`] and logical
184+
//! expressions ([`Expr`]s) that compute the desired result, by the
185+
//! [`SqlToRel`] planner.
186+
//!
187+
//! 3. The [`LogicalPlan`] is checked and rewritten to enforce
188+
//! semantic rules, such as type coercion, by [`AnalyzerRule`]s.
189+
//!
190+
//! 4. The [`LogicalPlan`] is rewritten by [`OptimizerRule`]s, such as
191+
//! projection and filter pushdown, to improve its efficiency.
192+
//!
193+
//! 5. The [`LogicalPlan`] is converted to an [`ExecutionPlan`] by a
194+
//! [`PhysicalPlanner`].
195+
//!
196+
//! 6. The [`ExecutionPlan`] is rewritten by
197+
//! [`PhysicalOptimizerRule`]s, such as sort and join selection, to
198+
//! improve its efficiency.
199+
//!
200+
//! 7. The [`ExecutionPlan`] is executed, producing one or more
201+
//! [`RecordBatchStream`]s, which yield Arrow [`RecordBatch`]es.
202+
//!
203+
//! To process large datasets with many rows as efficiently as
204+
//! possible, DataFusion expends significant effort in steps 1-6,
205+
//! which are done once per plan.
172206
//!
173-
//! DataFusion is a fully fledged query engine capable of performing complex operations.
174-
//! Specifically, when DataFusion receives an SQL query, there are different steps
175-
//! that it passes through until a result is obtained. Broadly, they are:
207+
//! ### DataFrame
176208
//!
177-
//! 1. The string is parsed to an Abstract syntax tree (AST) using [sqlparser].
178-
//! 2. The planner [`SqlToRel`] converts logical expressions on the AST to logical expressions [`Expr`]s.
179-
//! 3. The planner [`SqlToRel`] converts logical nodes on the AST to a [`LogicalPlan`].
180-
//! 4. [`OptimizerRule`]s are applied to the [`LogicalPlan`] to optimize it.
181-
//! 5. The [`LogicalPlan`] is converted to an [`ExecutionPlan`] by a [`PhysicalPlanner`]
182-
//! 6. The [`ExecutionPlan`]is executed against data through the [`SessionContext`]
209+
//! When executing plans using the [`DataFrame`] API, the process is
210+
//! identical to that of SQL, except that steps 1 and 2 are
211+
//! omitted. Instead, the DataFrame API builds the [`LogicalPlan`]
212+
//! directly using [`LogicalPlanBuilder`]. Systems that have their own
213+
//! custom query languages also typically build the desired
214+
//! [`LogicalPlan`] directly.
183215
//!
184-
//! With the [`DataFrame`] API, steps 1-3 are not used as the DataFrame builds the [`LogicalPlan`] directly.
216+
//! ## Data Sources
185217
//!
186-
//! Phases 1-5 are typically cheap when compared to phase 6, and thus DataFusion puts a
187-
//! lot of effort to ensure that phase 6 runs efficiently and without errors.
218+
//! DataFusion includes several built-in data sources for common use
219+
//! cases, and can be extended with any source that implements the
220+
//! [`TableProvider`] trait and can provide a stream of
221+
//! [`RecordBatch`]es.
188222
//!
189-
//! DataFusion's planning is divided in two main parts: logical planning and physical planning.
223+
//! 1. [`ListingTable`]: Read data from Parquet, JSON, CSV, or AVRO
224+
//! files. Supports single files or multiple files with Hive-style
225+
//! partitioning, optional compression, directly reading from remote
226+
//! object store and more.
190227
//!
191-
//! ### Logical planning
228+
//! 2. [`MemTable`]: Reads data from in-memory [`RecordBatch`]es.
229+
//!
230+
//! 3. [`StreamingTable`]: Reads data from potentially unbounded inputs.
231+
//!
232+
//! [`ListingTable`]: crate::datasource::listing::ListingTable
233+
//! [`MemTable`]: crate::datasource::memory::MemTable
234+
//! [`StreamingTable`]: crate::datasource::streaming::StreamingTable
235+
//!
236+
//! ## Plans
237+
//!
238+
//! ### [`LogicalPlan`]
192239
//!
193240
//! Logical planning yields [`LogicalPlan`]s and logical [`Expr`]
194-
//! expressions which are [`Schema`]aware and represent statements
241+
//! expressions which are [`Schema`] aware and represent statements
195242
//! whose result is independent of how it should physically be
196243
//! executed.
197244
//!
198245
//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
199246
//! [`LogicalPlan`]s, and each node contains [`Expr`]s. All of these
200247
//! are located in [`datafusion_expr`] module.
201248
//!
202-
//! ### Physical planning
249+
//! ### [`ExecutionPlan`] / Physical Plans
203250
//!
204251
//! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
205252
//! is a plan that can be executed against data. Compared to a
206-
//! logical plan, the physical plan has concrete information about how
207-
//! calculations should be performed (e.g. what Rust functions are
208-
//! used) and how data should be loaded into memory.
209-
//!
210-
//! [`ExecutionPlan`]s uses the [Apache Arrow] format as its in-memory
211-
//! representation of data, through the [arrow] crate. The [arrow]
212-
//! crate documents how the memory is physically represented.
213-
//!
214-
//! A [`ExecutionPlan`] is composed by nodes (which each implement the
215-
//! [`ExecutionPlan`] trait). Each node can contain physical
216-
//! expressions ([`PhysicalExpr`]) or aggreagate expressions
217-
//! ([`AggregateExpr`]). All of these are located in the
218-
//! [`physical_plan`] module.
219-
//!
220-
//! Broadly speaking,
221-
//!
222-
//! * an [`ExecutionPlan`] receives a partition number and
223-
//! asynchronously returns an iterator over [`RecordBatch`] (a
224-
//! node-specific struct that implements [`RecordBatchReader`])
225-
//! * a [`PhysicalExpr`] receives a [`RecordBatch`]
226-
//! and returns an [`Array`]
227-
//! * an [`AggregateExpr`] receives a series of [`RecordBatch`]es
228-
//! and returns a [`RecordBatch`] of a single row(*)
229-
//!
230-
//! (*) Technically, it aggregates the results on each partition and then merges the results into a single partition.
231-
//!
232-
//! The following physical nodes are currently implemented:
233-
//!
234-
//! * Projection: [`ProjectionExec`](physical_plan::projection::ProjectionExec)
235-
//! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
236-
//! * Grouped and non-grouped aggregations: [`AggregateExec`](physical_plan::aggregates::AggregateExec)
237-
//! * Hash Join: [`HashJoinExec`](physical_plan::joins::HashJoinExec)
238-
//! * Cross Join: [`CrossJoinExec`](physical_plan::joins::CrossJoinExec)
239-
//! * Sort Merge Join: [`SortMergeJoinExec`](physical_plan::joins::SortMergeJoinExec)
240-
//! * Union: [`UnionExec`](physical_plan::union::UnionExec)
241-
//! * Sort: [`SortExec`](physical_plan::sorts::sort::SortExec)
242-
//! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
243-
//! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
244-
//! * Scan CSV: [`CsvExec`](physical_plan::file_format::CsvExec)
245-
//! * Scan Parquet: [`ParquetExec`](physical_plan::file_format::ParquetExec)
246-
//! * Scan Avro: [`AvroExec`](physical_plan::file_format::AvroExec)
247-
//! * Scan newline-delimited JSON: [`NdJsonExec`](physical_plan::file_format::NdJsonExec)
248-
//! * Scan from memory: [`MemoryExec`](physical_plan::memory::MemoryExec)
249-
//! * Explain the plan: [`ExplainExec`](physical_plan::explain::ExplainExec)
250-
//!
251-
//! Future topics (coming soon):
252-
//! * Analyzer Rules
253-
//! * Resource management (memory and disk)
253+
//! [`LogicalPlan`], an [`ExecutionPlan`] has concrete information
254+
//! about how to perform calculations (e.g. algorithms to use), and
255+
//! how data flows during execution (e.g. partitioning and
256+
//! sortedness). It is a DAG of other [`ExecutionPlan`]s
257+
//! and each node can contain:
258+
//!
259+
//! 1. [`PhysicalExpr`]: Scalar functions
260+
//!
261+
//! 2. [`AggregateExpr`]: Aggregate functions
262+
//!
263+
//! 3. [`WindowExpr`]: Window functions
264+
//!
265+
//! [`PhysicalExpr`]: crate::physical_plan::PhysicalExpr
266+
//! [`AggregateExpr`]: crate::physical_plan::AggregateExpr
267+
//! [`WindowExpr`]: crate::physical_plan::WindowExpr
268+
//!
269+
//!
270+
//! ### Execution
271+
//!
272+
//! [`ExecutionPlan`]s process data using the [Apache Arrow] memory
273+
//! format, largely with functions from the [arrow] crate. Values are
274+
//! represented with [`ColumnarValue`], which are either:
275+
//!
276+
//! [`ScalarValue`]: single constant values
277+
//! [`ArrayRef`]: Arrow arrays
278+
//!
279+
//! [`ColumnarValue`]: datafusion_expr::ColumnarValue
280+
//! [`ScalarValue`]: crate::scalar::ScalarValue
281+
//! [`ArrayRef`]: arrow::array::ArrayRef
282+
//!
283+
//!
284+
//! See the [implementors of `ExecutionPlan`] for a list of the physical nodes that are currently implemented.
285+
//!
286+
//! [implementors of `ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#implementors
287+
//!
288+
//! ## State Management and Configuration
289+
//!
290+
//! ### Configuration
291+
//!
292+
//! [`ConfigOptions`] contain options to control DataFusion's
293+
//! execution.
294+
//!
295+
//! [`ConfigOptions`]: datafusion_common::config::ConfigOptions
296+
//!
297+
//! ### State Management
298+
//!
299+
//! The state required to execute queries is managed by several structures:
300+
//!
301+
//! 1. [`SessionContext`]: State needed to create logical plans, such
302+
//! as table definitions and function registries. In general,
303+
//! a [`SessionContext`] is required to create [`LogicalPlan`]s.
304+
//!
305+
//! 2. [`TaskContext`]: State needed for execution such as the
306+
//! [`MemoryPool`], [`DiskManager`], and [`ObjectStoreRegistry`].
307+
//!
308+
//! 3. [`ExecutionProps`]: Per-execution properties and data (such as
309+
//! starting timestamps, etc).
310+
//!
311+
//! [`SessionContext`]: crate::execution::context::SessionContext
312+
//! [`TaskContext`]: crate::execution::context::TaskContext
313+
//! [`ExecutionProps`]: crate::execution::context::ExecutionProps
314+
//!
315+
//! ### Resource Management
316+
//!
317+
//! The amount of memory and temporary local disk space used by
318+
//! DataFusion when running a plan can be controlled using the
319+
//! [`MemoryPool`] and [`DiskManager`].
320+
//!
321+
//! [`DiskManager`]: crate::execution::DiskManager
322+
//! [`MemoryPool`]: crate::execution::memory_pool::MemoryPool
323+
//! [`ObjectStoreRegistry`]: crate::datasource::object_store::ObjectStoreRegistry
324+
//!
325+
//! ## Crate Organization
326+
//!
327+
//! DataFusion is organized into multiple crates to enforce modularity
328+
//! and improve compilation times. The crates are:
329+
//!
330+
//! * [datafusion_common]: Common traits and types
331+
//! * [datafusion_execution]: State needed for execution
332+
//! * [datafusion_expr]: [`LogicalPlan`], [`Expr`] and related logical planning structures
333+
//! * [datafusion-jit]: Just-In-Time (JIT) compilation support
334+
//! * [datafusion_optimizer]: [`OptimizerRule`]s and [`AnalyzerRule`]s
335+
//! * [datafusion-physical-expr]: [`PhysicalExpr`] and related expressions
336+
//! * [datafusion-row]: Specialized row format
337+
//! * [datafusion_sql]: [`SqlToRel`] SQL planner
254338
//!
255339
//! [sqlparser]: https://docs.rs/sqlparser/latest/sqlparser
256340
//! [`SqlToRel`]: sql::planner::SqlToRel
257341
//! [`Expr`]: datafusion_expr::Expr
258342
//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
343+
//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule
259344
//! [`OptimizerRule`]: optimizer::optimizer::OptimizerRule
260345
//! [`ExecutionPlan`]: physical_plan::ExecutionPlan
261346
//! [`PhysicalPlanner`]: physical_plan::PhysicalPlanner
347+
//! [`PhysicalOptimizerRule`]: datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule
262348
//! [`Schema`]: arrow::datatypes::Schema
263-
//! [`datafusion_expr`]: datafusion_expr
264349
//! [`PhysicalExpr`]: physical_plan::PhysicalExpr
265350
//! [`AggregateExpr`]: physical_plan::AggregateExpr
266351
//! [`RecordBatch`]: arrow::record_batch::RecordBatch
267352
//! [`RecordBatchReader`]: arrow::record_batch::RecordBatchReader
268353
//! [`Array`]: arrow::array::Array
354+
//! [`RecordBatchStream`]: crate::physical_plan::SendableRecordBatchStream
269355
270356
/// DataFusion crate version
271357
pub const DATAFUSION_VERSION: &str = env!("CARGO_PKG_VERSION");

docs/source/contributor-guide/architecture.md

Lines changed: 1 addition & 1 deletion
@@ -23,5 +23,5 @@ DataFusion's code structure and organization is described in the
2323
[crates.io documentation], to keep it as close to the source as
2424
possible. You can find the most up to date version in the [source code].
2525

26-
[crates.io documentation]: https://docs.rs/datafusion/latest/datafusion/index.html#code-organization
26+
[crates.io documentation]: https://docs.rs/datafusion/latest/datafusion/index.html#architecture
2727
[source code]: https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/src/lib.rs
