From 2172d3fe5bc67650af89d1decf9abeca274e70b7 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Fri, 10 Mar 2023 16:42:40 -0500 Subject: [PATCH] Explain bindings (#264) * Introduce to_variant trait function to LogicalNode and create Explain LogicalNode bindings * Cargo fmt * Add missing classes to list of exports so test_imports will pass --- datafusion/__init__.py | 2 + src/errors.rs | 4 ++ src/expr.rs | 9 +-- src/expr/aggregate.rs | 6 +- src/expr/analyze.rs | 6 +- src/expr/cross_join.rs | 6 +- src/expr/empty_relation.rs | 15 ++++- src/expr/explain.rs | 110 +++++++++++++++++++++++++++++++++++++ src/expr/filter.rs | 8 ++- src/expr/join.rs | 20 ++++--- src/expr/limit.rs | 10 +++- src/expr/logical_node.rs | 4 ++ src/expr/projection.rs | 4 ++ src/expr/sort.rs | 4 ++ src/expr/table_scan.rs | 10 +++- src/expr/union.rs | 8 ++- src/sql/logical.rs | 24 ++++---- 17 files changed, 214 insertions(+), 36 deletions(-) create mode 100644 src/expr/explain.rs diff --git a/datafusion/__init__.py b/datafusion/__init__.py index a7878e1b..4956d9d3 100644 --- a/datafusion/__init__.py +++ b/datafusion/__init__.py @@ -76,6 +76,7 @@ Cast, TryCast, Between, + Explain, ) __version__ = importlib_metadata.version(__name__) @@ -127,6 +128,7 @@ "Cast", "TryCast", "Between", + "Explain", ] diff --git a/src/errors.rs b/src/errors.rs index c9abf06b..e739fe31 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -82,3 +82,7 @@ pub fn py_runtime_err(e: impl Debug) -> PyErr { pub fn py_datafusion_err(e: impl Debug) -> PyErr { PyErr::new::(format!("{e:?}")) } + +pub fn py_unsupported_variant_err(e: impl Debug) -> PyErr { + PyErr::new::(format!("{e:?}")) +} diff --git a/src/expr.rs b/src/expr.rs index cf1bc5d7..33f36f59 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -50,6 +50,7 @@ pub mod column; pub mod cross_join; pub mod empty_relation; pub mod exists; +pub mod explain; pub mod filter; pub mod grouping_set; pub mod in_list; @@ -260,10 +261,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - // operators - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; @@ -274,5 +272,8 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs index 0449d16b..c3de9673 100644 --- a/src/expr/aggregate.rs +++ b/src/expr/aggregate.rs @@ -20,8 +20,8 @@ use datafusion_expr::logical_plan::Aggregate; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use super::logical_node::LogicalNode; use crate::common::df_schema::PyDFSchema; -use crate::expr::logical_node::LogicalNode; use crate::expr::PyExpr; use crate::sql::logical::PyLogicalPlan; @@ -103,4 +103,8 @@ impl LogicalNode for PyAggregate { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.aggregate.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs index 5e195f7d..bbec3a80 100644 --- a/src/expr/analyze.rs +++ b/src/expr/analyze.rs @@ -19,8 +19,8 @@ use datafusion_expr::logical_plan::Analyze; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use super::logical_node::LogicalNode; use crate::common::df_schema::PyDFSchema; -use crate::expr::logical_node::LogicalNode; use crate::sql::logical::PyLogicalPlan; #[pyclass(name = "Analyze", module = "datafusion.expr", subclass)] @@ -77,4 +77,8 @@ impl LogicalNode for PyAnalyze { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.analyze.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/cross_join.rs b/src/expr/cross_join.rs index 4f5952c2..68793f24 100644 --- a/src/expr/cross_join.rs +++ b/src/expr/cross_join.rs @@ -19,8 +19,8 @@ use datafusion_expr::logical_plan::CrossJoin; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use super::logical_node::LogicalNode; use crate::common::df_schema::PyDFSchema; -use crate::expr::logical_node::LogicalNode; use crate::sql::logical::PyLogicalPlan; #[pyclass(name = "CrossJoin", module = "datafusion.expr", subclass)] @@ -87,4 +87,8 @@ impl LogicalNode for PyCrossJoin { PyLogicalPlan::from((*self.cross_join.right).clone()), ] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/empty_relation.rs b/src/expr/empty_relation.rs index 8b2621da..0bc222e5 100644 --- a/src/expr/empty_relation.rs +++ b/src/expr/empty_relation.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::common::df_schema::PyDFSchema; +use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan}; use datafusion_expr::EmptyRelation; use pyo3::prelude::*; use std::fmt::{self, Display, Formatter}; +use super::logical_node::LogicalNode; + #[pyclass(name = "EmptyRelation", module = "datafusion.expr", subclass)] #[derive(Clone)] pub struct PyEmptyRelation { @@ -70,3 +72,14 @@ impl PyEmptyRelation { Ok("EmptyRelation".to_string()) } } + +impl LogicalNode for PyEmptyRelation { + fn inputs(&self) -> Vec { + // table scans are leaf nodes and do not have inputs + vec![] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/explain.rs b/src/expr/explain.rs new file mode 100644 index 00000000..d5d6a7bb --- /dev/null +++ b/src/expr/explain.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::{self, Display, Formatter}; + +use datafusion_expr::{logical_plan::Explain, LogicalPlan}; +use pyo3::prelude::*; + +use crate::{common::df_schema::PyDFSchema, errors::py_type_err, sql::logical::PyLogicalPlan}; + +use super::logical_node::LogicalNode; + +#[pyclass(name = "Explain", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyExplain { + explain: Explain, +} + +impl From for Explain { + fn from(explain: PyExplain) -> Self { + explain.explain + } +} + +impl From for PyExplain { + fn from(explain: Explain) -> PyExplain { + PyExplain { explain } + } +} + +impl Display for PyExplain { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Explain + verbose: {:?} + plan: {:?} + stringified_plans: {:?} + schema: {:?} + logical_optimization_succeeded: {:?}", + &self.explain.verbose, + &self.explain.plan, + &self.explain.stringified_plans, + &self.explain.schema, + &self.explain.logical_optimization_succeeded + ) + } +} + +#[pymethods] +impl PyExplain { + fn explain_string(&self) -> PyResult> { + let mut string_plans: Vec = Vec::new(); + for stringified_plan in &self.explain.stringified_plans { + string_plans.push((*stringified_plan.plan).clone()); + } + Ok(string_plans) + } + + fn verbose(&self) -> bool { + self.explain.verbose + } + + fn plan(&self) -> PyResult { + Ok(PyLogicalPlan::from((*self.explain.plan).clone())) + } + + fn schema(&self) -> PyDFSchema { + (*self.explain.schema).clone().into() + } + + fn logical_optimization_succceeded(&self) -> bool { + self.explain.logical_optimization_succeeded + } +} + +impl TryFrom for PyExplain { + type Error = PyErr; + + fn try_from(logical_plan: LogicalPlan) -> Result { + match logical_plan { + LogicalPlan::Explain(explain) => Ok(PyExplain { explain }), + _ => Err(py_type_err("unexpected plan")), + } + } +} + +impl LogicalNode for PyExplain { + fn inputs(&self) -> Vec { + vec![] + } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } +} diff --git a/src/expr/filter.rs b/src/expr/filter.rs index 0994b4e8..2def2f7d 100644 --- a/src/expr/filter.rs +++ b/src/expr/filter.rs @@ -47,8 +47,8 @@ impl Display for PyFilter { write!( f, "Filter - \nPredicate: {:?} - \nInput: {:?}", + Predicate: {:?} + Input: {:?}", &self.filter.predicate, &self.filter.input ) } @@ -80,4 +80,8 @@ impl LogicalNode for PyFilter { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.filter.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/join.rs b/src/expr/join.rs index 428fb670..80166296 100644 --- a/src/expr/join.rs +++ b/src/expr/join.rs @@ -95,14 +95,14 @@ impl Display for PyJoin { write!( f, "Join - \nLeft: {:?} - \nRight: {:?} - \nOn: {:?} - \nFilter: {:?} - \nJoinType: {:?} - \nJoinConstraint: {:?} - \nSchema: {:?} - \nNullEqualsNull: {:?}", + Left: {:?} + Right: {:?} + On: {:?} + Filter: {:?} + JoinType: {:?} + JoinConstraint: {:?} + Schema: {:?} + NullEqualsNull: {:?}", &self.join.left, &self.join.right, &self.join.on, @@ -178,4 +178,8 @@ impl LogicalNode for PyJoin { PyLogicalPlan::from((*self.join.right).clone()), ] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/limit.rs b/src/expr/limit.rs index 2366be63..d7b3f4ca 100644 --- a/src/expr/limit.rs +++ b/src/expr/limit.rs @@ -46,9 +46,9 @@ impl Display for PyLimit { write!( f, "Limit - \nSkip: {} - \nFetch: {:?} - \nInput: {:?}", + Skip: {} + Fetch: {:?} + Input: {:?}", &self.limit.skip, &self.limit.fetch, &self.limit.input ) } @@ -85,4 +85,8 @@ impl LogicalNode for PyLimit { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.limit.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs index 7d4fe54d..757e4f94 100644 --- a/src/expr/logical_node.rs +++ b/src/expr/logical_node.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use pyo3::{PyObject, PyResult, Python}; + use crate::sql::logical::PyLogicalPlan; /// Representation of a `LogicalNode` in the in overall `LogicalPlan` @@ -22,4 +24,6 @@ use crate::sql::logical::PyLogicalPlan; pub trait LogicalNode { /// The input plan to the current logical node instance. fn inputs(&self) -> Vec; + + fn to_variant(&self, py: Python) -> PyResult; } diff --git a/src/expr/projection.rs b/src/expr/projection.rs index 2551803b..f5ba12db 100644 --- a/src/expr/projection.rs +++ b/src/expr/projection.rs @@ -96,4 +96,8 @@ impl LogicalNode for PyProjection { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.projection.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/sort.rs b/src/expr/sort.rs index 5037b6d2..8843c638 100644 --- a/src/expr/sort.rs +++ b/src/expr/sort.rs @@ -91,4 +91,8 @@ impl LogicalNode for PySort { fn inputs(&self) -> Vec { vec![PyLogicalPlan::from((*self.sort.input).clone())] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs index bd9e7dbd..63684fe7 100644 --- a/src/expr/table_scan.rs +++ b/src/expr/table_scan.rs @@ -52,9 +52,9 @@ impl Display for PyTableScan { write!( f, "TableScan\nTable Name: {} - \nProjections: {:?} - \nProjected Schema: {:?} - \nFilters: {:?}", + Projections: {:?} + Projected Schema: {:?} + Filters: {:?}", &self.table_scan.table_name, &self.py_projections(), &self.py_schema(), @@ -131,4 +131,8 @@ impl LogicalNode for PyTableScan { // table scans are leaf nodes and do not have inputs vec![] } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/expr/union.rs b/src/expr/union.rs index 186fbed1..98e8eaae 100644 --- a/src/expr/union.rs +++ b/src/expr/union.rs @@ -46,8 +46,8 @@ impl Display for PyUnion { write!( f, "Union - \nInputs: {:?} - \nSchema: {:?}", + Inputs: {:?} + Schema: {:?}", &self.union_.inputs, &self.union_.schema, ) } @@ -82,4 +82,8 @@ impl LogicalNode for PyUnion { .map(|x| x.as_ref().clone().into()) .collect() } + + fn to_variant(&self, py: Python) -> PyResult { + Ok(self.clone().into_py(py)) + } } diff --git a/src/sql/logical.rs b/src/sql/logical.rs index 5a21ca8d..95ff3a29 100644 --- a/src/sql/logical.rs +++ b/src/sql/logical.rs @@ -17,10 +17,11 @@ use std::sync::Arc; -use crate::errors::py_runtime_err; +use crate::errors::py_unsupported_variant_err; use crate::expr::aggregate::PyAggregate; use crate::expr::analyze::PyAnalyze; use crate::expr::empty_relation::PyEmptyRelation; +use crate::expr::explain::PyExplain; use crate::expr::filter::PyFilter; use crate::expr::limit::PyLimit; use crate::expr::projection::PyProjection; @@ -29,6 +30,8 @@ use crate::expr::table_scan::PyTableScan; use datafusion_expr::LogicalPlan; use pyo3::prelude::*; +use crate::expr::logical_node::LogicalNode; + #[pyclass(name = "LogicalPlan", module = "datafusion", subclass)] #[derive(Debug, Clone)] pub struct PyLogicalPlan { @@ -53,15 +56,16 @@ impl PyLogicalPlan { /// Return the specific logical operator fn to_variant(&self, py: Python) -> PyResult { Python::with_gil(|_| match self.plan.as_ref() { - LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)), - LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)), - LogicalPlan::EmptyRelation(plan) => Ok(PyEmptyRelation::from(plan.clone()).into_py(py)), - LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)), - LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)), - LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)), - LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)), - LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)), - other => Err(py_runtime_err(format!( + LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py), + LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py), + LogicalPlan::EmptyRelation(plan) => PyEmptyRelation::from(plan.clone()).to_variant(py), + LogicalPlan::Explain(plan) => PyExplain::from(plan.clone()).to_variant(py), + LogicalPlan::Filter(plan) => PyFilter::from(plan.clone()).to_variant(py), + LogicalPlan::Limit(plan) => PyLimit::from(plan.clone()).to_variant(py), + LogicalPlan::Projection(plan) => PyProjection::from(plan.clone()).to_variant(py), + LogicalPlan::Sort(plan) => PySort::from(plan.clone()).to_variant(py), + LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py), + other => Err(py_unsupported_variant_err(format!( "Cannot convert this plan to a LogicalNode: {:?}", other ))),