From 4004fbe2e8052698b7bcf59c1b77b735dc19771d Mon Sep 17 00:00:00 2001 From: Ian Alexander Joiner <14581281+iajoiner@users.noreply.github.com> Date: Tue, 28 Feb 2023 12:50:34 -0500 Subject: [PATCH] Add Python wrapper for LogicalPlan::Union (#240) * add union * Make clippy happier --- datafusion/tests/test_imports.py | 2 + src/context.rs | 2 +- src/expr.rs | 2 + src/expr/signature.rs | 1 + src/expr/union.rs | 85 ++++++++++++++++++++++++++++++++ 5 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 src/expr/union.rs diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py index 20e7abbc..8792fac4 100644 --- a/datafusion/tests/test_imports.py +++ b/datafusion/tests/test_imports.py @@ -44,6 +44,7 @@ Aggregate, Sort, Analyze, + Union, Like, ILike, SimilarTo, @@ -105,6 +106,7 @@ def test_class_module_is_datafusion(): Limit, Filter, Analyze, + Union, Like, ILike, SimilarTo, diff --git a/src/context.rs b/src/context.rs index 1acf5f28..4a193776 100644 --- a/src/context.rs +++ b/src/context.rs @@ -605,7 +605,7 @@ impl PySessionContext { let plan = plan.plan.clone(); let fut: JoinHandle> = rt.spawn(async move { plan.execute(part, ctx) }); - let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?; + let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?; Ok(PyRecordBatchStream::new(stream?)) } } diff --git a/src/expr.rs b/src/expr.rs index a288b389..c4a9f005 100644 --- a/src/expr.rs +++ b/src/expr.rs @@ -67,6 +67,7 @@ pub mod signature; pub mod sort; pub mod subquery; pub mod table_scan; +pub mod union; /// A PyExpr that can be used on a DataFrame #[pyclass(name = "Expr", module = "datafusion.expr", subclass)] @@ -266,5 +267,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/expr/signature.rs b/src/expr/signature.rs index c59c9900..2893bef6 100644 --- a/src/expr/signature.rs +++ b/src/expr/signature.rs @@ -19,6 +19,7 @@ use datafusion_expr::{TypeSignature, Volatility}; use pyo3::prelude::*; #[pyclass(name = "Signature", module = "datafusion.expr", subclass)] +#[allow(dead_code)] #[derive(Clone)] pub struct PySignature { type_signature: TypeSignature, diff --git a/src/expr/union.rs b/src/expr/union.rs new file mode 100644 index 00000000..186fbed1 --- /dev/null +++ b/src/expr/union.rs @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion_expr::logical_plan::Union; +use pyo3::prelude::*; +use std::fmt::{self, Display, Formatter}; + +use crate::common::df_schema::PyDFSchema; +use crate::expr::logical_node::LogicalNode; +use crate::sql::logical::PyLogicalPlan; + +#[pyclass(name = "Union", module = "datafusion.expr", subclass)] +#[derive(Clone)] +pub struct PyUnion { + union_: Union, +} + +impl From for PyUnion { + fn from(union_: Union) -> PyUnion { + PyUnion { union_ } + } +} + +impl From for Union { + fn from(union_: PyUnion) -> Self { + union_.union_ + } +} + +impl Display for PyUnion { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Union + \nInputs: {:?} + \nSchema: {:?}", + &self.union_.inputs, &self.union_.schema, + ) + } +} + +#[pymethods] +impl PyUnion { + /// Retrieves the input `LogicalPlan` to this `Union` node + fn input(&self) -> PyResult> { + Ok(Self::inputs(self)) + } + + /// Resulting Schema for this `Union` node instance + fn schema(&self) -> PyResult { + Ok(self.union_.schema.as_ref().clone().into()) + } + + fn __repr__(&self) -> PyResult { + Ok(format!("Union({})", self)) + } + + fn __name__(&self) -> PyResult { + Ok("Union".to_string()) + } +} + +impl LogicalNode for PyUnion { + fn inputs(&self) -> Vec { + self.union_ + .inputs + .iter() + .map(|x| x.as_ref().clone().into()) + .collect() + } +}