Skip to content

Commit

Permalink
Add Python wrapper for LogicalPlan::Union (#240)
Browse files Browse the repository at this point in the history
* add union

* Make clippy happier
  • Loading branch information
iajoiner authored Feb 28, 2023
1 parent d1f6567 commit 4004fbe
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 1 deletion.
2 changes: 2 additions & 0 deletions datafusion/tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
Aggregate,
Sort,
Analyze,
Union,
Like,
ILike,
SimilarTo,
Expand Down Expand Up @@ -105,6 +106,7 @@ def test_class_module_is_datafusion():
Limit,
Filter,
Analyze,
Union,
Like,
ILike,
SimilarTo,
Expand Down
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ impl PySessionContext {
let plan = plan.plan.clone();
let fut: JoinHandle<datafusion_common::Result<SendableRecordBatchStream>> =
rt.spawn(async move { plan.execute(part, ctx) });
let stream = wait_for_future(py, fut).map_err(|e| py_datafusion_err(e))?;
let stream = wait_for_future(py, fut).map_err(py_datafusion_err)?;
Ok(PyRecordBatchStream::new(stream?))
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub mod signature;
pub mod sort;
pub mod subquery;
pub mod table_scan;
pub mod union;

/// A PyExpr that can be used on a DataFrame
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
Expand Down Expand Up @@ -266,5 +267,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<sort::PySort>()?;
m.add_class::<analyze::PyAnalyze>()?;
m.add_class::<empty_relation::PyEmptyRelation>()?;
m.add_class::<union::PyUnion>()?;
Ok(())
}
1 change: 1 addition & 0 deletions src/expr/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use datafusion_expr::{TypeSignature, Volatility};
use pyo3::prelude::*;

#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
#[allow(dead_code)]
#[derive(Clone)]
pub struct PySignature {
type_signature: TypeSignature,
Expand Down
85 changes: 85 additions & 0 deletions src/expr/union.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_expr::logical_plan::Union;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};

use crate::common::df_schema::PyDFSchema;
use crate::expr::logical_node::LogicalNode;
use crate::sql::logical::PyLogicalPlan;

#[pyclass(name = "Union", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyUnion {
union_: Union,
}

impl From<Union> for PyUnion {
fn from(union_: Union) -> PyUnion {
PyUnion { union_ }
}
}

impl From<PyUnion> for Union {
fn from(union_: PyUnion) -> Self {
union_.union_
}
}

impl Display for PyUnion {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Union
\nInputs: {:?}
\nSchema: {:?}",
&self.union_.inputs, &self.union_.schema,
)
}
}

#[pymethods]
impl PyUnion {
/// Retrieves the input `LogicalPlan` to this `Union` node
fn input(&self) -> PyResult<Vec<PyLogicalPlan>> {
Ok(Self::inputs(self))
}

/// Resulting Schema for this `Union` node instance
fn schema(&self) -> PyResult<PyDFSchema> {
Ok(self.union_.schema.as_ref().clone().into())
}

fn __repr__(&self) -> PyResult<String> {
Ok(format!("Union({})", self))
}

fn __name__(&self) -> PyResult<String> {
Ok("Union".to_string())
}
}

impl LogicalNode for PyUnion {
fn inputs(&self) -> Vec<PyLogicalPlan> {
self.union_
.inputs
.iter()
.map(|x| x.as_ref().clone().into())
.collect()
}
}

0 comments on commit 4004fbe

Please sign in to comment.