ARROW-4589: [Rust] Projection push down query optimizer rule #3664
Conversation
@sunchao @paddyhoran @nevi-me This is ready for review
use arrow::error::Result;
use std::rc::Rc;
/// An optimizer rule performs a transformation on a logical plan to produce an optimized logical plan.
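For context, a minimal sketch of what such a rule's interface might look like (the trait definition is not shown in this excerpt; the signature below is inferred from the optimize call sites later in this thread, so treat it as an assumption):

trait OptimizerRule {
    /// Rewrite `plan` into a semantically equivalent but cheaper plan
    fn optimize(&mut self, plan: &LogicalPlan) -> Result<Rc<LogicalPlan>>;
}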
nit: can we also restrict the comments to 90 characters?
This still seems to be over 90 characters.
I pushed a second commit to fix this
// sort the projection otherwise we get non-deterministic behavior
projection.sort();
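To see why the sort matters: the referenced column indexes are naturally accumulated in a set, and iteration order over a HashSet is arbitrary, so the same query could otherwise produce differently ordered projections on different runs. A small self-contained illustration (the accumulator here is hypothetical):

use std::collections::HashSet;

let accum: HashSet<usize> = [4, 1, 7].iter().cloned().collect();
let mut projection: Vec<usize> = accum.into_iter().collect();
projection.sort(); // without this, the element order can vary between runs
assert_eq!(projection, vec![1, 4, 7]);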
// now that the table scan is returning a different schema we need to create a
Why is it "returning a different schema"? It seems the same schema is still returned.
Also, to help me understand: does each plan operator have its own schema, which could differ from the global schema (e.g., the schema of the input source)? If so, is the column index an index into the current operator's schema (e.g., would a column index in a Sort's expression point into the Sort plan's schema)?
Looks like you just found a bug. Good catch. It should be returning the schema after the projection has been applied.
Yes, each plan operator has its own schema (for its output). In some cases (filter, limit, sort) the schema does not change, so they can just delegate to their input relation.
Column indexes are always for the schema of the input relation.
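To make that concrete, here is a hedged sketch (not the PR's actual code) of the rewrite: if the table has columns 0..5 and the query touches only columns 1 and 3, the pushed-down TableScan exposes a two-column schema, and expressions above it are rewritten through a mapping from table-relative to projection-relative indexes:

use std::collections::HashMap;

let projection = vec![1, 3]; // pushed down to the TableScan
let mapping: HashMap<usize, usize> = projection
    .iter()
    .enumerate()
    .map(|(new_idx, &old_idx)| (old_idx, new_idx))
    .collect();
assert_eq!(mapping[&1], 0); // old column 1 becomes column 0
assert_eq!(mapping[&3], 1); // old column 3 becomes column 1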
I pushed a fix for the bug and added a test
> Column indexes are always for the schema of the input relation.
Does the schema of the input relation change? I'm still not sure why we need to rewrite the column indexes - can they always point to the complete schema of the input source?
The logical plan created via the SQL query planner (and the DataFrame API when we have one) does just refer to the original table schema.
The query optimizer transforms the plan and pushes the projection down to the TableScan so that we basically pretend the table only contains the columns we care about. The rest of the plan is then rewritten to be relative to that.
Each operator in the plan is relative to its input and doesn't know about the underlying table schema which could be many levels down, especially once we have joins and subqueries.
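As a schematic example (an illustrative rendering, not the crate's actual plan output): for SELECT c4, c2 FROM t ORDER BY c2 against a ten-column table, the optimized plan could look like this, with each operator's indexes pointing into its immediate input rather than into t's full schema:

Sort: #1                      (c2 in the Projection's two-column output)
  Projection: #1, #0          (c4, c2 in the TableScan's two-column output)
    TableScan: t, projection=Some([2, 4])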
I guess one of the reasons for doing this, other than having a concise and simple-to-comprehend plan, is that ultimately these are indexes into RecordBatch instances.
Let's say we have a csv/parquet file with 300 columns and the query only references 12 of them... If we don't do this rewriting then we are going to have to load a RecordBatch with 300 columns where 288 of them are empty arrays or empty options, or we have to have some special implementation of RecordBatch that does a mapping.
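For illustration, deriving that narrower schema from a projection is straightforward with existing Arrow APIs (a hedged sketch; `schema` and `projection` are assumed to be in scope):

use arrow::datatypes::{Field, Schema};

// keep only the fields named by the projection (e.g. 12 of 300 columns)
let projected_fields: Vec<Field> = projection
    .iter()
    .map(|i| schema.field(*i).clone())
    .collect();
let projected_schema = Schema::new(projected_fields);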
OK. Makes sense. Thanks.
}

fn optimize(plan: &LogicalPlan) -> Rc<LogicalPlan> {
    let rule: Rc<RefCell<OptimizerRule>> =
Why do we need Rc and RefCell here? Can we do:

let mut rule = ProjectionPushDown::new();
rule.optimize(plan).unwrap()
You are correct. I was just trying to cast from ProjectionPushDown to OptimizerRule since eventually there will be a list of rules to apply. I will simplify this for now though.
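For what it's worth, a hedged sketch of that eventual multi-rule design (the trait object type and the Clone bound on LogicalPlan are assumptions, not part of this PR):

fn optimize_all(
    plan: &LogicalPlan,
    rules: &mut [Box<dyn OptimizerRule>],
) -> Result<Rc<LogicalPlan>> {
    let mut current = Rc::new(plan.clone());
    for rule in rules.iter_mut() {
        // each rule consumes the previous rule's output
        current = rule.optimize(&current)?;
    }
    Ok(current)
}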
LGTM.
Hi @andygrove, I'll be able to take a look on Monday GMT morning
This PR adds the first query optimizer rule, which rewrites a logical plan to push the projection down to the TableScan.
Once this is merged, I will create a follow-up PR to integrate this into the query engine so that only the necessary columns are loaded from disk.
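As a schematic before/after of the rule's effect (illustrative, not the crate's actual plan formatting), a query projecting columns 1 and 3 of a wide table is rewritten from

Projection: #1, #3
  TableScan: t, projection=None

to

Projection: #0, #1
  TableScan: t, projection=Some([1, 3])

so the follow-up integration only needs to read the two projected columns from disk.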