Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LogicalPlanStats to logical plan nodes #13618

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add LogicalPlanStats
  • Loading branch information
peter-toth committed Dec 1, 2024
commit 4b3b35bfd03ef58c2d4b9ab1748bef31b6fea845
16 changes: 8 additions & 8 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ impl AdjustedPrintOptions {
// all rows
if matches!(
plan,
LogicalPlan::Explain(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
LogicalPlan::Explain(_, _)
| LogicalPlan::DescribeTable(_, _)
| LogicalPlan::Analyze(_, _)
) {
self.inner.maxrows = MaxRows::Unlimited;
}
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn create_plan(
// Note that cmd is a mutable reference so that create_external_table function can remove all
// datafusion-cli specific options before passing through to datafusion. Otherwise, datafusion
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
// To support custom formats, treat error as None
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
Expand All @@ -323,7 +323,7 @@ async fn create_plan(
.await?;
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
let format = config_file_type_from_str(&copy_to.file_type.get_ext());

register_object_store_and_config_extensions(
Expand Down Expand Up @@ -412,7 +412,7 @@ mod tests {
let ctx = SessionContext::new();
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &plan {
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
Expand All @@ -438,7 +438,7 @@ mod tests {

let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Copy(cmd) = &plan {
if let LogicalPlan::Copy(cmd, _) = &plan {
let format = config_file_type_from_str(&cmd.file_type.get_ext());
register_object_store_and_config_extensions(
&ctx,
Expand Down Expand Up @@ -492,7 +492,7 @@ mod tests {
for statement in statements {
//Should not fail
let mut plan = create_plan(&ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
if let LogicalPlan::Copy(copy_to, _) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
ctx.runtime_env()
Expand Down
4 changes: 2 additions & 2 deletions datafusion-cli/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,8 @@ pub struct ParquetMetadataFunc {}
impl TableFunctionImpl for ParquetMetadataFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let filename = match exprs.first() {
Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
Some(Expr::Literal(ScalarValue::Utf8(Some(s)), _)) => s, // single quote: parquet_metadata('x.parquet')
Some(Expr::Column(Column { name, .. }, _)) => name, // double quote: parquet_metadata("x.parquet")
_ => {
return plan_err!(
"parquet_metadata requires string argument as its input"
Expand Down
10 changes: 5 additions & 5 deletions datafusion-cli/src/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -538,7 +538,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand All @@ -564,7 +564,7 @@ mod tests {

let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -592,7 +592,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
let ctx = SessionContext::new();
let mut plan = ctx.state().create_logical_plan(&sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd), _) = &mut plan {
ctx.register_table_options_extension_from_scheme(scheme);
let mut table_options = ctx.state().default_table_options();
table_options.alter_with_string_hash_map(&cmd.options)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl AnalyzerRule for RowLevelAccessControl {
}

fn is_employee_table_scan(plan: &LogicalPlan) -> bool {
if let LogicalPlan::TableScan(scan) = plan {
if let LogicalPlan::TableScan(scan, _) = plan {
scan.table_name.table() == "employee"
} else {
false
Expand Down
12 changes: 6 additions & 6 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ async fn main() -> Result<()> {
let expr = col("a") + lit(5);

// The same same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
let expr2 = Expr::binary_expr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
Box::new(Expr::literal(ScalarValue::Int32(Some(5)))),
));
assert_eq!(expr, expr2);

Expand Down Expand Up @@ -396,20 +396,20 @@ fn type_coercion_demo() -> Result<()> {
let coerced_expr = expr
.transform(|e| {
// Only type coerces binary expressions.
let Expr::BinaryExpr(e) = e else {
let Expr::BinaryExpr(e, _) = e else {
return Ok(Transformed::no(e));
};
if let Expr::Column(ref col_expr) = *e.left {
if let Expr::Column(ref col_expr, _) = *e.left {
let field = df_schema.field_with_name(None, col_expr.name())?;
let cast_to_type = field.data_type();
let coerced_right = e.right.cast_to(cast_to_type, &df_schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Ok(Transformed::yes(Expr::binary_expr(BinaryExpr::new(
e.left,
e.op,
Box::new(coerced_right),
))))
} else {
Ok(Transformed::no(Expr::BinaryExpr(e)))
Ok(Transformed::no(Expr::binary_expr(e)))
}
})?
.data;
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/function_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ impl ScalarFunctionWrapper {
fn replacement(expr: &Expr, args: &[Expr]) -> Result<Expr> {
let result = expr.clone().transform(|e| {
let r = match e {
Expr::Placeholder(placeholder) => {
Expr::Placeholder(placeholder, _) => {
let placeholder_position =
Self::parse_placeholder_identifier(&placeholder.id)?;
if placeholder_position < args.len() {
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl MyOptimizerRule {
expr.transform_up(|expr| {
// Closure called for each sub tree
match expr {
Expr::BinaryExpr(binary_expr) if is_binary_eq(&binary_expr) => {
Expr::BinaryExpr(binary_expr, _) if is_binary_eq(&binary_expr) => {
// destruture the expression
let BinaryExpr { left, op: _, right } = binary_expr;
// rewrite to `my_eq(left, right)`
Expand All @@ -171,7 +171,7 @@ fn is_binary_eq(binary_expr: &BinaryExpr) -> bool {

/// Return true if the expression is a literal or column reference
fn is_lit_or_col(expr: &Expr) -> bool {
matches!(expr, Expr::Column(_) | Expr::Literal(_))
matches!(expr, Expr::Column(_, _) | Expr::Literal(_, _))
}

/// A simple user defined filter function
Expand Down
5 changes: 3 additions & 2 deletions datafusion-examples/examples/simple_udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)))) = exprs.first() else {
let Some(Expr::Literal(ScalarValue::Utf8(Some(ref path)), _)) = exprs.first()
else {
return plan_err!("read_csv requires at least one string argument");
};

Expand All @@ -145,7 +146,7 @@ impl TableFunctionImpl for LocalCsvTableFunc {
let info = SimplifyContext::new(&execution_props);
let expr = ExprSimplifier::new(info).simplify(expr.clone())?;

if let Expr::Literal(ScalarValue::Int64(Some(limit))) = expr {
if let Expr::Literal(ScalarValue::Int64(Some(limit)), _) = expr {
Ok(limit as usize)
} else {
plan_err!("Limit must be an integer")
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udaf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl AggregateUDFImpl for BetterAvgUdaf {
// as an example for this functionality we replace UDF function
// with build-in aggregate function to illustrate the use
let simplify = |aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
Ok(Expr::AggregateFunction(AggregateFunction::new_udf(
Ok(Expr::aggregate_function(AggregateFunction::new_udf(
avg_udaf(),
// yes it is the same Avg, `BetterAvgUdaf` was just a
// marketing pitch :)
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/simplify_udwf_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
/// this function will simplify `SimplifySmoothItUdf` to `SmoothItUdf`.
fn simplify(&self) -> Option<WindowFunctionSimplification> {
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
Ok(Expr::WindowFunction(WindowFunction {
Ok(Expr::window_function(WindowFunction {
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
args: window_function.args,
partition_by: window_function.partition_by,
Expand Down
8 changes: 4 additions & 4 deletions datafusion-examples/examples/sql_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join we update the running count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down Expand Up @@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know were at the root of the tree
// count this tree and recurse on it's inputs
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
Expand Down Expand Up @@ -146,12 +146,12 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
// / \
// B C
// we can continue the recursion in this case
if let LogicalPlan::Projection(_) = node {
if let LogicalPlan::Projection(_, _) = node {
return Ok(TreeNodeRecursion::Continue);
}

// any join we count
if matches!(node, LogicalPlan::Join(_)) {
if matches!(node, LogicalPlan::Join(_, _)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ pub trait TableProvider: Debug + Sync + Send {
/// let support: Vec<_> = filters.iter().map(|expr| {
/// match expr {
/// // This example only supports a between expr with a single column named "c1".
/// Expr::Between(between_expr) => {
/// Expr::Between(between_expr, _) => {
/// between_expr.expr
/// .try_as_col()
/// .map(|column| {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,11 @@ impl<T> Transformed<T> {
Self::new(data, false, TreeNodeRecursion::Continue)
}

/// Wrapper for unchanged data with [`TreeNodeRecursion::Jump`] statement.
pub fn jump(data: T) -> Self {
Self::new(data, false, TreeNodeRecursion::Jump)
}

/// Applies an infallible `f` to the data of this [`Transformed`] object,
/// without modifying the `transformed` flag.
pub fn update_data<U, F: FnOnce(T) -> U>(self, f: F) -> Transformed<U> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/benches/map_query_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let mut value_buffer = Vec::new();

for i in 0..1000 {
key_buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
value_buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
key_buffer.push(Expr::literal(ScalarValue::Utf8(Some(keys[i].clone()))));
value_buffer.push(Expr::literal(ScalarValue::Int32(Some(values[i]))));
}
c.bench_function("map_1000_1", |b| {
b.iter(|| {
Expand Down
18 changes: 9 additions & 9 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl DataFrame {
.collect::<Result<Vec<_>>>()?;
let expr: Vec<Expr> = fields
.into_iter()
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
.map(|(qualifier, field)| Expr::column(Column::from((qualifier, field))))
.collect();
self.select(expr)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ impl DataFrame {
.enumerate()
.map(|(idx, _)| self.plan.schema().qualified_field(idx))
.filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
.map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
.map(|(qualifier, field)| Expr::column(Column::from((qualifier, field))))
.collect();
self.select(expr)
}
Expand Down Expand Up @@ -513,7 +513,7 @@ impl DataFrame {
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
) -> Result<DataFrame> {
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_)]);
let is_grouping_set = matches!(group_expr.as_slice(), [Expr::GroupingSet(_, _)]);
let aggr_expr_len = aggr_expr.len();
let plan = LogicalPlanBuilder::from(self.plan)
.aggregate(group_expr, aggr_expr)?
Expand All @@ -527,7 +527,7 @@ impl DataFrame {
.into_iter()
.enumerate()
.filter(|(idx, _)| *idx != grouping_id_pos)
.map(|(_, column)| Expr::Column(column))
.map(|(_, column)| Expr::column(column))
.collect::<Vec<_>>();
LogicalPlanBuilder::from(plan).project(exprs)?.build()?
} else {
Expand Down Expand Up @@ -1164,7 +1164,7 @@ impl DataFrame {
/// ```
pub async fn count(self) -> Result<usize> {
let rows = self
.aggregate(vec![], vec![count(Expr::Literal(COUNT_STAR_EXPANSION))])?
.aggregate(vec![], vec![count(Expr::literal(COUNT_STAR_EXPANSION))])?
.collect()
.await?;
let len = *rows
Expand Down Expand Up @@ -1403,7 +1403,7 @@ impl DataFrame {
/// # }
/// ```
pub fn explain(self, verbose: bool, analyze: bool) -> Result<DataFrame> {
if matches!(self.plan, LogicalPlan::Explain(_)) {
if matches!(self.plan, LogicalPlan::Explain(_, _)) {
return plan_err!("Nested EXPLAINs are not supported");
}
let plan = LogicalPlanBuilder::from(self.plan)
Expand Down Expand Up @@ -2175,7 +2175,7 @@ mod tests {
async fn select_with_window_exprs() -> Result<()> {
// build plan using Table API
let t = test_table().await?;
let first_row = Expr::WindowFunction(WindowFunction::new(
let first_row = Expr::window_function(WindowFunction::new(
WindowFunctionDefinition::WindowUDF(first_value_udwf()),
vec![col("aggregate_test_100.c1")],
))
Expand Down Expand Up @@ -2741,7 +2741,7 @@ mod tests {
vec![col("c3")],
);

Expr::WindowFunction(w)
Expr::window_function(w)
.null_treatment(NullTreatment::IgnoreNulls)
.order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)])
.window_frame(WindowFrame::new_bounds(
Expand Down Expand Up @@ -3007,7 +3007,7 @@ mod tests {
let join = left.clone().join_on(
right.clone(),
JoinType::Inner,
Some(Expr::Literal(ScalarValue::Null)),
Some(Expr::literal(ScalarValue::Null)),
)?;
let expected_plan = "EmptyRelation";
assert_eq!(expected_plan, format!("{}", join.into_optimized_plan()?));
Expand Down
Loading