Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566
Conversation
We touched on this window problem in #5695 (comment), so to fix the issue completely we need to realias correctly instead of shortening the name in the physical plan:
https://github.com/apache/arrow-datafusion/blob/26e1b20ea3362ea62cb713004a0636b8af6a16d7/datafusion/core/src/physical_plan/planner.rs#L1630
```diff
@@ -73,6 +75,26 @@ impl MemTable {
         }
     }

+    /// Create a new in-memory table from the record batches.
+    /// In case of an empty table, the schema is inferred from the input plan.
+    pub fn new_not_registered(
```
I think this code effectively ignores the `schema` argument if any of `partitions` has a `RecordBatch`, and uses that `RecordBatch`'s schema instead. I think this is surprising behavior and might mask errors in the future.
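To make the concern concrete, here is a minimal sketch of the behavior being described — not the actual DataFusion code; schemas are modeled here as plain field-name lists rather than Arrow `Schema` objects:

```rust
/// Simplified model: a schema is just an ordered list of field names.
#[derive(Debug, Clone, PartialEq)]
struct Schema(Vec<String>);

/// Simplified model: a batch carries the schema it was actually produced with.
struct Batch {
    schema: Schema,
}

/// Sketch of the discussed behavior: if any partition contains a batch,
/// that batch's schema silently wins over the `schema` argument the
/// caller passed in; only a fully empty table keeps the declared schema.
fn new_not_registered(schema: Schema, partitions: &[Vec<Batch>]) -> Schema {
    partitions
        .iter()
        .flatten()
        .next()
        .map(|b| b.schema.clone())
        .unwrap_or(schema)
}
```

With any non-empty partition, the caller's `schema` argument has no effect on the result, which is the surprising part: a mismatch between the declared schema and the produced batches is silently resolved in favor of the batches instead of being reported.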
```diff
@@ -518,7 +518,7 @@ impl SessionContext {
     let physical = DataFrame::new(self.state(), input);

     let batches: Vec<_> = physical.collect_partitioned().await?;
-    let table = Arc::new(MemTable::try_new(schema, batches)?);
+    let table = Arc::new(MemTable::new_not_registered(schema, batches));
```
It seems like the core problem here is that `schema` (aka the schema of `input`) does not actually match the schema of the batches that are produced.
As I think @comphead is suggesting in #6566 (review), this PR seems to be working around the deeper problem: the window exec produces an output schema with different column names than the LogicalPlan declares. Have you looked into making the names consistent?
Yes, my first suggestion is to use the longer version of the name in the planner (what is done in the LogicalPlan). However, that would require many changes in tests, and the BoundedWindowAggExec lines may become too long. If that is not a problem, we can solve the issue that way.
I think using the same names in the physical and logical plans is preferable, because the rest of the code expects this and sometimes assumes it is the case (because it mostly is).
If we don't make the logical and physical plans match up, I predict we will continue to hit a long tail of schema-mismatch bugs related to this discrepancy whenever window functions are used.
If the long display name is a problem (and I can see how it would be), perhaps we can figure out how to make `display_name` produce something shorter for window functions, rather than serializing the entire window definition.
Here is what postgres does:
```
postgres=# select first_value(x) over (order by x) from foo;
 first_value
-------------
           1
(1 row)
```
We probably need to do something more sophisticated as DataFusion needs distinct column names.
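One hypothetical way to keep names short while still satisfying the distinct-name requirement is to suffix duplicates with a counter. This is only an illustrative sketch, not DataFusion's actual naming code:

```rust
use std::collections::HashMap;

/// Given short display names (e.g. "first_value(x)"), append a numeric
/// suffix to repeated names so every output column name is unique.
fn make_unique(names: &[&str]) -> Vec<String> {
    let mut seen: HashMap<String, usize> = HashMap::new();
    names
        .iter()
        .map(|n| {
            let count = seen.entry(n.to_string()).or_insert(0);
            *count += 1;
            if *count == 1 {
                // First occurrence keeps the short name unchanged.
                n.to_string()
            } else {
                // Later occurrences get ":1", ":2", ... appended.
                format!("{n}:{}", *count - 1)
            }
        })
        .collect()
}
```

Unlike Postgres, which happily emits two columns both named `first_value`, a scheme like this would keep column names addressable in nested queries.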
In the planner, modifying the lines to that solves my issue. Is this what you mean by realiasing?
Yes, that is what I was referring to.
https://github.com/apache/arrow-datafusion/pull/6373/files
This is a realias for one of the similar cases in the plan builder. After that, the schema and batches are in sync.
Sorry, I should have attached this PR earlier to save @berkaysynnada time.
I have updated the code and removed the old approach. When the final window plan is built, we realias the aggregate window expressions. It works for the SELECT INTO case. I'd like to know what you think about these changes. Thank you.
Thanks @berkaysynnada, great work. The realias approach is good, and it is also good to have a generic function to shorten the column name.
I will let @alamb look into this again. The initial vision was to move the window name-shortening logic into the builder and realias there, instead of forcibly changing the name in the physical plan.
With the introduced logic the PR becomes more breaking:
- column names are not synced between SELECT and SELECT INTO, which might be unexpected.
- columns are our contract with users, so the new column names in the MemTable need to be tested for every function and documented.
Thanks @berkaysynnada and @comphead -- I am sorry for the delay in response. I don't think this is quite the right approach yet.
datafusion/sql/src/select.rs
Outdated
```rust
        let right = create_physical_name(right, false)?;
        Ok(format!("{left} {op} {right}"))
    }
    Expr::Case(case) => {
```
For example, this appears to be the same code as https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77
If we ever changed the code in physical-expr and did not change this code, would that cause problems?
datafusion/sql/src/select.rs
Outdated
```rust
@@ -555,3 +565,288 @@ fn match_window_definitions(
    }
    Ok(())
}

fn create_function_physical_name(
```
Does this functionality need to remain in sync with the creation of physical names?
datafusion/sql/src/select.rs
Outdated
```rust
if select.into.is_some() {
    for expr in select_exprs_post_aggr.iter_mut() {
        if let Expr::Column(_) = expr.clone() {
            *expr = expr.clone().alias(physical_name(expr)?);
        }
    }
}
```
I am sorry if my past comments have been confusing. Here is what I was trying to say earlier in #6566 (comment):
I ran this command to get some logs (with the extra debug in PR #6626):
```shell
RUST_LOG=debug cargo test --test sqllogictests -- ddl 2>&1 | tee /tmp/debug.log
```
Here is the content of debug.log: debug.log
From the log, here is the LogicalPlan, which shows that the WindowAggr declares it makes a column named `SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` (yes, that whole thing!):
```
Projection: SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, test_table.c2, test_table.c3
  WindowAggr: windowExpr=[[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
    TableScan: test_table projection=[c1, c2, c3]
```
Here is the final ExecutionPlan, also showing the same giant column as the declared output name:
```
[2023-06-10T11:43:29Z DEBUG datafusion::physical_plan::planner] Optimized physical plan:
ProjectionExec: expr=[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(test_table.c1), c2@1 as c2, c3@2 as c3]
  BoundedWindowAggExec: wdw=[SUM(test_table.c1): Ok(Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
    SortPreservingMergeExec: [c2@1 ASC NULLS LAST]
      SortExec: expr=[c2@1 ASC NULLS LAST]
        MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
```
However, looking at the logs, the execution plan actually produces a column named `SUM(test_table.c1)`:
```
[2023-06-10T11:43:29Z DEBUG datafusion::datasource::memory] mem schema does not contain batches schema.
Target_schema: Schema { fields: [
  Field { name: "SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}.
Batches Schema: Schema { fields: [
  Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}
```
Thus, what I was trying to say earlier is that I think the root of the problem is the mismatch between what the plans say the field name of the output is and what the field name that the WindowExec is actually producing.
So I think we should fix this bug by resolving the mismatch. Either:
- Update the Logical/Physical plans so the field names of WindowAggr match what the BoundedWindowAggExec actually produces, OR
- Update BoundedWindowAggExec to produce the field names declared by the WindowAggExec
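The failing check can be modeled roughly as follows — a sketch of the kind of field-by-field name comparison `MemTable::try_new` performs against the declared schema, not the actual implementation:

```rust
/// Sketch: compare the declared field names against the names the batches
/// actually carry. Any difference produces the
/// `Plan("Mismatch between schema and batches")` style of error seen above.
fn check_schema(declared: &[&str], produced: &[&str]) -> Result<(), String> {
    if declared.len() != produced.len()
        || declared.iter().zip(produced).any(|(d, p)| d != p)
    {
        return Err("Mismatch between schema and batches".to_string());
    }
    Ok(())
}
```

With the log output above, the first field compares the long `display_name()` form against the short `physical_name()` form and fails, while `c2` and `c3` match fine.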
Thanks @alamb @berkaysynnada
Would you mind if I also open a small PR for this?
Would you mind if I also open a small PR for this?
I would be very much appreciative, personally
Sorry for the late reply. I have tried both suggestions @alamb:
1. We need to modify 3 parts of the code:
a) In the `project()` function in select.rs, the final schema will be constructed with a shortened form of the window function name. The function
```rust
fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
    match self {
        Expr::Column(c) => Ok(DFField::new(
            c.relation.clone(),
            &c.name,
            self.get_type(input_schema)?,
            self.nullable(input_schema)?,
        )),
        _ => Ok(DFField::new_unqualified(
            &self.display_name()?,
            self.get_type(input_schema)?,
            self.nullable(input_schema)?,
        )),
    }
}
```
is extended with this arm:
```rust
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
    Ok(DFField::new_unqualified(
        &vec![create_function_name(&fun.to_string(), false, args)?].join(" "),
        self.get_type(input_schema)?,
        self.nullable(input_schema)?,
    ))
}
```
b) In the `project()` function again, qualified wildcard columns are normalized. However, the column name is in the longer form while the schema of the plan is in the shorter form. Therefore, we also change the `expr_as_column_expr()` function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.
c) The `PushDownProjection` rule again creates new column expressions with the `display_name()` function (which returns the long format) in its window-handling arm. These column names also need to be shortened to satisfy the subsequent checks.
My opinion: can we directly change the `display_name()` function for window functions so that only the function name and arguments are returned? Then we would not need to change any of the above.
```rust
Expr::WindowFunction(WindowFunction {
    fun, args, window_frame, partition_by, order_by,
}) => {
    let mut parts: Vec<String> =
        vec![create_function_name(&fun.to_string(), false, args)?];
    if !partition_by.is_empty() {
        parts.push(format!("PARTITION BY {partition_by:?}"));
    }
    if !order_by.is_empty() {
        parts.push(format!("ORDER BY {order_by:?}"));
    }
    parts.push(format!("{window_frame}"));
    Ok(parts.join(" "))
}
```
new version:
```rust
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
    let parts: Vec<String> =
        vec![create_function_name(&fun.to_string(), false, args)?];
    Ok(parts.join(" "))
}
```
2. We could instead change `create_window_expr()` so that it creates the window name with `display_name()` rather than `physical_name()`, but that makes the plans longer and brings a heavy test-change burden.
I would like to wrap up this PR, and any thoughts you have would be really helpful. Can you review the alternatives above when you get a chance? Thanks.
@berkaysynnada thanks for checking that. I was also working on this.
Changing `display_name` was the one I started with, but in that case other scenarios will fail. When the window plan is created, the DFSchema checks name uniqueness using `display_name`, not considering aliases. So this query will fail:
```sql
SELECT
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9) first_c9,
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9 DESC) first_c9_desc
FROM aggregate_test_100
```
I'm still thinking how to overcome that without breaking changes
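This failure mode can be sketched as a uniqueness check over the shortened names (an illustrative model, not the actual DFSchema code): with shortened `display_name` output, both window expressions above produce the name `FIRST_VALUE(c9)`, so schema construction fails even though the user gave distinct aliases.

```rust
use std::collections::HashSet;

/// Sketch: a schema builder that rejects duplicate field names, as the
/// DFSchema uniqueness check does for display names (ignoring aliases).
fn build_schema(names: &[&str]) -> Result<Vec<String>, String> {
    let mut seen = HashSet::new();
    for n in names {
        // `insert` returns false if the name was already present.
        if !seen.insert(*n) {
            return Err(format!("duplicate field name: {n}"));
        }
    }
    Ok(names.iter().map(|n| n.to_string()).collect())
}
```

The long names are unique because the window spec (ASC vs DESC) is part of the name; shortening removes exactly the part that distinguished them.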
Agree, we need to revisit the column naming convention, as the current long names are not user-friendly and not usable without aliases in nested queries.
It should also be considered that we cannot support more than one column with the same alias if we intend to shorten the names at the output by realiasing.
Sounds good. We will go forward with that approach and @berkaysynnada will update you guys of the progress.
Marking as draft as we work through feedback
We have observed that the mismatch problem is not only related to SELECT INTO queries: table creation with CREATE TABLE AS has the same problem while matching the schemas. With my last commits, I have changed how the window expressions are named for use in the window executors. Now, the window function columns of the batches have the same names as the plans. The .slt and unit tests are edited accordingly. We now have longer plans and column names, but the problem is solved. To make progress, I think merging this PR and opening another issue to shorten the window names would work better.
Thanks @berkaysynnada, I will check this today. You're right, the problem is related to creating the mem table and can be reproduced with
Making the window aliases shorter is a good idea; I can create a follow-up issue. I was looking into this for the last couple of days and there is a mess in
lgtm. Thanks @berkaysynnada, I will create a follow-up issue.
Thank you everyone. 👍
…ow functions (apache#6566)

* Schema check of partitions and input plan is removed for newly registered tables.
* minor changes
* In Select Into queries, aggregate windows are realiased with physical_name()
* debugging
* display_name() output is simplified for window functions
* Windows are displayed in long format
* Window names in tests are edited
* Create table as test is added

---------

Co-authored-by: Mustafa Akur <mustafa.akur@synnada.ai>
Which issue does this PR close?
Closes #6492.
Rationale for this change
When writing a query result to a MemTable using the SELECT .. INTO syntax, window aggregate projections without an alias give an error because of a schema mismatch. As an example,
```sql
SELECT SUM(c1) OVER(ORDER BY c1) as sum1 INTO new_table FROM annotated_data_infinite
```
has no problem, but
```sql
SELECT SUM(c1) OVER(ORDER BY c1) INTO new_table FROM annotated_data_infinite
```
gives an error:
```
Plan("Mismatch between schema and batches")
```
This is because the schema, which is created from the input LogicalPlan, has fields whose names are the result of `display_name()` (it writes the whole expression: function + window specs), whereas the RecordBatch fields of the partitions are the result of `physical_name()` (it writes only the function part of the expression).
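The two naming paths can be illustrated with a simplified sketch (the real `display_name()` and `physical_name()` handle many more expression kinds; the struct below is a hypothetical model, not DataFusion's `Expr`):

```rust
/// Simplified model of a window expression: the function part plus
/// the window specification that follows it.
struct WindowExpr {
    func: String,        // e.g. "SUM(c1)"
    window_spec: String, // e.g. "ORDER BY [c1 ASC NULLS LAST] RANGE BETWEEN ..."
}

/// Long form used for the LogicalPlan schema fields: function + window spec.
fn display_name(e: &WindowExpr) -> String {
    format!("{} {}", e.func, e.window_spec)
}

/// Short form used for the RecordBatch fields: the function part only.
fn physical_name(e: &WindowExpr) -> String {
    e.func.clone()
}
```

Because the MemTable compares the LogicalPlan schema against the batch schema field by field, the two forms disagree exactly when the projection carries no alias to override the generated name.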
What changes are included in this PR?
In the `create_memory_table()` function, there is a match arm that handles the case where the table does not exist. In that case, we used to initialize the MemTable with `try_new()`, comparing the fields one-to-one. For these unregistered, newly created tables, we don't need to check the LogicalPlan schema against the schema coming from the partitions. By implementing a `MemTable::new_not_registered()` function, we can directly adopt the schema coming from the partitions. In the case of empty batches (CREATE TABLE statements without values inserted), we can use the input plan's schema.
Are these changes tested?
Yes, the erroneous example above is tested.
Are there any user-facing changes?