Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566

Merged
merged 10 commits into from
Jul 5, 2023
16 changes: 15 additions & 1 deletion datafusion/core/tests/sqllogictests/test_files/ddl.slt
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ SELECT * FROM new_table;
statement ok
DROP TABLE new_table

statement ok
DROP TABLE my_table;

# create_table_with_schema_as_multiple_values
statement ok
CREATE TABLE test_table(c1 int, c2 float, c3 varchar) AS VALUES(1, 2, 'hello'),(2, 1, 'there'),(3, 0, '!');
Expand All @@ -362,7 +365,18 @@ SELECT * FROM new_table
2 1 there

statement ok
DROP TABLE my_table;
DROP TABLE new_table;

# Select into without alias names of window aggregates
statement ok
SELECT SUM(c1) OVER(ORDER BY c2), c2, c3 INTO new_table FROM test_table

query IRT
SELECT * FROM new_table
----
3 0 !
5 1 there
6 2 hello

statement ok
DROP TABLE new_table;
Expand Down
299 changes: 297 additions & 2 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Write;

use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use crate::utils::{
check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
resolve_columns, resolve_positions_to_exprs,
};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::expr::*;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check,
};
Expand All @@ -31,7 +34,8 @@ use datafusion_expr::utils::{
};
use datafusion_expr::Expr::Alias;
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
BinaryExpr, Cast, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
Partitioning, TryCast,
};

use sqlparser::ast::{Distinct, Expr as SQLExpr, WildcardAdditionalOptions, WindowType};
Expand Down Expand Up @@ -194,7 +198,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.iter()
.map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
.collect::<Result<Vec<Expr>>>()?;

