
Make it easier to create WindowFunctions with the Expr API #6747

Closed
alamb opened this issue Jun 22, 2023 · 23 comments · Fixed by #11550
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

Comments

@alamb
Contributor

alamb commented Jun 22, 2023

Is your feature request related to a problem or challenge?

Follow on to #5781

There are at least three things named WindowFunction in DataFusion -- Expr::WindowFunction, window_function::WindowFunction and expr::WindowFunction

https://docs.rs/datafusion-expr/26.0.0/datafusion_expr/index.html?search=WindowFunction

Constructing an Expr::WindowFunction to pass to LogicalPlanBuilder::window is quite challenging

Describe the solution you'd like

I would like to make this process easier with a builder style:

For example, for lead(foo) OVER (PARTITION BY bar):

let expr = lead(col("foo"))
  .with_partition_by(col("bar"));

Describe alternatives you've considered

No response

Additional context

No response

@alamb
Contributor Author

alamb commented May 2, 2024

Here is another example from #10345 / @timsaucer showing how difficult it is to create a window function via the Expr API:

use datafusion::{logical_expr::{expr::WindowFunction, BuiltInWindowFunction, WindowFrame, WindowFunctionDefinition}, prelude::*};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {

    let ctx = SessionContext::new();
    let mut df = ctx.read_csv("/Users/tsaucer/working/testing_ballista/lead_lag/example.csv", CsvReadOptions::default()).await?;

    df = df.with_column("array_col", make_array(vec![col("a"), col("b"), col("c")]))?;

    df.clone().show().await?;

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

    df = df.select(vec![col("a"), col("b"), col("c"), col("array_col"), lag_expr.alias("lagged")])?;

    df.show().await?;

    Ok(())
}

It would be great if instead of

    let lag_expr = Expr::WindowFunction(WindowFunction::new(
        WindowFunctionDefinition::BuiltInWindowFunction(
            BuiltInWindowFunction::Lead,
        ),
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    ));

it looked more like

    let lag_expr = lead(
        vec![col("array_col")],
        vec![],
        vec![],
        WindowFrame::new(None),
        None,
    );

Maybe even better would be a builder style:

    let lag_expr = lead(col("array_col")).build()

This would permit adding the various OVER clauses, like:

    let lag_expr = lead(col("array_col"))
      .partition_by(vec![])
      .order_by(vec![])
      .build()

Maybe there are some inspirations in the polars API too: https://docs.pola.rs/user-guide/expressions/window/#group-by-aggregations-in-selection

@alamb added the help wanted (Extra attention is needed) label on May 2, 2024
@alamb
Contributor Author

alamb commented May 2, 2024

🤔 It seems like Spark's API is like

df.select(count("dt").over(w).alias("count")).show()

https://stackoverflow.com/questions/32769328/how-to-use-window-functions-in-pyspark-using-dataframes

So maybe for DataFusion it could look like

   let w = Window::new()
     .partition_by(col("id"))
     .order_by(col("dt"));

    let lag_expr = lag(col("array_col"))
       .over(w)
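To make that Spark-inspired sketch concrete, here is a self-contained Rust sketch of a reusable window specification with a builder. All names (Window, WindowExpr, lag) are hypothetical stand-ins, and String stands in for DataFusion's Expr; this is not the real API.

```rust
// Hypothetical stand-in types: `Window` and `WindowExpr` are not DataFusion
// APIs; `String` stands in for DataFusion's `Expr`.
#[derive(Clone, Default, Debug)]
struct Window {
    partition_by: Vec<String>,
    order_by: Vec<String>,
}

impl Window {
    fn new() -> Self {
        Self::default()
    }
    // Each call appends a clause, so specs compose like Spark's `Window`.
    fn partition_by(mut self, expr: &str) -> Self {
        self.partition_by.push(expr.to_string());
        self
    }
    fn order_by(mut self, expr: &str) -> Self {
        self.order_by.push(expr.to_string());
        self
    }
}

#[derive(Debug)]
struct WindowExpr {
    func: String,
    args: Vec<String>,
    window: Window,
}

// Stand-in constructor for a `lag` window function expression.
fn lag(arg: &str) -> WindowExpr {
    WindowExpr {
        func: "lag".into(),
        args: vec![arg.into()],
        window: Window::new(),
    }
}

impl WindowExpr {
    // `.over(w)` attaches the reusable window spec, mirroring the Spark idea.
    fn over(mut self, window: Window) -> Self {
        self.window = window;
        self
    }
}

fn main() {
    let w = Window::new().partition_by("id").order_by("dt");
    let lag_expr = lag("array_col").over(w);
    assert_eq!(lag_expr.window.partition_by, vec!["id"]);
    assert_eq!(lag_expr.window.order_by, vec!["dt"]);
    println!("{:?}", lag_expr);
}
```

One nice property of this shape is that a single `Window` spec can be cloned and attached to several function expressions, which SQL's WINDOW clause also allows.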

@alamb
Contributor Author

alamb commented May 2, 2024

Note: I have some code in #6746 that implements part of this (along with an example)

@shanretoo
Contributor

I am willing to help with this task.

@timsaucer
Contributor

Great! I've rebased @alamb's branch and added the changes I suggested. I was about to start testing the code, and then I was going to write up the unit tests. My work in progress is here: https://github.com/timsaucer/datafusion/tree/feature/easier_window_funcs There were a few changes I needed to make around the null_options. I got distracted by a task in the datafusion-python repo, but I was hoping to tackle this very soon.

@shanretoo
Contributor

Thanks for your update! I'll work on the tests.

@shanretoo
Contributor

FYI, my work is in: https://github.com/shanretoo/datafusion/tree/feat-window-fn

@shanretoo
Contributor

@timsaucer I have fixed the calls of expr::WindowFunction to match the changes and added tests for those window functions in dataframe_functions.rs.
Let me know if I missed anything.

@timsaucer
Contributor

Oh, great. Have you been able to run the example code above using the new easy interface?

@shanretoo
Contributor

You can check it in the unit test: test_fn_lead.

@timsaucer
Contributor

Thank you. I pulled your branch, and many of the tests are failing for me even though the functions return correct values when I add additional debug statements. I think what's happening is that because we have the partition_by, there is no guarantee of the order the results come back in. On my machine the unit tests return the partitions on column C in the order 10 then 1; I'm guessing on yours it was the opposite.

There are a couple of things I think we can do to resolve this. One way would be to make a new macro for testing these partitioned functions. I could do something like

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr, $SORTBY: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPR)?.sort($SORTBY)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_eq!($EXPECTED, &batches);
    };
}

And then the lead function test would become


#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+----+--------+",
        "| c  | lead_b |",
        "+----+--------+",
        "| 1  | 10     |",
        "| 1  | 10     |",
        "| 1  | -1     |",
        "| 10 | -1     |",
        "+----+--------+",
    ];

    let select_expr = vec![col("c"), expr];
    let sort_by = vec![col("c").sort(true, true)];

    assert_sorted_fn_batches!(select_expr, expected, sort_by);

    Ok(())
}

I've added an alias just because I think it makes the test more readable. If we wanted to get really explicit, we could also output column A and sort by columns A and C; then correctness would be guaranteed because each row would be unique.

@timsaucer
Contributor

The one thing I think we're missing is the other variants for these; I don't think they're covered in any other unit tests I can find. For example, for lead we would want to validate:

  • That setting a different shift offset works as expected
  • That the default works when no shift offset is set (this can be the basic test)
  • That setting no default value gives nulls
  • With and without partition_by
  • With and without order_by (some window functions require it, so for those only test with)
  • With and without null treatment - I'm not sure which of the functions this impacts
  • With and without window frames

What do you think? I might try to write a macro around all these variants.

I'm now unblocked on the other task I was working on, so I can pick it up if you'd like or I'm happy to work on other things. Please let me know.
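As a self-contained illustration of stamping out such variants with a macro (toy logic over plain Vecs, not the real DataFusion lead or test tables), each option combination becomes one cheap, explicit check:

```rust
// Toy stand-in for `lead`: shifts a column forward by `offset`, filling the
// tail with `default` (None stands in for SQL NULL).
fn lead(col: &[i32], offset: usize, default: Option<i32>) -> Vec<Option<i32>> {
    (0..col.len())
        .map(|i| col.get(i + offset).copied().or(default))
        .collect()
}

// One macro invocation per variant keeps each case explicit but cheap to add,
// similar in spirit to the variant matrix discussed above.
macro_rules! check_lead_variant {
    ($input:expr, $offset:expr, $default:expr, $expected:expr) => {
        assert_eq!(lead($input, $offset, $default), $expected);
    };
}

fn main() {
    let col = [1, 2, 3];
    // Basic case: shift offset of 1 with an explicit default value.
    check_lead_variant!(&col, 1, Some(-1), vec![Some(2), Some(3), Some(-1)]);
    // A different shift offset works as expected.
    check_lead_variant!(&col, 2, Some(-1), vec![Some(3), Some(-1), Some(-1)]);
    // No default value gives nulls.
    check_lead_variant!(&col, 1, None, vec![Some(2), Some(3), None]);
}
```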

@shanretoo
Contributor

Sorry, my fault. I hadn't taken the ordering issue into account. Maybe we could add the following match arm to the macro to omit the sort_by parameter and add output column A to ensure correctness. What do you think?

macro_rules! assert_sorted_fn_batches {
    ($EXPR:expr, $EXPECTED: expr) => {
        let sort_by = $EXPR
            .iter()
            .map(|e| {
                let alias = e.name_for_alias().expect("failed to get an alias");
                col(alias).sort(true, true)
            })
            .collect::<Vec<_>>();
        assert_sorted_fn_batches!($EXPR, $EXPECTED, sort_by);
    };
}

@shanretoo
Contributor

Have you checked the tests in sqllogictest?
If we want to make sure all the variants work as expected, I think we should add those tests in sqllogictest.
And for the unit tests here, we can just check the situations that might produce different results, for example, the default values we set in the builder functions.
What do you think?

You can take over this and I'm happy to help when needed.

@timsaucer
Contributor

I think you're doing a great job, and good point on the sqllogictest. TBH I find those tests harder to wrap my head around than the Rust tests, but that's more of a personal preference.

About the test function, I realize we can probably make it simpler:

macro_rules! assert_unordered_fn_batches {
    ($EXPRS:expr, $EXPECTED: expr) => {
        let df = create_test_table().await?;
        let df = df.select($EXPRS)?.limit(0, Some(10))?;
        let batches = df.collect().await?;

        assert_batches_sorted_eq!($EXPECTED, &batches);
    };
}
#[tokio::test]
async fn test_fn_lead() -> Result<()> {

    let expr = lead(col("b"), Some(1), Some(ScalarValue::Int32(Some(-1))))
        .with_partition_by(vec![col("c")])
        .with_order_by(vec![col("b").sort(true, true)])
        .build()
        .alias("lead_b");

    let expected = [
        "+-----------+----+--------+",
        "| a         | c  | lead_b |",
        "+-----------+----+--------+",
        "| 123AbcDef | 10 | -1     |",
        "| CBAdef    | 1  | -1     |",
        "| abc123    | 1  | 10     |",
        "| abcDEF    | 1  | 10     |",
        "+-----------+----+--------+",
    ];

    let select_expr = vec![col("a"), col("c"), expr];

    assert_unordered_fn_batches!(select_expr, expected);

    Ok(())
}

What do you think?

@shanretoo
Contributor

Looks good. The results are clearer to understand this way.

@alamb
Contributor Author

alamb commented Jun 8, 2024

Update here is that @jayzhan211 and I have been working on a similar API for creating Aggregate exprs on #10560. I am quite pleased with how it worked out. Perhaps we can follow a similar model for the window functions

@alamb
Contributor Author

alamb commented Jul 7, 2024

In case anyone is following along, @jayzhan211 added a really nice trait for working with aggregate functions. Maybe we can do something similar for window functions eventually

/// Extensions for configuring [`Expr::AggregateFunction`]
///
/// Adds methods to [`Expr`] that make it easy to set optional aggregate options
/// such as `ORDER BY`, `FILTER` and `DISTINCT`
///
/// # Example
/// ```no_run
/// # use datafusion_common::Result;
/// # use datafusion_expr::{AggregateUDF, col, Expr, lit};
/// # use sqlparser::ast::NullTreatment;
/// # fn count(arg: Expr) -> Expr { todo!{} }
/// # fn first_value(arg: Expr) -> Expr { todo!{} }
/// # fn main() -> Result<()> {
/// use datafusion_expr::AggregateExt;
///
/// // Create COUNT(x FILTER y > 5)
/// let agg = count(col("x"))
///     .filter(col("y").gt(lit(5)))
///     .build()?;
/// // Create FIRST_VALUE(x ORDER BY y IGNORE NULLS)
/// let sort_expr = col("y").sort(true, true);
/// let agg = first_value(col("x"))
///     .order_by(vec![sort_expr])
///     .null_treatment(NullTreatment::IgnoreNulls)
///     .build()?;
/// # Ok(())
/// # }
/// ```
pub trait AggregateExt {
    /// Add `ORDER BY <order_by>`
    ///
    /// Note: `order_by` must be [`Expr::Sort`]
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    /// Add `FILTER <filter>`
    fn filter(self, filter: Expr) -> AggregateBuilder;
    /// Add `DISTINCT`
    fn distinct(self) -> AggregateBuilder;
    /// Add `RESPECT NULLS` or `IGNORE NULLS`
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;
}

@timsaucer
Contributor

timsaucer commented Jul 18, 2024

I've started looking at this and am coming up against one blocker that prevents simply following the exact pattern.

My first thought was to implement a trait like

pub trait WindowExt {
    fn order_by(self, order_by: Vec<Expr>) -> WindowBuilder;
    fn partition_by(self, partitions: Vec<Expr>) -> WindowBuilder;
    fn window_frame(self, window_frame: WindowFrame) -> WindowBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> WindowBuilder;
}

The problem with this is that we would have two traits implemented on Expr with identical function names, order_by and null_treatment. I could give them different names, but that isn't a great user experience. Plus, all aggregate functions can be used as window functions.

My current thinking is that instead of doing this, I should rename AggregateExt to something like ExprExt. This trait would have something like

pub trait ExprExt {
    fn order_by(self, order_by: Vec<Expr>) -> ExprBuilder;
    fn filter(self, filter: Expr) -> ExprBuilder;
    fn distinct(self) -> ExprBuilder;
    fn partition_by(self, partitions: Vec<Expr>) -> ExprBuilder;
    fn window_frame(self, window_frame: WindowFrame) -> ExprBuilder;
    fn null_treatment(self, null_treatment: NullTreatment) -> ExprBuilder;
}

Then the ExprBuilder would do something like

pub struct ExprBuilder {
    expr_data: ExprBuilderData,
    order_by: Option<Vec<Expr>>,
    filter: Option<Expr>,
    distinct: bool,
    null_treatment: Option<NullTreatment>,
}

And finally

enum ExprBuilderData {
    AggregateBuilderData(AggregateFunction),
    WindowBuilderData(WindowFunction),
}

(case statement removed from the original comment)

I haven't dug too much deeper into it, but these are my initial design ideas. @jayzhan211 and @alamb what do you think?

@timsaucer
Contributor

I'm also wondering if instead of carrying the data around in the builder, we can just update the member within expr_data as we go. That is, initialize it to defaults and update the values as the builder is called.
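A minimal sketch of that alternative, using stand-in types rather than DataFusion's real nodes (none of these names are from the codebase): the builder holds the partially built node, initialized to defaults, and each method updates it in place, so build() just hands it back.

```rust
// Stand-in for the node being built; in the real design this would be
// DataFusion's WindowFunction (or AggregateFunction) struct.
#[derive(Debug, Default, PartialEq)]
struct WindowFunctionNode {
    args: Vec<String>,
    partition_by: Vec<String>,
    order_by: Vec<String>,
}

struct ExprBuilder {
    // The node is created with default values up front; builder methods
    // edit it directly instead of carrying separate Option fields around.
    node: WindowFunctionNode,
}

impl ExprBuilder {
    fn new(args: Vec<String>) -> Self {
        Self {
            node: WindowFunctionNode { args, ..Default::default() },
        }
    }
    fn partition_by(mut self, partitions: Vec<String>) -> Self {
        self.node.partition_by = partitions;
        self
    }
    fn order_by(mut self, order_by: Vec<String>) -> Self {
        self.node.order_by = order_by;
        self
    }
    // No final translation step: the node is already complete.
    fn build(self) -> WindowFunctionNode {
        self.node
    }
}

fn main() {
    let node = ExprBuilder::new(vec!["array_col".into()])
        .partition_by(vec!["c".into()])
        .order_by(vec!["b".into()])
        .build();
    assert_eq!(node.partition_by, vec!["c".to_string()]);
    println!("{:?}", node);
}
```

The trade-off is that `build()` becomes trivial, at the cost of the builder exposing a node that may be half-configured until then.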

@jayzhan211
Contributor

The overall idea looks good to me, but I'm not sure about the Case you mentioned: is there any Expr that could also benefit from the builder mode?

My current idea is specific to functions only, but if there are other non-function exprs, we could extend it with something like ExprExt and ExprBuilder with the related methods they need.

pub trait FunctionExt {
    /// Add `ORDER BY <order_by>`
    ///
    /// Note: `order_by` must be [`Expr::Sort`]
    fn order_by(self, order_by: Vec<Expr>) -> AggregateBuilder;
    /// Add `FILTER <filter>`
    fn filter(self, filter: Expr) -> AggregateBuilder;
    /// Add `DISTINCT`
    fn distinct(self) -> AggregateBuilder;
    /// Add `RESPECT NULLS` or `IGNORE NULLS`
    fn null_treatment(self, null_treatment: NullTreatment) -> AggregateBuilder;

    fn partition_by:
    fn window_frame:
    ...
}

pub struct FuncBuilder {
    fun: Option<FuncKind>,
    order_by: Option<Vec<Expr>>,
    filter: Option<Expr>,
    distinct: bool,
    null_treatment: Option<NullTreatment>,
    partition_by,
    window_frame,
    ....
}

pub enum FuncKind {
    Aggregate(AggregateFunction),
    Window(WindowFunction),
}
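A compilable sketch of that shape, with placeholder String payloads where the real AggregateFunction and WindowFunction structs would go (all names are illustrative, not DataFusion's): build() can also reject combinations that only make sense for one kind.

```rust
// Placeholder payloads; the real enum would hold DataFusion's
// AggregateFunction and WindowFunction structs.
#[derive(Debug)]
enum FuncKind {
    Aggregate(String),
    Window(String),
}

#[derive(Debug, Default)]
struct FuncBuilder {
    fun: Option<FuncKind>,
    order_by: Option<Vec<String>>,
    filter: Option<String>,
    distinct: bool,
    partition_by: Option<Vec<String>>,
}

impl FuncBuilder {
    fn new(fun: FuncKind) -> Self {
        Self { fun: Some(fun), ..Default::default() }
    }
    fn order_by(mut self, order_by: Vec<String>) -> Self {
        self.order_by = Some(order_by);
        self
    }
    fn distinct(mut self) -> Self {
        self.distinct = true;
        self
    }
    fn partition_by(mut self, partitions: Vec<String>) -> Self {
        self.partition_by = Some(partitions);
        self
    }
    // Validation happens once, at build time: window-only options on an
    // aggregate become an error instead of being silently dropped.
    fn build(self) -> Result<FuncKind, String> {
        let fun = self.fun.ok_or_else(|| String::from("missing function"))?;
        if matches!(fun, FuncKind::Aggregate(_)) && self.partition_by.is_some() {
            return Err("partition_by is only valid for window functions".to_string());
        }
        Ok(fun)
    }
}

fn main() {
    let ok = FuncBuilder::new(FuncKind::Window("lead".into()))
        .partition_by(vec!["c".into()])
        .build();
    assert!(ok.is_ok());

    let err = FuncBuilder::new(FuncKind::Aggregate("count".into()))
        .partition_by(vec!["c".into()])
        .build();
    assert!(err.is_err());
}
```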

@timsaucer
Contributor

Good point. I was just thinking about it and came on here to remove that from my comment! So we are well aligned. Great suggestions. I'll move forward on working on this tomorrow.

@timsaucer
Contributor

I started a new branch off main with these changes. Tomorrow I'll review the previous branch @shanretoo was working on to make sure I didn't miss any unit tests he added. Otherwise all of the cargo tests pass for me with this change.
