Fix output schema generated by CommonSubExprEliminate #3726
Fall back to the merged schema from the whole logical plan if the input schema was not sufficient to resolve the datatype of a sub-expression. This re-enables the fallback logic added in 3860cd3 (apache#1925).
// expression type could not be resolved in schema, fall back to all schemas
let merged_schema =
    self.all_schemas
        .iter()
        .fold(DFSchema::empty(), |mut lhs, rhs| {
            lhs.merge(rhs);
            lhs
        });
expr.get_type(&merged_schema)?
};
I am honestly unsure if this fall-back logic is necessary at all since the (sub-expression) data types should be resolvable just from the input schema to the respective logical plan node.
However, I did not want to break anything since this fall-back logic was already in place.
All tests pass with and without the fall-back logic.
I think all exprs should be resolvable with the unified schemas, as explained above -- but maybe it is a performance optimization 🤔.
Perhaps you could leave a comment explaining that we are not sure if it is necessary
This code may be a workaround from some issue we have since fixed 🤔
#3730 <-- pr to remove the fallback
Added comments that explain that the fall-back logic can likely be removed.
let expected = r###"[
    (
        "CAST(table.a AS Int64)table.a",
        Int64,
This used to be just Boolean before the fix.
Looks good to me -- thank you @alex-natzka
cc @waynexia -- do you have time to review this PR as well?
/// all schemas in the logical plan, as a fall back if we cannot resolve an expression type
/// from the input schema alone
all_schemas: Vec<DFSchemaRef>,
I don't understand in what cases we wouldn't be able to resolve an expr type from the input schema alone.
The only case I can think of is when the plan node has more than one input (e.g. a Join or a Union) -- but even then I would expect that we can always resolve the type of the expressions using the input schema
While Joins and Unions are not (yet?) handled by this optimization rule, I think that even in these cases we should be able to construct one consolidated schema that is used to resolve the expression type - otherwise the expression probably is invalid in the first place.
Right now the fall-back logic just randomly merges all possible schemas into one - there's no guarantee the resulting, merged schema will be any good for resolving the expression at hand. That's especially the case if the logical plan involves lots of aliasing - there may be many fields from different nodes that have the same name, e.g. a, but different data types; the merged schema will have only one a column, though, the first that we encounter while merging schemas.
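The name clash described above can be illustrated with a small, dependency-free sketch. Everything in it (`ToyType`, `ToySchema`, `merge`, `merge_all`, `lookup`) is a hypothetical stand-in rather than DataFusion's API, and the "first field with a given name wins" merge rule is an assumption taken from the comment above, not from the library source.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a data type; not DataFusion's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyType {
    Int64,
    Utf8,
}

/// Hypothetical stand-in for a schema: an ordered list of (name, type) fields.
type ToySchema = Vec<(&'static str, ToyType)>;

/// Merge `rhs` into `lhs`, skipping fields whose name is already present.
/// Assumption: the first field seen under a given name wins.
fn merge(lhs: &mut ToySchema, rhs: &ToySchema) {
    let mut seen: HashSet<&'static str> = lhs.iter().map(|(name, _)| *name).collect();
    for &(name, ty) in rhs {
        // `insert` returns true only if the name was not seen before.
        if seen.insert(name) {
            lhs.push((name, ty));
        }
    }
}

/// Fold all schemas of a plan into one, in the order they are given.
fn merge_all(schemas: &[ToySchema]) -> ToySchema {
    schemas.iter().fold(ToySchema::new(), |mut lhs, rhs| {
        merge(&mut lhs, rhs);
        lhs
    })
}

/// Look up the type of a column by name.
fn lookup(schema: &ToySchema, name: &str) -> Option<ToyType> {
    schema.iter().find(|(n, _)| *n == name).map(|(_, ty)| *ty)
}

fn main() {
    // Two plan nodes expose a column named `a` with different types,
    // e.g. because a projection aliases a cast back to `a`.
    let scan: ToySchema = vec![("a", ToyType::Utf8)];
    let projection: ToySchema = vec![("a", ToyType::Int64)];

    let merged = merge_all(&[scan, projection]);

    // The merged schema keeps only the first `a`, so an expression that
    // belongs above the projection would be typed against the wrong field.
    println!("{:?}", lookup(&merged, "a"));
}
```

In this sketch the result depends only on the order in which schemas are merged, which is why a merged schema is a weak basis for resolving the type of any particular expression.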
I am glad I was not the only one confused :)
I wouldn't worry -- there have been many changes in DataFusion and it was (and still is) a fast changing codebase -- the day I write perfect code without any errors will be the day I hold other people to the same standard 😆
@@ -597,22 +640,36 @@ mod test {
    fn id_array_visitor() -> Result<()> {
        let expr = binary_expr(
            binary_expr(
-               sum(binary_expr(col("a"), Operator::Plus, lit("1"))),
+               sum(binary_expr(col("a"), Operator::Plus, lit(1))),
I agree this test doesn't make sense as coercion should have happened before this pass
I ran all the tests without the fallback of the plan's schemas and they worked. Thus I think it is not necessary.
However, I like your incremental approach to development of keeping the old code there as it is no worse than master. I would be fine with merging this PR as is and I will create a PR that removes a workaround as a follow on.
I will wait until tomorrow to see if @waynexia would like to comment too.
I will review it tomorrow
Point out that it can likely be removed.
GitHub Actions is having issues I think -- the CI failures are not related to changes in this PR
I plan on reviewing this today
Sorry for the late reply, I plan to review it later today.
I would like to merge this PR -- I'll plan on doing so tomorrow if @waynexia and/or @andygrove haven't had a chance by then
I've reviewed this fix and it looks reasonable to me. Thanks @alex-natzka @alamb
Thanks again @alex-natzka and @waynexia
Benchmark runs are scheduled for baseline = e10d647 and contender = 0cf5630. 0cf5630 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Which issue does this PR close?
Closes #3635.
Rationale for this change
The optimization rule CommonSubexprEliminate produces a wrong output schema in many situations. If a logical plan depends on the output schema, it will simply be broken.
What changes are included in this PR?
In ExprIdentifierVisitor, which is used to determine the datatype of every sub-expression, the datatype is now determined based on the schema of the logical plan for every sub-expression individually instead of determining the datatype of the overall expression and wrongly assigning it to every sub-expression.
ExprIdentifierVisitor now, accordingly, does not have a datatype attribute but, instead, input_schema and all_schemas. The latter is necessary for a fall-back logic, where if the datatype of a sub-expression cannot be determined using the input schema, we merge the schemas from all nodes of the overall logical plan and try again.
Filter logical plan.
Are there any user-facing changes?
No.
No.
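The difference between typing every sub-expression individually and reusing the type of the overall expression can be sketched with a toy expression tree. This is only an illustration under stated assumptions: `ToyExpr`, `ToyType`, `ToySchema`, and `collect_types` are hypothetical names, not DataFusion's `Expr`, `DataType`, `DFSchema`, or `ExprIdentifierVisitor`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a data type; not DataFusion's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ToyType {
    Int32,
    Int64,
    Boolean,
}

/// Hypothetical, heavily reduced expression tree.
enum ToyExpr {
    Column(&'static str),
    Literal(i64),
    Cast(Box<ToyExpr>, ToyType),
    Gt(Box<ToyExpr>, Box<ToyExpr>),
}

/// Hypothetical input schema: column name to type.
type ToySchema = HashMap<&'static str, ToyType>;

impl ToyExpr {
    /// Resolve the type of this node against the input schema.
    fn get_type(&self, schema: &ToySchema) -> Option<ToyType> {
        match self {
            ToyExpr::Column(name) => schema.get(name).copied(),
            ToyExpr::Literal(_) => Some(ToyType::Int64),
            ToyExpr::Cast(_, ty) => Some(*ty),
            ToyExpr::Gt(_, _) => Some(ToyType::Boolean),
        }
    }
}

/// Walk the tree in pre-order and record the type of each node,
/// resolving every sub-expression on its own.
fn collect_types(expr: &ToyExpr, schema: &ToySchema, out: &mut Vec<Option<ToyType>>) {
    out.push(expr.get_type(schema));
    match expr {
        ToyExpr::Cast(inner, _) => collect_types(inner, schema, out),
        ToyExpr::Gt(lhs, rhs) => {
            collect_types(lhs, schema, out);
            collect_types(rhs, schema, out);
        }
        ToyExpr::Column(_) | ToyExpr::Literal(_) => {}
    }
}

fn main() {
    let schema: ToySchema = HashMap::from([("a", ToyType::Int32)]);

    // CAST(a AS Int64) > 1
    let expr = ToyExpr::Gt(
        Box::new(ToyExpr::Cast(Box::new(ToyExpr::Column("a")), ToyType::Int64)),
        Box::new(ToyExpr::Literal(1)),
    );

    let mut types = Vec::new();
    collect_types(&expr, &schema, &mut types);

    // Only the root is Boolean; the cast sub-expression is Int64.
    // Assigning the root type to every node would label the cast Boolean.
    println!("{:?}", types);
}
```

The cast node in this sketch corresponds to the CAST(table.a AS Int64) entry in the updated test expectation, which was reported as Boolean before the fix.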