if select.into.is_some() {
for expr in select_exprs_post_aggr.iter_mut() {
if let Expr::Column(_) = expr.clone() {
*expr = expr.clone().alias(physical_name(expr)?);
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry if my past comments have been confusing. Here is what I was trying to say earlier in #6566 (comment):

I ran this command to get some logs (with the extra debug in PR #6626):

RUST_LOG=debug cargo test --test sqllogictests -- ddl 2>&1 | tee /tmp/debug.log

Here is the content of debug.log: debug.log

From the log, here is the LogialPlan that shows the WindowAggr declares it makes a column named SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (yes that whole thing!)

    Projection: SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, test_table.c2, test_table.c3
      WindowAggr: windowExpr=[[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
        TableScan: test_table projection=[c1, c2, c3]

Here is the final ExecutionPlan, also showing the same giant column as the declared output name:

[2023-06-10T11:43:29Z DEBUG datafusion::physical_plan::planner] Optimized physical plan:
    ProjectionExec: expr=[SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as SUM(test_table.c1), c2@1 as c2, c3@2 as c3]
      BoundedWindowAggExec: wdw=[SUM(test_table.c1): Ok(Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
        SortPreservingMergeExec: [c2@1 ASC NULLS LAST]
          SortExec: expr=[c2@1 ASC NULLS LAST]
            MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]

However, looking at the logs, what the execution plan actually produces a column named SUM(test_table.c1):

[2023-06-10T11:43:29Z DEBUG datafusion::datasource::memory] mem schema does not contain batches schema.

Target_schema: Schema { fields: [
  Field { name: "SUM(test_table.c1) ORDER BY [test_table.c2 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}.


Batches Schema: Schema { fields: [
  Field { name: "SUM(test_table.c1)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c2", data_type: Float32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
  Field { name: "c3", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
}

Thus, what I was trying to say earlier is that I think the root of the problem is the mismatch between what the plans say the field name of the output is and what the field name that the WindowExec is actually producing.

So I think we should fix this bug by resolving the mismatch. Either:

  1. Update the Logical/Physical plans so the field names of WindowAgg matches what the BoundedWindowAggExec actually produces
  2. OR Update BoundedWindowAggExec to produce the field names declared by the `WindowAggExec

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @alamb @berkaysynnada
Would you mind if I also open a small PR for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind if I also open a small PR for this?

I would be very much appreciative, personally

Copy link
Contributor Author

@berkaysynnada berkaysynnada Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply. I have tried both suggestions @alamb:

  1. We need to modify 3 parts of the code:
    a- In the project() function in select.rs, the final schema will be constructed with a shortened form of the window function.
    fn to_field(&self, input_schema: &DFSchema) -> Result<DFField> {
        match self {
            Expr::Column(c) => Ok(DFField::new(
                c.relation.clone(),
                &c.name,
                self.get_type(input_schema)?,
                self.nullable(input_schema)?,
            )),
            _ => {
                Ok(DFField::new_unqualified(
                    &self.display_name()?,
                    self.get_type(input_schema)?,
                    self.nullable(input_schema)?,
                ))
            }
        }
    }

is expanded with that arm:

          Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
                Ok(DFField::new_unqualified(
                    &vec![create_function_name(&fun.to_string(), false, args)?].join(" "),
                    self.get_type(input_schema)?,
                    self.nullable(input_schema)?,
                ))
            }

b- In the project() function again, qualified wildcard columns are normalized. However, the column name is in the longer form, and the schema of the plan is in the shorter form. Therefore, we also change the expr_as_column_expr() function so that window function expressions are converted to column expressions with the shortened column name format, which can be copied from the schema.
c- PushDownProjection rule again creates new column expressions with display_name() function (which returns the long format) in the window handling arm. These column names also need to be shortened to satisfy subsequent control.

My opinion: Can we directly change the display_name() function for window functions such that only function name and arguments are returned? Thus we don't need to change any of what I mentioned above.

        Expr::WindowFunction(WindowFunction {
            fun, args, window_frame, partition_by, order_by,
        }) => {
            let mut parts: Vec<String> =
                vec![create_function_name(&fun.to_string(), false, args)?];
            if !partition_by.is_empty() {
                parts.push(format!("PARTITION BY {partition_by:?}"));
            }
            if !order_by.is_empty() {
                parts.push(format!("ORDER BY {order_by:?}"));
            }
            parts.push(format!("{window_frame}"));
            Ok(parts.join(" "))
        }

new version:

Expr::WindowFunction(WindowFunction {
            fun, args, ..
        }) => {
            let mut parts: Vec<String> =
                vec![create_function_name(&fun.to_string(), false, args)?];
            Ok(parts.join(" "))
        }
  1. We can only change create_window_expr() such that it creates the window name with display_name() rather than physical_name(), but it makes the plans longer, also lots of test change burden.

I would like to wrap up this PR and any thoughts you have would be really helpful. Can you review the alternatives above when you get a chance? Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@berkaysynnada thanks for checking that.

I was also working on that.
Changing display_name was the one I started with but in this case other scenarios will fail. When window plan created the DFS schema check name uniqueness from display_name not considering aliases. So this query will fail

SELECT
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9) first_c9,
  first_value(c9) OVER (PARTITION BY c2 ORDER BY c9 DESC) first_c9_desc
FROM aggregate_test_100

I'm still thinking how to overcome that without breaking changes

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb, @comphead: What do you think? Should we move forward with this approach?

Yes, that is what I think we should do. If the overly verbose column (at the output) names are a problem, perhaps we can look into updating the planner to automatically add more reasonable aliases

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, we need to get back on column naming convention as currently long names are not user friendly and not useful without aliases in nested queries

Copy link
Contributor Author

@berkaysynnada berkaysynnada Jun 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should also be considered that we cannot support more than one column with the same alias, if we intend to shorten the names at the output by realiasing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We will go forward with that approach and @berkaysynnada will update you guys of the progress.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related issues: #6543 and #6758

plan
};

Expand Down Expand Up @@ -555,3 +565,288 @@ fn match_window_definitions(
}
Ok(())
}

fn create_function_physical_name(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this functionality need to remain in sync with the creation of physical names?

fun: &str,
distinct: bool,
args: &[Expr],
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
}

fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(_, name) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, this appears to the same code as https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/physical-expr/src/expressions/case.rs#L66-L77

If we ever changed the code in phsical-expr and did not change this code, would that cause problems?

let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{e:?} ");
}
for (w, t) in &case.when_then_expr {
let _ = write!(name, "WHEN {w:?} THEN {t:?} ");
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {e:?} ");
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr}[{key}]"))
}
Expr::ScalarFunction(func) => {
create_function_physical_name(&func.fun.to_string(), false, &func.args)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
create_function_physical_name(&fun.name, false, args)
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
create_function_physical_name(&fun.to_string(), false, args)
}
Expr::AggregateFunction(AggregateFunction {
fun,
distinct,
args,
..
}) => create_function_physical_name(&fun.to_string(), *distinct, args),
Expr::AggregateUDF(AggregateUDF {
fun,
args,
filter,
order_by,
}) => {
// TODO: Add support for filter and order by in AggregateUDF
if filter.is_some() {
return Err(DataFusionError::Execution(
"aggregate expression with filter is not supported".to_string(),
));
}
if order_by.is_some() {
return Err(DataFusionError::Execution(
"aggregate expression with order_by is not supported".to_string(),
));
}
let mut names = Vec::with_capacity(args.len());
for e in args {
names.push(create_physical_name(e, false)?);
}
Ok(format!("{}({})", fun.name, names.join(",")))
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => Err(DataFusionError::NotImplemented(
"EXISTS is not yet supported in the physical plan".to_string(),
)),
Expr::InSubquery(_) => Err(DataFusionError::NotImplemented(
"IN subquery is not yet supported in the physical plan".to_string(),
)),
Expr::ScalarSubquery(_) => Err(DataFusionError::NotImplemented(
"Scalar subqueries are not yet supported in the physical plan".to_string(),
)),
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT LIKE {pattern}{escape}"))
} else {
Ok(format!("{expr} LIKE {pattern}{escape}"))
}
}
Expr::ILike(Like {
negated,
expr,
pattern,
escape_char,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT ILIKE {pattern}{escape}"))
} else {
Ok(format!("{expr} ILIKE {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => Err(DataFusionError::Internal(
"Create physical name does not support sort expression".to_string(),
)),
Expr::Wildcard => Err(DataFusionError::Internal(
"Create physical name does not support wildcard".to_string(),
)),
Expr::QualifiedWildcard { .. } => Err(DataFusionError::Internal(
"Create physical name does not support qualified wildcard".to_string(),
)),
Expr::Placeholder(_) => Err(DataFusionError::Internal(
"Create physical name does not support placeholder".to_string(),
)),
Expr::OuterReferenceColumn(_, _) => Err(DataFusionError::Internal(
"Create physical name does not support OuterReferenceColumn".to_string(),
)),
}
}