Commit d3ae1a7

cj-zhukov and Sergey Zhukov authored
Consolidate dataframe examples (apache#18142) (apache#18862)
## Which issue does this PR close?

- Part of apache#18142.

## Rationale for this change

This PR consolidates all the `dataframe` examples (dataframe, default_column_values, deserialize_to_struct) into a single example binary. We agreed on this pattern and can now apply it to the remaining examples.

---------

Co-authored-by: Sergey Zhukov <szhukov@aligntech.com>
1 parent fbe1ae6 commit d3ae1a7
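For orientation, the pattern this PR standardizes is: each example file exposes a `pub async fn`, and a per-directory `main.rs` defines an `ExampleKind` enum, parses the first CLI argument via `FromStr`, and dispatches to the matching function. The sketch below condenses that shape into a dependency-free, synchronous program; the names `Alpha`, `Beta`, `run_alpha`, and `run_beta` are illustrative, not from the PR, and the real examples are `async` functions awaited under `#[tokio::main]`:

```rust
use std::str::FromStr;

// Illustrative subcommand set; each consolidated binary defines its own.
enum ExampleKind {
    Alpha,
    Beta,
}

impl FromStr for ExampleKind {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "alpha" => Ok(Self::Alpha),
            "beta" => Ok(Self::Beta),
            _ => Err(format!("Unknown example: {s}")),
        }
    }
}

// Each consolidated example becomes a function the dispatcher calls.
fn run_alpha() {
    println!("running alpha");
}

fn run_beta() {
    println!("running beta");
}

fn main() -> Result<(), String> {
    // First CLI argument selects the example to run.
    let arg = std::env::args()
        .nth(1)
        .ok_or_else(|| "Missing argument".to_string())?;
    match arg.parse::<ExampleKind>()? {
        ExampleKind::Alpha => run_alpha(),
        ExampleKind::Beta => run_beta(),
    }
    Ok(())
}
```

The full version lives in the new `main.rs` shown at the end of this diff; with it in place, the invocation becomes `cargo run --example dataframe -- dataframe`.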

File tree

6 files changed: +140 additions, −39 deletions

datafusion-examples/README.md

Lines changed: 4 additions & 4 deletions

````diff
@@ -41,7 +41,7 @@ cd datafusion-examples/examples
 
 # Run the `dataframe` example:
 # ... use the equivalent for other examples
-cargo run --example dataframe
+cargo run --example dataframe -- dataframe
 ```
 
 ## Single Process
@@ -61,10 +61,10 @@ cargo run --example dataframe
 - [`examples/custom_data_source/custom_file_casts.rs`](examples/custom_data_source/custom_file_casts.rs): Implement custom casting rules to adapt file schemas
 - [`examples/custom_data_source/custom_file_format.rs`](examples/custom_data_source/custom_file_format.rs): Write data to a custom file format
 - [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
-- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
+- [`examples/dataframe/dataframe.rs`](examples/dataframe/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
 - [`examples/builtin_functions/date_time`](examples/builtin_functions/date_time.rs): Examples of date-time related functions and queries
-- [`default_column_values.rs`](examples/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
-- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
+- [`examples/custom_data_source/default_column_values.rs`](examples/custom_data_source/default_column_values.rs): Implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
+- [`examples/dataframe/deserialize_to_struct.rs`](examples/dataframe/deserialize_to_struct.rs): Convert query results (Arrow ArrayRefs) into Rust structs
 - [`examples/query_planning/expr_api.rs`](examples/query_planning/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
 - [`examples/custom_data_source/file_stream_provider.rs`](examples/custom_data_source/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.
 - [`flight/sql_server.rs`](examples/flight/sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from Flight and and FlightSQL (e.g. JDBC) clients
````

datafusion-examples/examples/default_column_values.rs renamed to datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 22 additions & 23 deletions

````diff
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! See `main.rs` for how to run it.
+
 use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -52,25 +54,23 @@ use object_store::{ObjectStore, PutPayload};
 // Metadata key for storing default values in field metadata
 const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
 
-// Example showing how to implement custom default value handling for missing columns
-// using field metadata and PhysicalExprAdapter.
-//
-// This example demonstrates how to:
-// 1. Store default values in field metadata using a constant key
-// 2. Create a custom PhysicalExprAdapter that reads these defaults
-// 3. Inject default values for missing columns in filter predicates
-// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
-// 5. Wrap string default values in cast expressions for proper type conversion
-//
-// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
-// that get pushed down to file scans. For handling missing columns in projections,
-// other mechanisms in DataFusion are used (like SchemaAdapter).
-//
-// The metadata-based approach provides a flexible way to store default values as strings
-// and cast them to the appropriate types at query time.
-
-#[tokio::main]
-async fn main() -> Result<()> {
+/// Example showing how to implement custom default value handling for missing columns
+/// using field metadata and PhysicalExprAdapter.
+///
+/// This example demonstrates how to:
+/// 1. Store default values in field metadata using a constant key
+/// 2. Create a custom PhysicalExprAdapter that reads these defaults
+/// 3. Inject default values for missing columns in filter predicates
+/// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
+/// 5. Wrap string default values in cast expressions for proper type conversion
+///
+/// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
+/// that get pushed down to file scans. For handling missing columns in projections,
+/// other mechanisms in DataFusion are used (like SchemaAdapter).
+///
+/// The metadata-based approach provides a flexible way to store default values as strings
+/// and cast them to the appropriate types at query time.
+pub async fn default_column_values() -> Result<()> {
     println!("=== Creating example data with missing columns and default values ===");
 
     // Create sample data where the logical schema has more columns than the physical schema
@@ -85,11 +85,10 @@ async fn main() -> Result<()> {
             .build();
 
         let mut writer =
-            ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
-                .expect("creating writer");
+            ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))?;
 
-        writer.write(&batch).expect("Writing batch");
-        writer.close().unwrap();
+        writer.write(&batch)?;
+        writer.close()?;
         buf
     };
     let path = Path::from("example.parquet");
````
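One detail of the move worth noting: because the example now returns its errors to the dispatcher instead of panicking, the `.expect(...)`/`.unwrap()` calls around the Parquet writer become `?`. That works whenever the enclosing function returns a `Result` whose error type implements `From` for the callee's error type. A minimal, dependency-free sketch of the same transformation, where `ExampleError` and `step` are illustrative stand-ins rather than names from the PR:

```rust
// Hypothetical error type standing in for DataFusionError.
#[derive(Debug)]
struct ExampleError(String);

// The conversion that lets `?` translate the callee's error automatically.
impl From<std::num::ParseIntError> for ExampleError {
    fn from(e: std::num::ParseIntError) -> Self {
        ExampleError(e.to_string())
    }
}

// Before: `let n: i32 = "42".parse().expect("parsing");` panics on failure.
// After: the failure converts via `From` and bubbles up to the caller.
fn step() -> Result<i32, ExampleError> {
    let n: i32 = "42".parse()?;
    Ok(n)
}

fn main() -> Result<(), ExampleError> {
    println!("parsed {}", step()?);
    Ok(())
}
```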

datafusion-examples/examples/custom_data_source/main.rs

Lines changed: 11 additions & 2 deletions

````diff
@@ -21,7 +21,7 @@
 //!
 //! ## Usage
 //! ```bash
-//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|file_stream_provider]
+//! cargo run --example custom_data_source -- [csv_json_opener|csv_sql_streaming|custom_datasource|custom_file_casts|custom_file_format|default_column_values|file_stream_provider]
 //! ```
 //!
 //! Each subcommand runs a corresponding example:
@@ -30,13 +30,15 @@
 //! - `custom_datasource` — run queries against a custom datasource (TableProvider)
 //! - `custom_file_casts` — implement custom casting rules to adapt file schemas
 //! - `custom_file_format` — write data to a custom file format
+//! - `default_column_values` — implement custom default value handling for missing columns using field metadata and PhysicalExprAdapter
 //! - `file_stream_provider` — run a query on FileStreamProvider which implements StreamProvider for reading and writing to arbitrary stream sources/sinks
 
 mod csv_json_opener;
 mod csv_sql_streaming;
 mod custom_datasource;
 mod custom_file_casts;
 mod custom_file_format;
+mod default_column_values;
 mod file_stream_provider;
 
 use std::str::FromStr;
@@ -49,6 +51,7 @@ enum ExampleKind {
     CustomDatasource,
     CustomFileCasts,
     CustomFileFormat,
+    DefaultColumnValues,
     FileFtreamProvider,
 }
 
@@ -60,6 +63,7 @@ impl AsRef<str> for ExampleKind {
             Self::CustomDatasource => "custom_datasource",
             Self::CustomFileCasts => "custom_file_casts",
             Self::CustomFileFormat => "custom_file_format",
+            Self::DefaultColumnValues => "default_column_values",
             Self::FileFtreamProvider => "file_stream_provider",
         }
     }
@@ -75,19 +79,21 @@ impl FromStr for ExampleKind {
             "custom_datasource" => Ok(Self::CustomDatasource),
             "custom_file_casts" => Ok(Self::CustomFileCasts),
             "custom_file_format" => Ok(Self::CustomFileFormat),
+            "default_column_values" => Ok(Self::DefaultColumnValues),
             "file_stream_provider" => Ok(Self::FileFtreamProvider),
             _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
         }
     }
 }
 
 impl ExampleKind {
-    const ALL: [Self; 6] = [
+    const ALL: [Self; 7] = [
         Self::CsvJsonOpener,
         Self::CsvSqlStreaming,
         Self::CustomDatasource,
         Self::CustomFileCasts,
         Self::CustomFileFormat,
+        Self::DefaultColumnValues,
         Self::FileFtreamProvider,
     ];
 
@@ -117,6 +123,9 @@ async fn main() -> Result<()> {
         ExampleKind::CustomDatasource => custom_datasource::custom_datasource().await?,
         ExampleKind::CustomFileCasts => custom_file_casts::custom_file_casts().await?,
         ExampleKind::CustomFileFormat => custom_file_format::custom_file_format().await?,
+        ExampleKind::DefaultColumnValues => {
+            default_column_values::default_column_values().await?
+        }
         ExampleKind::FileFtreamProvider => {
             file_stream_provider::file_stream_provider().await?
         }
````
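After this change, the new subcommand runs with `cargo run --example custom_data_source -- default_column_values`. As the hunks above show, registering a module in one of these consolidated binaries touches several places: the usage string and subcommand list in the crate docs, a `mod` declaration, the `ExampleKind` enum with its `AsRef<str>` and `FromStr` impls, the `ALL` array (bumped here from 6 to 7 entries), and the dispatch `match` in `main`.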

datafusion-examples/examples/dataframe.rs renamed to datafusion-examples/examples/dataframe/dataframe.rs

Lines changed: 5 additions & 8 deletions

````diff
@@ -15,12 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! See `main.rs` for how to run it.
+
 use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StringViewArray};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::catalog::MemTable;
 use datafusion::common::config::CsvOptions;
 use datafusion::common::parsers::CompressionTypeVariant;
-use datafusion::common::DataFusionError;
 use datafusion::common::ScalarValue;
 use datafusion::dataframe::DataFrameWriteOptions;
 use datafusion::error::Result;
@@ -39,6 +40,7 @@ use tempfile::tempdir;
 /// * [read_parquet]: execute queries against parquet files
 /// * [read_csv]: execute queries against csv files
 /// * [read_memory]: execute queries against in-memory arrow data
+/// * [read_memory_macro]: execute queries against in-memory arrow data using macro
 ///
 /// # Writing out to local storage
 ///
@@ -53,12 +55,7 @@ use tempfile::tempdir;
 /// * [where_scalar_subquery]: execute a scalar subquery
 /// * [where_in_subquery]: execute a subquery with an IN clause
 /// * [where_exist_subquery]: execute a subquery with an EXISTS clause
-///
-/// # Querying data
-///
-/// * [query_to_date]: execute queries against parquet files
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn dataframe_example() -> Result<()> {
     env_logger::init();
     // The SessionContext is the main high level API for interacting with DataFusion
     let ctx = SessionContext::new();
@@ -199,7 +196,7 @@ async fn read_memory_macro() -> Result<()> {
 /// 2. Write out a DataFrame to a parquet file
 /// 3. Write out a DataFrame to a csv file
 /// 4. Write out a DataFrame to a json file
-async fn write_out(ctx: &SessionContext) -> std::result::Result<(), DataFusionError> {
+async fn write_out(ctx: &SessionContext) -> Result<()> {
     let array = StringViewArray::from(vec!["a", "b", "c"]);
     let schema = Arc::new(Schema::new(vec![Field::new(
         "tablecol1",
````

datafusion-examples/examples/deserialize_to_struct.rs renamed to datafusion-examples/examples/dataframe/deserialize_to_struct.rs

Lines changed: 3 additions & 2 deletions

````diff
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! See `main.rs` for how to run it.
+
 use arrow::array::{AsArray, PrimitiveArray};
 use arrow::datatypes::{Float64Type, Int32Type};
 use datafusion::common::assert_batches_eq;
@@ -29,8 +31,7 @@ use futures::StreamExt;
 /// as [ArrayRef]
 ///
 /// [ArrayRef]: arrow::array::ArrayRef
-#[tokio::main]
-async fn main() -> Result<()> {
+pub async fn deserialize_to_struct() -> Result<()> {
     // Run a query that returns two columns of data
     let ctx = SessionContext::new();
     let testdata = datafusion::test_util::parquet_test_data();
````
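This is the same mechanical conversion as in `dataframe.rs`: drop `#[tokio::main]`, rename `main` to a `pub async fn` named after the example, and let the new dispatcher in `main.rs` await it.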
datafusion-examples/examples/dataframe/main.rs (new file)

Lines changed: 95 additions & 0 deletions

````rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # These are core DataFrame API usage
//!
//! These examples demonstrate core DataFrame API usage.
//!
//! ## Usage
//! ```bash
//! cargo run --example dataframe -- [dataframe|deserialize_to_struct]
//! ```
//!
//! Each subcommand runs a corresponding example:
//! - `dataframe` — run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries
//! - `deserialize_to_struct` — convert query results (Arrow ArrayRefs) into Rust structs

mod dataframe;
mod deserialize_to_struct;

use std::str::FromStr;

use datafusion::error::{DataFusionError, Result};

enum ExampleKind {
    Dataframe,
    DeserializeToStruct,
}

impl AsRef<str> for ExampleKind {
    fn as_ref(&self) -> &str {
        match self {
            Self::Dataframe => "dataframe",
            Self::DeserializeToStruct => "deserialize_to_struct",
        }
    }
}

impl FromStr for ExampleKind {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "dataframe" => Ok(Self::Dataframe),
            "deserialize_to_struct" => Ok(Self::DeserializeToStruct),
            _ => Err(DataFusionError::Execution(format!("Unknown example: {s}"))),
        }
    }
}

impl ExampleKind {
    const ALL: [Self; 2] = [Self::Dataframe, Self::DeserializeToStruct];

    const EXAMPLE_NAME: &str = "dataframe";

    fn variants() -> Vec<&'static str> {
        Self::ALL.iter().map(|x| x.as_ref()).collect()
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let usage = format!(
        "Usage: cargo run --example {} -- [{}]",
        ExampleKind::EXAMPLE_NAME,
        ExampleKind::variants().join("|")
    );

    let arg = std::env::args().nth(1).ok_or_else(|| {
        eprintln!("{usage}");
        DataFusionError::Execution("Missing argument".to_string())
    })?;

    match arg.parse::<ExampleKind>()? {
        ExampleKind::Dataframe => dataframe::dataframe_example().await?,
        ExampleKind::DeserializeToStruct => {
            deserialize_to_struct::deserialize_to_struct().await?
        }
    }

    Ok(())
}
````
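With this dispatcher in place, `cargo run --example dataframe -- dataframe` runs the consolidated DataFrame walkthrough and `cargo run --example dataframe -- deserialize_to_struct` runs the struct-deserialization example; a missing argument prints the usage string, and both missing and unknown arguments surface as `DataFusionError::Execution` rather than panics.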
