Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
#819: python bindings for window functions (#819)
Browse files Browse the repository at this point in the history
Thanks for the contribution!

GitOrigin-RevId: 743a81f4e6cd065333d3e17f7231aba45495db67
  • Loading branch information
jgoday authored and jimexist committed Dec 31, 2021
1 parent 4e87e84 commit f8433b7
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
18 changes: 18 additions & 0 deletions datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pyarrow as pa
import pytest

from datafusion import functions as f
from datafusion import DataFrame, ExecutionContext, column, literal, udf


Expand Down Expand Up @@ -117,6 +118,23 @@ def test_join():
assert table.to_pydict() == expected


def test_window_lead(df):
df = df.select(
column("a"),
f.alias(
f.window(
"lead", [column("b")], order_by=[f.order_by(column("b"))]
),
"a_next",
),
)

table = pa.Table.from_batches(df.collect())

expected = {"a": [1, 2, 3], "a_next": [5, 6, None]}
assert table.to_pydict() == expected


def test_get_dataframe(tmp_path):
ctx = ExecutionContext()

Expand Down
61 changes: 61 additions & 0 deletions src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion::physical_plan::{
aggregates::AggregateFunction, functions::BuiltinScalarFunction,
};

use crate::errors;
use crate::expression::PyExpr;

#[pyfunction]
Expand Down Expand Up @@ -85,6 +86,63 @@ fn concat_ws(sep: String, args: Vec<PyExpr>) -> PyResult<PyExpr> {
Ok(logical_plan::concat_ws(sep, &args).into())
}

/// Creates a new Sort expression
#[pyfunction]
fn order_by(
expr: PyExpr,
asc: Option<bool>,
nulls_first: Option<bool>,
) -> PyResult<PyExpr> {
Ok(PyExpr {
expr: datafusion::logical_plan::Expr::Sort {
expr: Box::new(expr.expr),
asc: asc.unwrap_or(true),
nulls_first: nulls_first.unwrap_or(true),
},
})
}

/// Creates a new Alias expression
#[pyfunction]
fn alias(expr: PyExpr, name: &str) -> PyResult<PyExpr> {
Ok(PyExpr {
expr: datafusion::logical_plan::Expr::Alias(
Box::new(expr.expr),
String::from(name),
),
})
}

/// Creates a new Window function expression
#[pyfunction]
fn window(
name: &str,
args: Vec<PyExpr>,
partition_by: Option<Vec<PyExpr>>,
order_by: Option<Vec<PyExpr>>,
) -> PyResult<PyExpr> {
use std::str::FromStr;
let fun = datafusion::physical_plan::window_functions::WindowFunction::from_str(name)
.map_err(|e| -> errors::DataFusionError { e.into() })?;
Ok(PyExpr {
expr: datafusion::logical_plan::Expr::WindowFunction {
fun,
args: args.into_iter().map(|x| x.expr).collect::<Vec<_>>(),
partition_by: partition_by
.unwrap_or(vec![])
.into_iter()
.map(|x| x.expr)
.collect::<Vec<_>>(),
order_by: order_by
.unwrap_or(vec![])
.into_iter()
.map(|x| x.expr)
.collect::<Vec<_>>(),
window_frame: None,
},
})
}

macro_rules! scalar_function {
($NAME: ident, $FUNC: ident) => {
scalar_function!($NAME, $FUNC, stringify!($NAME));
Expand Down Expand Up @@ -218,6 +276,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(abs))?;
m.add_wrapped(wrap_pyfunction!(acos))?;
m.add_wrapped(wrap_pyfunction!(approx_distinct))?;
m.add_wrapped(wrap_pyfunction!(alias))?;
m.add_wrapped(wrap_pyfunction!(array))?;
m.add_wrapped(wrap_pyfunction!(ascii))?;
m.add_wrapped(wrap_pyfunction!(asin))?;
Expand Down Expand Up @@ -249,6 +308,7 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(min))?;
m.add_wrapped(wrap_pyfunction!(now))?;
m.add_wrapped(wrap_pyfunction!(octet_length))?;
m.add_wrapped(wrap_pyfunction!(order_by))?;
m.add_wrapped(wrap_pyfunction!(random))?;
m.add_wrapped(wrap_pyfunction!(regexp_match))?;
m.add_wrapped(wrap_pyfunction!(regexp_replace))?;
Expand Down Expand Up @@ -278,5 +338,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(trim))?;
m.add_wrapped(wrap_pyfunction!(trunc))?;
m.add_wrapped(wrap_pyfunction!(upper))?;
m.add_wrapped(wrap_pyfunction!(window))?;
Ok(())
}

0 comments on commit f8433b7

Please sign in to comment.