Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> R
_ => unreachable!(),
};

if new_projection.is_empty() {
return Ok(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: true,
schema: Arc::new(DFSchema::empty()),
}));
}

TableScan::try_new(
scan.table_name,
scan.source,
Expand Down Expand Up @@ -1798,11 +1805,11 @@ mod test {
query_to_analyze: "SELECT column1 AS output FROM t3",
projection: &[],
expected_plan: vec![
"+--------------+-----------------------------+",
"| plan_type | plan |",
"+--------------+-----------------------------+",
"| logical_plan | TableScan: t3 projection=[] |",
"+--------------+-----------------------------+",
"+--------------+-----------------------+",
"| plan_type | plan |",
"+--------------+-----------------------+",
"| logical_plan | EmptyRelation: rows=1 |",
"+--------------+-----------------------+",
],
expected_output: vec![
"++",
Expand Down Expand Up @@ -2051,6 +2058,73 @@ mod test {
Ok(())
}

/// Regression test: a `CROSS JOIN` whose right-hand side contributes no selected
/// columns must not add that side's files to the materialized view's dependencies.
///
/// The view selects only `t1` columns from `t1 CROSS JOIN t3` and is partitioned on
/// `column1`, so every expected dependency row points at a `t1` file.
#[tokio::test]
async fn test_cross_join_unrelated_files() -> Result<()> {
    let context = setup().await?;

    // Only columns from the left table (t1) are selected. The cross join with t3
    // affects cardinality, but no t3 column is referenced by the output.
    let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3";

    let plan = context.sql(query).await?.into_optimized_plan()?;

    println!("Original plan:\n{}", plan.display_indent());

    // The view is partitioned on column1 (output index 0), which comes from t1 only.
    let partition_col_indices = HashSet::from([0_usize]);

    // The pushdown itself must succeed; the resulting plan is printed to help
    // diagnose failures of the dependency assertion below.
    let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
    println!("After pushdown:\n{}", analyzed.display_indent());

    // Register the materialized view backed by the cross-join query.
    context.register_table(
        "mv_cross_join",
        Arc::new(MockMaterializedView {
            // Propagate a parse failure as a test error instead of panicking.
            table_path: ListingTableUrl::parse("s3://mv_cross_join/")?,
            partition_columns: vec!["column1".to_string()],
            static_partition_columns: None,
            query: plan,
            file_ext: ".parquet",
        }),
    )?;

    // Add file metadata for the MV: one file per column1 partition.
    context
        .sql(
            "INSERT INTO file_metadata VALUES
            ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0),
            ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0),
            ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)",
        )
        .await?
        .collect()
        .await?;

    // Compute the dependencies of the materialized view.
    let batches = context
        .sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')")
        .await?
        .collect()
        .await?;

    // Print the actual dependencies for debugging.
    println!("Actual dependencies:");
    println!("{}", pretty_format_batches(&batches)?);

    // Only t1 files may appear as sources; t3 files must not be listed.
    // Every row is padded to the column widths given by the border lines, because
    // the assertion compares the rendered table line by line.
    let expected = [
        "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
        "| target                           | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
        "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
        "| s3://mv_cross_join/column1=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
        "| s3://mv_cross_join/column1=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
        "| s3://mv_cross_join/column1=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
        "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
    ];

    assert_batches_sorted_eq!(expected, &batches);

    Ok(())
}

#[test]
fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
// This test simulates a simplified MV scenario:
Expand Down