Skip to content

Commit 899f46e

Browse files
Omega359alamb
authored andcommitted
Add distinct_on to dataframe api (apache#11012)
* Add distinct_on to dataframe api apache#11011 * cargo fmt * Update datafusion/core/src/dataframe/mod.rs as per reviewer feedback Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent eea4b4c commit 899f46e

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,38 @@ impl DataFrame {
553553
})
554554
}
555555

556+
/// Return a new `DataFrame` with duplicated rows removed as per the specified expression list
557+
/// according to the provided sorting expressions grouped by the `DISTINCT ON` clause
558+
/// expressions.
559+
///
560+
/// # Example
561+
/// ```
562+
/// # use datafusion::prelude::*;
563+
/// # use datafusion::error::Result;
564+
/// # #[tokio::main]
565+
/// # async fn main() -> Result<()> {
566+
/// let ctx = SessionContext::new();
567+
/// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?
568+
/// // Return a single row (a, b) for each distinct value of a
569+
/// .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?;
570+
/// # Ok(())
571+
/// # }
572+
/// ```
573+
pub fn distinct_on(
574+
self,
575+
on_expr: Vec<Expr>,
576+
select_expr: Vec<Expr>,
577+
sort_expr: Option<Vec<Expr>>,
578+
) -> Result<DataFrame> {
579+
let plan = LogicalPlanBuilder::from(self.plan)
580+
.distinct_on(on_expr, select_expr, sort_expr)?
581+
.build()?;
582+
Ok(DataFrame {
583+
session_state: self.session_state,
584+
plan,
585+
})
586+
}
587+
556588
/// Return a new `DataFrame` that has statistics for a DataFrame.
557589
///
558590
/// Only summarizes numeric datatypes at the moment and returns nulls for
@@ -2390,6 +2422,91 @@ mod tests {
23902422
Ok(())
23912423
}
23922424

2425+
#[tokio::test]
2426+
async fn test_distinct_on() -> Result<()> {
2427+
let t = test_table().await?;
2428+
let plan = t
2429+
.distinct_on(vec![col("c1")], vec![col("aggregate_test_100.c1")], None)
2430+
.unwrap();
2431+
2432+
let sql_plan =
2433+
create_plan("select distinct on (c1) c1 from aggregate_test_100").await?;
2434+
2435+
assert_same_plan(&plan.plan.clone(), &sql_plan);
2436+
2437+
let df_results = plan.clone().collect().await?;
2438+
2439+
#[rustfmt::skip]
2440+
assert_batches_sorted_eq!(
2441+
["+----+",
2442+
"| c1 |",
2443+
"+----+",
2444+
"| a |",
2445+
"| b |",
2446+
"| c |",
2447+
"| d |",
2448+
"| e |",
2449+
"+----+"],
2450+
&df_results
2451+
);
2452+
2453+
Ok(())
2454+
}
2455+
2456+
#[tokio::test]
2457+
async fn test_distinct_on_sort_by() -> Result<()> {
2458+
let t = test_table().await?;
2459+
let plan = t
2460+
.select(vec![col("c1")])
2461+
.unwrap()
2462+
.distinct_on(
2463+
vec![col("c1")],
2464+
vec![col("c1")],
2465+
Some(vec![col("c1").sort(true, true)]),
2466+
)
2467+
.unwrap()
2468+
.sort(vec![col("c1").sort(true, true)])
2469+
.unwrap();
2470+
2471+
let df_results = plan.clone().collect().await?;
2472+
2473+
#[rustfmt::skip]
2474+
assert_batches_sorted_eq!(
2475+
["+----+",
2476+
"| c1 |",
2477+
"+----+",
2478+
"| a |",
2479+
"| b |",
2480+
"| c |",
2481+
"| d |",
2482+
"| e |",
2483+
"+----+"],
2484+
&df_results
2485+
);
2486+
2487+
Ok(())
2488+
}
2489+
2490+
#[tokio::test]
2491+
async fn test_distinct_on_sort_by_unprojected() -> Result<()> {
2492+
let t = test_table().await?;
2493+
let err = t
2494+
.select(vec![col("c1")])
2495+
.unwrap()
2496+
.distinct_on(
2497+
vec![col("c1")],
2498+
vec![col("c1")],
2499+
Some(vec![col("c1").sort(true, true)]),
2500+
)
2501+
.unwrap()
2502+
// try to sort on some value not present in input to distinct
2503+
.sort(vec![col("c2").sort(true, true)])
2504+
.unwrap_err();
2505+
assert_eq!(err.strip_backtrace(), "Error during planning: For SELECT DISTINCT, ORDER BY expressions c2 must appear in select list");
2506+
2507+
Ok(())
2508+
}
2509+
23932510
#[tokio::test]
23942511
async fn join() -> Result<()> {
23952512
let left = test_table().await?.select_columns(&["c1", "c2"])?;

docs/source/user-guide/dataframe.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ execution. The plan is evaluated (executed) when an action method is invoked, su
6464
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------ |
6565
| aggregate | Perform an aggregate query with optional grouping expressions. |
6666
| distinct | Filter out duplicate rows. |
67+
| distinct_on | Filter out duplicate rows based on provided expressions. |
6768
| drop_columns | Create a projection with all but the provided column names. |
6869
| except | Calculate the exception of two DataFrames. The two DataFrames must have exactly the same schema |
6970
| filter | Filter a DataFrame to only include rows that match the specified filter expression. |

0 commit comments

Comments
 (0)