fix: projection_push_down don't consider VarProvider in columns. #6254

Merged
merged 1 commit into from
May 8, 2023

jackwener
Member

Which issue does this PR close?

Closes #6237.

Rationale for this change

projection_push_down doesn't account for VarProvider variables appearing in the column set.

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate optimizer Optimizer rules labels May 5, 2023
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
Member Author

I don't know how to set skip_failed_rules = false when using the DataFrame API.

Contributor

        let config = SessionConfig::new()
            .with_target_partitions(4)
            .set_bool("datafusion.optimizer.skip_failed_rules", false);
        let ctx = SessionContext::with_config(config);
        let df = ctx.sql(sql).await?;

?

Contributor

Here is the version I got working:

#[derive(Debug)]
struct HardcodedIntProvider {}

impl VarProvider for HardcodedIntProvider {
    fn get_value(&self, _var_names: Vec<String>) -> Result<ScalarValue, DataFusionError> {
        Ok(ScalarValue::Int64(Some(1234)))
    }

    fn get_type(&self, _: &[String]) -> Option<DataType> {
        Some(DataType::Int64)
    }
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("foo", DataType::Int64, false),
        Field::new("bar", DataType::Int64, false),
    ]));

    let mem_table = Arc::new(MemTable::try_new(schema, vec![])?);

    let config = SessionConfig::new()
        .with_target_partitions(4)
        .set_bool("datafusion.optimizer.skip_failed_rules", false);

    let ctx = SessionContext::with_config(config);

    ctx.register_table("csv_table", mem_table)?;
    ctx.register_variable(VarType::UserDefined, Arc::new(HardcodedIntProvider {}));

    let dataframe = ctx
        .sql("SELECT foo FROM csv_table WHERE bar > @var")
        .await?;
    dataframe.collect().await?;
    Ok(())
}

Contributor

alamb left a comment

Thanks @jackwener, it is great that you are so responsive.

@@ -466,7 +466,9 @@ fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>
}
})
.collect::<Vec<Expr>>();
if columns.len() != expr.len() {
// Because columns may contain VarProvider, so the length of expr may be less than columns
Contributor

I wonder if there is some way to strip out the columns earlier (like for example, perhaps we could skip pulling out ScalarVariable here)

https://github.com/apache/arrow-datafusion/blob/cf233c855f46fad8355342c581ab55f610758b6c/datafusion/expr/src/utils.rs#L273-L275

I tried this locally:

diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 7babd659e..00e1d0769 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -270,13 +270,11 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
             Expr::Column(qc) => {
                 accum.insert(qc.clone());
             }
-            Expr::ScalarVariable(_, var_names) => {
-                accum.insert(Column::from_name(var_names.join(".")));
-            }
             // Use explicit pattern match instead of a default
             // implementation, so that in the future if someone adds
             // new Expr types, they will check here as well
-            Expr::Alias(_, _)
+            Expr::ScalarVariable(_, _)
+            | Expr::Alias(_, _)
             | Expr::Literal(_)
             | Expr::BinaryExpr { .. }
             | Expr::Like { .. }

and it seems promising
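
To illustrate why skipping `ScalarVariable` during column collection looks promising, here is a minimal, self-contained sketch. It does not use DataFusion: `MiniExpr`, `collect_columns`, and `resolve` are hypothetical stand-ins for `Expr`, `expr_to_columns`, and the schema lookup in `get_expr`, and the `vars_as_columns` flag exists only to compare the two behaviours side by side.

```rust
use std::collections::HashSet;

// Hypothetical, simplified stand-in for DataFusion's `Expr` (not the real type).
#[derive(Debug, Clone)]
enum MiniExpr {
    Column(String),
    ScalarVariable(Vec<String>),
    Gt(Box<MiniExpr>, Box<MiniExpr>),
}

/// Collects the column names referenced by `expr`.
/// With `vars_as_columns == true` this mimics the old behaviour, where a
/// variable such as `@var` is recorded as if it were a column.
fn collect_columns(expr: &MiniExpr, vars_as_columns: bool, accum: &mut HashSet<String>) {
    match expr {
        MiniExpr::Column(name) => {
            accum.insert(name.clone());
        }
        MiniExpr::ScalarVariable(names) => {
            if vars_as_columns {
                accum.insert(names.join("."));
            }
        }
        MiniExpr::Gt(left, right) => {
            collect_columns(left, vars_as_columns, accum);
            collect_columns(right, vars_as_columns, accum);
        }
    }
}

/// Resolves collected names against the table schema. Names that are not
/// fields of the schema are silently dropped, so the result can be shorter
/// than `columns`.
fn resolve<'a>(columns: &HashSet<String>, schema: &[&'a str]) -> Vec<&'a str> {
    schema
        .iter()
        .copied()
        .filter(|field| columns.contains(*field))
        .collect()
}
```

For a predicate like `bar > @var` over a schema with fields `foo` and `bar`, the old behaviour collects two names but resolves only one, which is the length mismatch that trips the check in `get_expr`. With variables skipped, the collected set and the resolved list have the same length.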

@jackwener
Member Author

Thanks @comphead @alamb

@github-actions github-actions bot added logical-expr Logical plan and expressions and removed optimizer Optimizer rules labels May 6, 2023
Contributor

alamb left a comment

Beautiful. Thank you @jackwener!

@alamb alamb merged commit 7760191 into apache:main May 8, 2023
@jackwener jackwener deleted the same_expr branch January 9, 2024 06:28
Labels
core Core DataFusion crate logical-expr Logical plan and expressions
Development

Successfully merging this pull request may close these issues.

push_down_projection optimization fails when using variables
3 participants