Mismatch in MemTable of Select Into when projecting on aggregate window functions #6566

Merged: 10 commits, Jul 5, 2023
24 changes: 23 additions & 1 deletion datafusion/core/src/datasource/memory.rs
@@ -52,7 +52,9 @@ pub struct MemTable {
}

impl MemTable {
/// Create a new in-memory table from the provided schema and record batches
/// Create a new in-memory table from the provided schema and record batches.
/// If the provided schema and record batches' schema do not match,
/// the function returns an error.
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
if partitions
.iter()
@@ -73,6 +75,26 @@ impl MemTable {
}
}

/// Create a new in-memory table from the record batches.
/// In case of empty table, the schema is inferred from the input plan.
pub fn new_not_registered(
Contributor

I think this code effectively ignores the schema argument if the first partition contains a RecordBatch, and uses that RecordBatch's schema instead.

This is surprising behavior and might mask errors in the future.

schema: SchemaRef,
partitions: Vec<Vec<RecordBatch>>,
) -> Self {
let schema = partitions
.first()
.and_then(|f| f.first())
.map(|first| first.schema())
.unwrap_or(schema);

let batches = partitions
.into_iter()
.map(|e| Arc::new(RwLock::new(e)))
.collect::<Vec<_>>();

Self { schema, batches }
}
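
To make the reviewer's concern concrete, here is a minimal, self-contained sketch that contrasts a strict check with the lenient fallback used by `new_not_registered`. It uses only the standard library: `Schema` and `Batch` are simplified stand-ins (assumptions, not the Arrow or DataFusion types), strict equality is an assumed comparison rule, and the column names in the usage note are hypothetical.

```rust
/// Simplified stand-in for an Arrow schema: only the ordered column names.
/// (Assumption: the real type also carries data types and nullability.)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Schema {
    pub fields: Vec<String>,
}

/// Simplified stand-in for a RecordBatch: it carries a schema and no data.
#[derive(Debug, Clone)]
pub struct Batch {
    pub schema: Schema,
}

/// Error returned by the strict check when a batch disagrees with the
/// declared schema.
#[derive(Debug, PartialEq, Eq)]
pub struct SchemaMismatch {
    pub expected: Schema,
    pub found: Schema,
}

/// Hypothetical helper that builds a schema from column names.
pub fn schema_of(names: &[&str]) -> Schema {
    Schema {
        fields: names.iter().map(|n| n.to_string()).collect(),
    }
}

/// Strict variant: every batch in every partition must carry the declared
/// schema, otherwise the first disagreement is reported.
pub fn strict_schema(
    declared: Schema,
    partitions: &[Vec<Batch>],
) -> Result<Schema, SchemaMismatch> {
    for batch in partitions.iter().flatten() {
        if batch.schema != declared {
            return Err(SchemaMismatch {
                expected: declared,
                found: batch.schema.clone(),
            });
        }
    }
    Ok(declared)
}

/// Lenient variant with the same shape as the diff above: the first batch
/// of the first partition wins, and the declared schema is only a fallback.
pub fn lenient_schema(declared: Schema, partitions: &[Vec<Batch>]) -> Schema {
    partitions
        .first()
        .and_then(|p| p.first())
        .map(|b| b.schema.clone())
        .unwrap_or(declared)
}
```

With a declared schema of `schema_of(&["SUM(c1) ORDER BY c2", "c2"])` and a single batch whose schema is `schema_of(&["SUM(c1)", "c2"])`, `strict_schema` returns an error while `lenient_schema` silently returns the batch's schema. If the first partition is empty and a later one holds a batch, the lenient variant falls back to the declared schema without looking at that batch, which is one way a mismatch could go unnoticed.
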

/// Create a mem table by reading from another data source
pub async fn load(
t: Arc<dyn TableProvider>,
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
@@ -518,7 +518,7 @@ impl SessionContext {
let physical = DataFrame::new(self.state(), input);

let batches: Vec<_> = physical.collect_partitioned().await?;
let table = Arc::new(MemTable::try_new(schema, batches)?);
let table = Arc::new(MemTable::new_not_registered(schema, batches));
Contributor

It seems like the core problem here is that schema (i.e. the schema of input) does not actually match the schema of the batches that are produced.

As I think @comphead is suggesting in #6566 (review), this PR seems to be trying to work around the deeper problem of the window exec producing a different output schema (different column names) than the LogicalPlan says.

Have you looked into making the names consistent?

Contributor Author

Yes, my first suggestion was to use the longer version of the name in the planner (which is what the LogicalPlan does). However, that would require many changes in the tests, and the BoundedWindowAggExec lines may become too long. If that is not a problem, we can solve the issue that way.

Contributor

I think using the same names in the physical and logical plans is preferable, because the rest of the code expects this and sometimes assumes it is the case (because it mostly is).

If we don't make the logical and physical plans match up, I predict we will continue to hit a long tail of schema-mismatch bugs that appear only when using window functions.

If the long display name is a problem (and I can see how it would be), perhaps we can figure out how to make display_name produce something shorter for window functions, rather than serializing the entire window definition.

Here is what postgres does:

postgres=# select first_value(x) over (order by x) from foo;
 first_value
-------------
           1
(1 row)

We probably need to do something more sophisticated as DataFusion needs distinct column names.
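
One way to picture "short but distinct" names is a suffixing pass over the short names. The sketch below is only an illustration of that idea: the `dedup_column_names` function and its `_2`, `_3` suffix scheme are hypothetical, and it is not what PostgreSQL does or what DataFusion implements.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical helper: keeps the first occurrence of each short name and
/// appends a numeric suffix to later repeats until the name is unused.
pub fn dedup_column_names(names: &[&str]) -> Vec<String> {
    // Every name already handed out, including generated ones.
    let mut used: HashSet<String> = HashSet::new();
    // Next suffix to try for each base name.
    let mut next_suffix: HashMap<&str, usize> = HashMap::new();
    let mut out = Vec::with_capacity(names.len());

    for &name in names {
        let mut candidate = name.to_string();
        // Keep trying suffixes while the candidate collides with a name
        // that was already emitted.
        while used.contains(&candidate) {
            let n = next_suffix.entry(name).or_insert(2);
            candidate = format!("{}_{}", name, n);
            *n += 1;
        }
        used.insert(candidate.clone());
        out.push(candidate);
    }
    out
}
```

For example, the inputs `first_value`, `first_value`, `c2` would come out as `first_value`, `first_value_2`, `c2`. Whatever scheme is chosen, the logical and physical planners would both need to apply it so that the two schemas keep agreeing.
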


self.register_table(&name, table)?;
self.return_empty_dataframe()
16 changes: 15 additions & 1 deletion datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -348,6 +348,9 @@ SELECT * FROM new_table;
statement ok
DROP TABLE new_table

statement ok
DROP TABLE my_table;

# create_table_with_schema_as_multiple_values
statement ok
CREATE TABLE test_table(c1 int, c2 float, c3 varchar) AS VALUES(1, 2, 'hello'),(2, 1, 'there'),(3, 0, '!');
@@ -362,7 +365,18 @@ SELECT * FROM new_table
2 1 there

statement ok
DROP TABLE my_table;
DROP TABLE new_table;

# Select into without alias names of window aggregates
statement ok
SELECT SUM(c1) OVER(ORDER BY c2), c2, c3 INTO new_table FROM test_table

query IRT
SELECT * FROM new_table
----
3 0 !
5 1 there
6 2 hello

statement ok
DROP TABLE new_table;