fix: projection_push_down don't consider VarProvider in columns. #6254
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
I don't know how to set skip_failed_rules = false in a dataframe test.
Maybe something like this?
let config = SessionConfig::new()
    .with_target_partitions(4)
    .set_bool("datafusion.optimizer.skip_failed_rules", false);
let ctx = SessionContext::with_config(config);
let df = ctx.sql(sql).await?;
Here is the version I got working:
#[derive(Debug)]
struct HardcodedIntProvider {}

impl VarProvider for HardcodedIntProvider {
    fn get_value(&self, _var_names: Vec<String>) -> Result<ScalarValue, DataFusionError> {
        Ok(ScalarValue::Int64(Some(1234)))
    }

    fn get_type(&self, _: &[String]) -> Option<DataType> {
        Some(DataType::Int64)
    }
}

#[tokio::test]
async fn use_var_provider() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("foo", DataType::Int64, false),
        Field::new("bar", DataType::Int64, false),
    ]));

    let mem_table = Arc::new(MemTable::try_new(schema, vec![])?);

    // Disable skip_failed_rules so an optimizer error fails the test
    let config = SessionConfig::new()
        .with_target_partitions(4)
        .set_bool("datafusion.optimizer.skip_failed_rules", false);
    let ctx = SessionContext::with_config(config);

    ctx.register_table("csv_table", mem_table)?;
    ctx.register_variable(VarType::UserDefined, Arc::new(HardcodedIntProvider {}));

    let dataframe = ctx
        .sql("SELECT foo FROM csv_table WHERE bar > @var")
        .await?;
    dataframe.collect().await?;
    Ok(())
}
Thanks @jackwener, it is great that you are so responsive.
@@ -466,7 +466,9 @@ fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>
            }
        })
        .collect::<Vec<Expr>>();
    if columns.len() != expr.len() {
    // Because columns may contain VarProvider, so the length of expr may be less than columns
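To illustrate why the length check in this hunk can trip, here is a minimal standalone sketch. It is not the DataFusion code: `resolve_columns` is a hypothetical stand-in for `get_expr`, and plain strings stand in for `Column` and the schema fields. The assumption is that the variable ends up in the column set under a name such as `@var` that no schema field matches.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `get_expr`: keep only the schema fields
/// that appear in the requested column set.
fn resolve_columns(columns: &HashSet<String>, schema: &[&str]) -> Vec<String> {
    schema
        .iter()
        .filter(|field| columns.contains(**field))
        .map(|field| field.to_string())
        .collect()
}

fn main() {
    let schema = ["foo", "bar"];
    // Assumed situation: `@var` was collected alongside the real columns.
    let columns: HashSet<String> = ["foo", "bar", "@var"]
        .iter()
        .map(|s| s.to_string())
        .collect();

    let resolved = resolve_columns(&columns, &schema);

    // `@var` matches no schema field, so the two lengths differ.
    println!("requested = {}, resolved = {}", columns.len(), resolved.len());
}
```

In this toy model a strict `columns.len() != expr.len()` check would reject the plan even though every real column was resolved.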
I wonder if there is some way to strip out the variables earlier (for example, perhaps we could skip pulling out ScalarVariable in expr_to_columns).
I tried this locally:
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 7babd659e..00e1d0769 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -270,13 +270,11 @@ pub fn expr_to_columns(expr: &Expr, accum: &mut HashSet<Column>) -> Result<()> {
             Expr::Column(qc) => {
                 accum.insert(qc.clone());
             }
-            Expr::ScalarVariable(_, var_names) => {
-                accum.insert(Column::from_name(var_names.join(".")));
-            }
             // Use explicit pattern match instead of a default
             // implementation, so that in the future if someone adds
             // new Expr types, they will check here as well
-            Expr::Alias(_, _)
+            Expr::ScalarVariable(_, _)
+            | Expr::Alias(_, _)
             | Expr::Literal(_)
             | Expr::BinaryExpr { .. }
             | Expr::Like { .. }
and it seems promising
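A rough standalone sketch of the idea in the diff above, for readers without the DataFusion source at hand. The `Expr` enum here is a deliberately tiny, hypothetical model, and this `expr_to_columns` recurses by hand, which may differ from how the real function walks the tree. It only shows the effect of treating `ScalarVariable` as a no-op arm.

```rust
use std::collections::HashSet;

// Hypothetical, heavily reduced model of an expression tree.
#[allow(dead_code)]
#[derive(Debug)]
enum Expr {
    Column(String),
    ScalarVariable(Vec<String>),
    Literal(i64),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
}

/// Collect the columns referenced by `expr`, ignoring scalar variables.
fn expr_to_columns(expr: &Expr, accum: &mut HashSet<String>) {
    match expr {
        Expr::Column(name) => {
            accum.insert(name.clone());
        }
        Expr::BinaryExpr { left, right } => {
            expr_to_columns(left, accum);
            expr_to_columns(right, accum);
        }
        // Explicit arms rather than `_`, mirroring the comment in the diff:
        // a variable is not a column, so nothing is inserted for it.
        Expr::ScalarVariable(_) | Expr::Literal(_) => {}
    }
}

fn main() {
    // Models the predicate `bar > @var` from the test query.
    let predicate = Expr::BinaryExpr {
        left: Box::new(Expr::Column("bar".to_string())),
        right: Box::new(Expr::ScalarVariable(vec!["@var".to_string()])),
    };

    let mut columns = HashSet::new();
    expr_to_columns(&predicate, &mut columns);

    // Only the real column is collected.
    println!("{:?}", columns);
}
```

With the variable filtered out at collection time, a later comparison between the collected columns and the resolved schema fields would no longer need a special case.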
Beautiful. Thank you @jackwener !
Which issue does this PR close?
Closes #6237.
Rationale for this change
projection_push_down don't consider VarProvider in columns.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?