Skip to content

Commit 69e5382

Browse files
authored
Improve DataFrame functional tests (#8630)
1 parent 7443f30 commit 69e5382

File tree

1 file changed

+82
-138
lines changed
  • datafusion/core/src/dataframe

1 file changed

+82
-138
lines changed

datafusion/core/src/dataframe/mod.rs

Lines changed: 82 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,15 +1356,30 @@ mod tests {
13561356

13571357
use arrow::array::{self, Int32Array};
13581358
use arrow::datatypes::DataType;
1359-
use datafusion_common::{Constraint, Constraints, ScalarValue};
1359+
use datafusion_common::{Constraint, Constraints};
13601360
use datafusion_expr::{
13611361
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
1362-
BinaryExpr, BuiltInWindowFunction, Operator, ScalarFunctionImplementation,
1363-
Volatility, WindowFrame, WindowFunction,
1362+
BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
1363+
WindowFunction,
13641364
};
13651365
use datafusion_physical_expr::expressions::Column;
13661366
use datafusion_physical_plan::get_plan_string;
13671367

1368+
// Get string representation of the plan
1369+
async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {
1370+
let physical_plan = df
1371+
.clone()
1372+
.create_physical_plan()
1373+
.await
1374+
.expect("Error creating physical plan");
1375+
1376+
let actual = get_plan_string(&physical_plan);
1377+
assert_eq!(
1378+
expected, actual,
1379+
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
1380+
);
1381+
}
1382+
13681383
pub fn table_with_constraints() -> Arc<dyn TableProvider> {
13691384
let dual_schema = Arc::new(Schema::new(vec![
13701385
Field::new("id", DataType::Int32, false),
@@ -1587,47 +1602,36 @@ mod tests {
15871602
let config = SessionConfig::new().with_target_partitions(1);
15881603
let ctx = SessionContext::new_with_config(config);
15891604

1590-
let table1 = table_with_constraints();
1591-
let df = ctx.read_table(table1)?;
1592-
let col_id = Expr::Column(datafusion_common::Column {
1593-
relation: None,
1594-
name: "id".to_string(),
1595-
});
1596-
let col_name = Expr::Column(datafusion_common::Column {
1597-
relation: None,
1598-
name: "name".to_string(),
1599-
});
1605+
let df = ctx.read_table(table_with_constraints())?;
16001606

1601-
// group by contains id column
1602-
let group_expr = vec![col_id.clone()];
1607+
// GROUP BY id
1608+
let group_expr = vec![col("id")];
16031609
let aggr_expr = vec![];
16041610
let df = df.aggregate(group_expr, aggr_expr)?;
16051611

1606-
// expr list contains id, name
1607-
let expr_list = vec![col_id, col_name];
1608-
let df = df.select(expr_list)?;
1609-
let physical_plan = df.clone().create_physical_plan().await?;
1610-
let expected = vec![
1611-
"AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
1612-
" MemoryExec: partitions=1, partition_sizes=[1]",
1613-
];
1614-
// Get string representation of the plan
1615-
let actual = get_plan_string(&physical_plan);
1616-
assert_eq!(
1617-
expected, actual,
1618-
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
1619-
);
1620-
// Since id and name are functionally dependant, we can use name among expression
1621-
// even if it is not part of the group by expression.
1622-
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
1612+
// Since id and name are functionally dependant, we can use name among
1613+
// expression even if it is not part of the group by expression and can
1614+
// select "name" column even though it wasn't explicitly grouped
1615+
let df = df.select(vec![col("id"), col("name")])?;
1616+
assert_physical_plan(
1617+
&df,
1618+
vec![
1619+
"AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
1620+
" MemoryExec: partitions=1, partition_sizes=[1]",
1621+
],
1622+
)
1623+
.await;
1624+
1625+
let df_results = df.collect().await?;
16231626

16241627
#[rustfmt::skip]
1625-
assert_batches_sorted_eq!(
1626-
["+----+------+",
1628+
assert_batches_sorted_eq!([
1629+
"+----+------+",
16271630
"| id | name |",
16281631
"+----+------+",
16291632
"| 1 | a |",
1630-
"+----+------+",],
1633+
"+----+------+"
1634+
],
16311635
&df_results
16321636
);
16331637

@@ -1640,57 +1644,31 @@ mod tests {
16401644
let config = SessionConfig::new().with_target_partitions(1);
16411645
let ctx = SessionContext::new_with_config(config);
16421646

1643-
let table1 = table_with_constraints();
1644-
let df = ctx.read_table(table1)?;
1645-
let col_id = Expr::Column(datafusion_common::Column {
1646-
relation: None,
1647-
name: "id".to_string(),
1648-
});
1649-
let col_name = Expr::Column(datafusion_common::Column {
1650-
relation: None,
1651-
name: "name".to_string(),
1652-
});
1647+
let df = ctx.read_table(table_with_constraints())?;
16531648

1654-
// group by contains id column
1655-
let group_expr = vec![col_id.clone()];
1649+
// GROUP BY id
1650+
let group_expr = vec![col("id")];
16561651
let aggr_expr = vec![];
16571652
let df = df.aggregate(group_expr, aggr_expr)?;
16581653

1659-
let condition1 = Expr::BinaryExpr(BinaryExpr::new(
1660-
Box::new(col_id.clone()),
1661-
Operator::Eq,
1662-
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
1663-
));
1664-
let condition2 = Expr::BinaryExpr(BinaryExpr::new(
1665-
Box::new(col_name),
1666-
Operator::Eq,
1667-
Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
1668-
));
1669-
// Predicate refers to id, and name fields
1670-
let predicate = Expr::BinaryExpr(BinaryExpr::new(
1671-
Box::new(condition1),
1672-
Operator::And,
1673-
Box::new(condition2),
1674-
));
1654+
// Predicate refers to id, and name fields:
1655+
// id = 1 AND name = 'a'
1656+
let predicate = col("id").eq(lit(1i32)).and(col("name").eq(lit("a")));
16751657
let df = df.filter(predicate)?;
1676-
let physical_plan = df.clone().create_physical_plan().await?;
1677-
1678-
let expected = vec![
1658+
assert_physical_plan(
1659+
&df,
1660+
vec![
16791661
"CoalesceBatchesExec: target_batch_size=8192",
16801662
" FilterExec: id@0 = 1 AND name@1 = a",
16811663
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
16821664
" MemoryExec: partitions=1, partition_sizes=[1]",
1683-
];
1684-
// Get string representation of the plan
1685-
let actual = get_plan_string(&physical_plan);
1686-
assert_eq!(
1687-
expected, actual,
1688-
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
1689-
);
1665+
],
1666+
)
1667+
.await;
16901668

16911669
// Since id and name are functionally dependant, we can use name among expression
16921670
// even if it is not part of the group by expression.
1693-
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
1671+
let df_results = df.collect().await?;
16941672

16951673
#[rustfmt::skip]
16961674
assert_batches_sorted_eq!(
@@ -1711,53 +1689,35 @@ mod tests {
17111689
let config = SessionConfig::new().with_target_partitions(1);
17121690
let ctx = SessionContext::new_with_config(config);
17131691

1714-
let table1 = table_with_constraints();
1715-
let df = ctx.read_table(table1)?;
1716-
let col_id = Expr::Column(datafusion_common::Column {
1717-
relation: None,
1718-
name: "id".to_string(),
1719-
});
1720-
let col_name = Expr::Column(datafusion_common::Column {
1721-
relation: None,
1722-
name: "name".to_string(),
1723-
});
1692+
let df = ctx.read_table(table_with_constraints())?;
17241693

1725-
// group by contains id column
1726-
let group_expr = vec![col_id.clone()];
1694+
// GROUP BY id
1695+
let group_expr = vec![col("id")];
17271696
let aggr_expr = vec![];
17281697
// group by id,
17291698
let df = df.aggregate(group_expr, aggr_expr)?;
17301699

1731-
let condition1 = Expr::BinaryExpr(BinaryExpr::new(
1732-
Box::new(col_id.clone()),
1733-
Operator::Eq,
1734-
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
1735-
));
17361700
// Predicate refers to id field
1737-
let predicate = condition1;
1738-
// id=0
1701+
// id = 1
1702+
let predicate = col("id").eq(lit(1i32));
17391703
let df = df.filter(predicate)?;
17401704
// Select expression refers to id, and name columns.
17411705
// id, name
1742-
let df = df.select(vec![col_id.clone(), col_name.clone()])?;
1743-
let physical_plan = df.clone().create_physical_plan().await?;
1744-
1745-
let expected = vec![
1706+
let df = df.select(vec![col("id"), col("name")])?;
1707+
assert_physical_plan(
1708+
&df,
1709+
vec![
17461710
"CoalesceBatchesExec: target_batch_size=8192",
17471711
" FilterExec: id@0 = 1",
17481712
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
17491713
" MemoryExec: partitions=1, partition_sizes=[1]",
1750-
];
1751-
// Get string representation of the plan
1752-
let actual = get_plan_string(&physical_plan);
1753-
assert_eq!(
1754-
expected, actual,
1755-
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
1756-
);
1714+
],
1715+
)
1716+
.await;
17571717

17581718
// Since id and name are functionally dependant, we can use name among expression
17591719
// even if it is not part of the group by expression.
1760-
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
1720+
let df_results = df.collect().await?;
17611721

17621722
#[rustfmt::skip]
17631723
assert_batches_sorted_eq!(
@@ -1778,51 +1738,35 @@ mod tests {
17781738
let config = SessionConfig::new().with_target_partitions(1);
17791739
let ctx = SessionContext::new_with_config(config);
17801740

1781-
let table1 = table_with_constraints();
1782-
let df = ctx.read_table(table1)?;
1783-
let col_id = Expr::Column(datafusion_common::Column {
1784-
relation: None,
1785-
name: "id".to_string(),
1786-
});
1741+
let df = ctx.read_table(table_with_constraints())?;
17871742

1788-
// group by contains id column
1789-
let group_expr = vec![col_id.clone()];
1743+
// GROUP BY id
1744+
let group_expr = vec![col("id")];
17901745
let aggr_expr = vec![];
1791-
// group by id,
17921746
let df = df.aggregate(group_expr, aggr_expr)?;
17931747

1794-
let condition1 = Expr::BinaryExpr(BinaryExpr::new(
1795-
Box::new(col_id.clone()),
1796-
Operator::Eq,
1797-
Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
1798-
));
17991748
// Predicate refers to id field
1800-
let predicate = condition1;
1801-
// id=1
1749+
// id = 1
1750+
let predicate = col("id").eq(lit(1i32));
18021751
let df = df.filter(predicate)?;
18031752
// Select expression refers to id column.
18041753
// id
1805-
let df = df.select(vec![col_id.clone()])?;
1806-
let physical_plan = df.clone().create_physical_plan().await?;
1754+
let df = df.select(vec![col("id")])?;
18071755

18081756
// In this case aggregate shouldn't be expanded, since these
18091757
// columns are not used.
1810-
let expected = vec![
1811-
"CoalesceBatchesExec: target_batch_size=8192",
1812-
" FilterExec: id@0 = 1",
1813-
" AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
1814-
" MemoryExec: partitions=1, partition_sizes=[1]",
1815-
];
1816-
// Get string representation of the plan
1817-
let actual = get_plan_string(&physical_plan);
1818-
assert_eq!(
1819-
expected, actual,
1820-
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
1821-
);
1758+
assert_physical_plan(
1759+
&df,
1760+
vec![
1761+
"CoalesceBatchesExec: target_batch_size=8192",
1762+
" FilterExec: id@0 = 1",
1763+
" AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
1764+
" MemoryExec: partitions=1, partition_sizes=[1]",
1765+
],
1766+
)
1767+
.await;
18221768

1823-
// Since id and name are functionally dependant, we can use name among expression
1824-
// even if it is not part of the group by expression.
1825-
let df_results = collect(physical_plan, ctx.task_ctx()).await?;
1769+
let df_results = df.collect().await?;
18261770

18271771
#[rustfmt::skip]
18281772
assert_batches_sorted_eq!(

0 commit comments

Comments
 (0)