Skip to content

Commit 2b15ad1

Browse files
authored
consolidate dataframe_subquery.rs into dataframe.rs (#13950)
1 parent 9b5995f commit 2b15ad1

File tree

3 files changed

+92
-119
lines changed

3 files changed

+92
-119
lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ cargo run --example dataframe
5757
- [`custom_datasource.rs`](examples/custom_datasource.rs): Run queries against a custom datasource (TableProvider)
5858
- [`custom_file_format.rs`](examples/custom_file_format.rs): Write data to a custom file format
5959
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3 and writing back to s3
60-
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
60+
- [`dataframe.rs`](examples/dataframe.rs): Run a query using a DataFrame API against parquet files, csv files, and in-memory data, including multiple subqueries. Also demonstrates the various methods to write out a DataFrame to a table, parquet file, csv file, and json file.
6161
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
6262
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify, analyze and coerce `Expr`s
6363
- [`file_stream_provider.rs`](examples/file_stream_provider.rs): Run a query on `FileStreamProvider` which implements `StreamProvider` for reading and writing to arbitrary stream sources / sinks.

datafusion-examples/examples/dataframe.rs

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1919
use datafusion::arrow::datatypes::{DataType, Field, Schema};
2020
use datafusion::dataframe::DataFrameWriteOptions;
2121
use datafusion::error::Result;
22+
use datafusion::functions_aggregate::average::avg;
23+
use datafusion::functions_aggregate::min_max::max;
2224
use datafusion::prelude::*;
2325
use datafusion_common::config::CsvOptions;
2426
use datafusion_common::parsers::CompressionTypeVariant;
2527
use datafusion_common::DataFusionError;
28+
use datafusion_common::ScalarValue;
2629
use std::fs::File;
2730
use std::io::Write;
2831
use std::sync::Arc;
@@ -44,7 +47,14 @@ use tempfile::tempdir;
4447
///
4548
/// * [write_out]: write out a DataFrame to a table, parquet file, csv file, or json file
4649
///
50+
/// # Executing subqueries
51+
///
52+
/// * [where_scalar_subquery]: execute a scalar subquery
53+
/// * [where_in_subquery]: execute a subquery with an IN clause
54+
/// * [where_exist_subquery]: execute a subquery with an EXISTS clause
55+
///
4756
/// # Querying data
57+
///
4858
/// * [query_to_date]: execute queries against parquet files
4959
#[tokio::main]
5060
async fn main() -> Result<()> {
@@ -55,6 +65,11 @@ async fn main() -> Result<()> {
5565
read_memory(&ctx).await?;
5666
write_out(&ctx).await?;
5767
query_to_date().await?;
68+
register_aggregate_test_data("t1", &ctx).await?;
69+
register_aggregate_test_data("t2", &ctx).await?;
70+
where_scalar_subquery(&ctx).await?;
71+
where_in_subquery(&ctx).await?;
72+
where_exist_subquery(&ctx).await?;
5873
Ok(())
5974
}
6075

@@ -250,3 +265,79 @@ async fn query_to_date() -> Result<()> {
250265

251266
Ok(())
252267
}
268+
269+
/// Use the DataFrame API to execute the following subquery:
270+
/// select c1,c2 from t1 where (select avg(t2.c2) from t2 where t1.c1 = t2.c1)>0 limit 3;
271+
async fn where_scalar_subquery(ctx: &SessionContext) -> Result<()> {
272+
ctx.table("t1")
273+
.await?
274+
.filter(
275+
scalar_subquery(Arc::new(
276+
ctx.table("t2")
277+
.await?
278+
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
279+
.aggregate(vec![], vec![avg(col("t2.c2"))])?
280+
.select(vec![avg(col("t2.c2"))])?
281+
.into_unoptimized_plan(),
282+
))
283+
.gt(lit(0u8)),
284+
)?
285+
.select(vec![col("t1.c1"), col("t1.c2")])?
286+
.limit(0, Some(3))?
287+
.show()
288+
.await?;
289+
Ok(())
290+
}
291+
292+
/// Use the DataFrame API to execute the following subquery:
293+
/// select t1.c1, t1.c2 from t1 where t1.c2 in (select max(t2.c2) from t2 where t2.c1 > 0 ) limit 3;
294+
async fn where_in_subquery(ctx: &SessionContext) -> Result<()> {
295+
ctx.table("t1")
296+
.await?
297+
.filter(in_subquery(
298+
col("t1.c2"),
299+
Arc::new(
300+
ctx.table("t2")
301+
.await?
302+
.filter(col("t2.c1").gt(lit(ScalarValue::UInt8(Some(0)))))?
303+
.aggregate(vec![], vec![max(col("t2.c2"))])?
304+
.select(vec![max(col("t2.c2"))])?
305+
.into_unoptimized_plan(),
306+
),
307+
))?
308+
.select(vec![col("t1.c1"), col("t1.c2")])?
309+
.limit(0, Some(3))?
310+
.show()
311+
.await?;
312+
Ok(())
313+
}
314+
315+
/// Use the DataFrame API to execute the following subquery:
316+
/// select t1.c1, t1.c2 from t1 where exists (select t2.c2 from t2 where t1.c1 = t2.c1) limit 3;
317+
async fn where_exist_subquery(ctx: &SessionContext) -> Result<()> {
318+
ctx.table("t1")
319+
.await?
320+
.filter(exists(Arc::new(
321+
ctx.table("t2")
322+
.await?
323+
.filter(out_ref_col(DataType::Utf8, "t1.c1").eq(col("t2.c1")))?
324+
.select(vec![col("t2.c2")])?
325+
.into_unoptimized_plan(),
326+
)))?
327+
.select(vec![col("t1.c1"), col("t1.c2")])?
328+
.limit(0, Some(3))?
329+
.show()
330+
.await?;
331+
Ok(())
332+
}
333+
334+
async fn register_aggregate_test_data(name: &str, ctx: &SessionContext) -> Result<()> {
335+
let testdata = datafusion::test_util::arrow_test_data();
336+
ctx.register_csv(
337+
name,
338+
&format!("{testdata}/csv/aggregate_test_100.csv"),
339+
CsvReadOptions::default(),
340+
)
341+
.await?;
342+
Ok(())
343+
}

datafusion-examples/examples/dataframe_subquery.rs

Lines changed: 0 additions & 118 deletions
This file was deleted.

0 commit comments

Comments
 (0)