replace reassign_predicate_columns helper with PhysicalExpr::with_schema #15779
Conversation
Hi @adriangb. I have a suggestion. As I said, if I'm not misjudging the need, this requirement has arisen in some other places as well. So, let's solve it for all of them.
/// Adapt this [`PhysicalExpr`] to a new schema.
/// For example, `Column("b", 1)` can be adapted to `Column("b", 0)`
/// given the schema `Schema::new(vec![("b", DataType::Int32)])`.
fn with_schema(&self, _schema: &Schema) -> Result<Option<Arc<dyn PhysicalExpr>>> {
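For illustration, here is a minimal, hypothetical sketch of what such an adaptation could look like, using simplified stand-in types (plain string slices for the schema, a bare `Column` struct) rather than the actual DataFusion/Arrow types:

```rust
// Hypothetical, simplified model of the proposed API: a Column carries a
// name plus the index it had in its original schema, and `with_schema`
// re-resolves that index against a new schema by name.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

impl Column {
    // Returns None if the new schema has no field with this name.
    fn with_schema(&self, schema: &[&str]) -> Option<Column> {
        let new_index = schema.iter().position(|f| *f == self.name)?;
        Some(Column {
            name: self.name.clone(),
            index: new_index,
        })
    }
}

fn main() {
    // `Column("b", 1)` adapted to a schema where "b" is the only field:
    let col = Column { name: "b".into(), index: 1 };
    let adapted = col.with_schema(&["b"]).unwrap();
    assert_eq!(adapted.index, 0);

    // A column whose name is absent from the new schema cannot be adapted.
    assert!(col.with_schema(&["x", "y"]).is_none());
}
```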
I think introducing such an API is dangerous. Since duplicate names are possible during physical planning, a column shouldn't be used in a place apart from its reference schema.
I guess this need arises because expressions need to be updated while going up and down the plan tree. I've actually thought about that, and I have an idea:
There should be an API at the ExecutionPlan level, like update_expression(&self, expr: Arc, down: bool) -> Option<Arc>
If down is true, it means that this expression should be valid at the output of this operator, and we need that expression represented as it is at the input schema of this operator. If the input schema doesn't have any part of this expression, or the expression cannot be built on the input schema columns of this operator, then the function returns None.
Likewise, if the down flag is false, this means that this expression references the input schema of this operator, and we request its counterpart in the output schema of this operator. Similar failure cases return None here as well.
BTW, a similar approach is taken in the projection_pushdown rule, but this design is better IMO.
Since during the physical planning duplicate names are possible, a column shouldn't be used in a place apart from its reference schema.
Ah, good point. It's unclear to me where duplicate columns are allowed and where they aren't. E.g. DFSchema does not allow duplicate columns IIRC.
There should be an API at the ExecutionPlan level, like update_expression(&self, expr: Arc, down: bool) -> Option
There's some missing types. Do you mean:
trait ExecutionPlan {
    fn update_expression(&self, expr: Arc<dyn PhysicalExpr>, down: bool) -> Option<Arc<dyn PhysicalExpr>>;
}
?
I don't fully follow. What is this function expected to do for different operators, and how does it behave differently across operators? It would help me if you shared the larger vision of how this gets used for this PR's use case, in other places, etc. The code proposed in this PR is used in places where there is no ExecutionPlan (e.g. in parquet pruning within ParquetOpener).
Tangential, but I generally don't like the idea of having a bool parameter like that and would rather it be two functions, update_expression_for_input_schema and update_expression_for_output_schema, or something like that.
Back on topic: how is this different from, say, PhysicalExpression::with_schema(expr, ExecutionPlan::schema(plan))?
There's some missing types. Do you mean:
Yes, exactly like that
I don't fully follow. What is this function expected to do for different operators, and how does it behave differently across operators?
Every operator takes an input schema and provides an output schema, possibly after making some modifications to the input schema. So, that modification of schemas should be trackable by some APIs. For example, this is an example schema tree:
A: [a+b] + [Sum(x)] + [y]
B: -- [a+b] + [Sum(x)] + [y]
C: ---- [a+b] + [x] + [y]
D: ------[a+b]
E: --------[a] + [b] + [c]
F: ------[x] + [y]
G: --------[z] + [y] + [x]
We should be able to ask, for example, D: "update (a+b)@0*2 as if it's in your input schema", and it will answer (a@0+b@1)*2. The inverse question should also be possible: this "foo" expression is written according to your input schema; give it back as it is in your output schema.
TLDR: what I ask from the ExecutionPlan is that operators should be able to track the projection mappings from their input to their output columns.
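As an editorial illustration of the proposed semantics, here is a hypothetical sketch using simplified stand-in types (a tiny Expr enum instead of PhysicalExpr, a bare Projection struct instead of an ExecutionPlan) showing both directions of the rewrite for an operator like D above:

```rust
use std::sync::Arc;

// Hypothetical, simplified stand-ins; the real DataFusion types are richer.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize), // column index in the relevant schema
    Lit(i64),
    Add(Arc<Expr>, Arc<Expr>),
    Mul(Arc<Expr>, Arc<Expr>),
}
use Expr::*;

// A projection: output column i is computed by `exprs[i]` over the input.
struct Projection {
    exprs: Vec<Arc<Expr>>,
}

impl Projection {
    // down == true: `expr` is valid at our output; inline the projected
    // expressions so it becomes valid against our input schema.
    // down == false: `expr` references our input schema; re-express it in
    // terms of our output columns. Either direction returns None when the
    // expression cannot be represented on the target side.
    fn update_expression(&self, expr: &Arc<Expr>, down: bool) -> Option<Arc<Expr>> {
        if !down {
            // If some output column computes exactly this sub-expression,
            // replace it with a reference to that output column.
            if let Some(i) = self.exprs.iter().position(|e| e.as_ref() == expr.as_ref()) {
                return Some(Arc::new(Column(i)));
            }
        }
        match expr.as_ref() {
            Column(i) if down => self.exprs.get(*i).cloned(),
            Column(_) => None, // input column not produced by any output expr
            Lit(v) => Some(Arc::new(Lit(*v))),
            Add(l, r) => Some(Arc::new(Add(
                self.update_expression(l, down)?,
                self.update_expression(r, down)?,
            ))),
            Mul(l, r) => Some(Arc::new(Mul(
                self.update_expression(l, down)?,
                self.update_expression(r, down)?,
            ))),
        }
    }
}

fn main() {
    // Operator D: input schema [a, b, c], output schema [a+b].
    let d = Projection {
        exprs: vec![Arc::new(Add(Arc::new(Column(0)), Arc::new(Column(1))))],
    };

    // "(a+b)@0 * 2" written against D's output schema ...
    let on_output = Arc::new(Mul(Arc::new(Column(0)), Arc::new(Lit(2))));
    // ... becomes "(a@0 + b@1) * 2" against D's input schema.
    let on_input = d.update_expression(&on_output, true).unwrap();

    // The inverse question round-trips back to the output form.
    let back = d.update_expression(&on_input, false).unwrap();
    assert_eq!(back.as_ref(), on_output.as_ref());
}
```

Under this model the mapping is tracked by the operator itself, so no name-based schema lookup is needed at all.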
Tangential but I generally don't like the idea of having a bool parameter like that and would rather it be two functions update_expression_for_input_schema and update_expression_for_output_schema or something like that.
We can do that as 2 different methods, for sure.
Back on topic: how is this different than say PhysicalExpression::with_schema(expr, ExecutionPlan::schema(plan))?
When you provide only one schema, and you don't know the relation between the schema and the expression, you cannot resolve same-name issues. Your suggestion does seem to provide flexibility, since you can just give a custom schema and an expression, but what I'm trying to point out is that we cannot do that easily, because we need to track the expression from its source to its target. Therefore, I'm suggesting an ExecutionPlan API which helps track all of those passes.
How are you going to do that deep inside of ParquetSource, where what you want to provide is the physical file schema that has nothing to do with an ExecutionPlan?
I think I'll need to see some working code with your API replacing this PR to understand.
DataSources and FileSources could also share a similar API. Shall I open a draft PR for this?
Sure would love to see a PR with your suggestion that accomplishes what this one does!
Fundamentally, if I understand your proposal correctly, I would suggest splitting into something like:
trait ExecutionPlan {
    fn update_expression_for_input_schema(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        expr.with_schema(self.input.schema())
    }
    fn update_expression_for_output_schema(&self, expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        expr.with_schema(self.schema())
    }
}
fn with_schema(&self, schema: &Schema) -> Result<Option<Arc<dyn PhysicalExpr>>> {
    // Find our index in the new schema
    let new_index = schema.index_of(&self.name)?;
This index_of works with find(), which has caused many bugs on tables with same-named columns.
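To make that failure mode concrete, a hypothetical minimal model of first-match, name-based lookup (not the actual Arrow Schema::index_of implementation):

```rust
// Hypothetical sketch: name-based lookup with first-match (`find`-like)
// semantics, illustrating why it is ambiguous for duplicate column names.
fn index_of(fields: &[&str], name: &str) -> Option<usize> {
    fields.iter().position(|f| *f == name)
}

fn main() {
    // A physical schema in which two different columns share the name "b",
    // e.g. after joining two tables that both contain a "b" column.
    let fields = ["a", "b", "b"];

    // References to the column at index 1 AND the column at index 2 both
    // resolve to index 1: the second "b" can never be found by name alone.
    assert_eq!(index_of(&fields, "b"), Some(1));
    assert_eq!(index_of(&fields, "c"), None);
}
```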
I think this is simpler and avoids the need for specialized code as in #6114.
I also think this could pave the way to one day achieving #15057 in a dynamic way, i.e. letting a PhysicalExpr declare how it can be evaluated against a file. This could mean, e.g., that variant / JSON support is implemented completely as an external crate.
In the short term this would not be possible because we'd have to add methods to pass the new schema along to functions so they can match their children, etc. It gets a bit complicated, but the fact that it's theoretically possible is interesting.