Skip to content

Commit 638dc46

Browse files
haohuaijin and alamb authored
fix: deserialization error for FilterExec (predicates with inlist) (#17224)
* reproduce for deserialization inlist
* fix: inlist deser error
---------
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 944d8c0 commit 638dc46

File tree

2 files changed

+47
-25
lines changed

2 files changed

+47
-25
lines changed

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 14 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -582,43 +582,33 @@ impl protobuf::PhysicalPlanNode {
582582
) -> Result<Arc<dyn ExecutionPlan>> {
583583
let input: Arc<dyn ExecutionPlan> =
584584
into_physical_plan(&filter.input, ctx, runtime, extension_codec)?;
585-
let projection = if !filter.projection.is_empty() {
586-
Some(
587-
filter
588-
.projection
589-
.iter()
590-
.map(|i| *i as usize)
591-
.collect::<Vec<_>>(),
592-
)
593-
} else {
594-
None
595-
};
596-
597-
// Use the projected schema if projection is present, otherwise use the full schema
598-
let predicate_schema = if let Some(ref proj_indices) = projection {
599-
// Create projected schema for parsing the predicate
600-
let projected_fields: Vec<_> = proj_indices
601-
.iter()
602-
.map(|&i| input.schema().field(i).clone())
603-
.collect();
604-
Arc::new(Schema::new(projected_fields))
605-
} else {
606-
input.schema()
607-
};
608585

609586
let predicate = filter
610587
.expr
611588
.as_ref()
612589
.map(|expr| {
613-
parse_physical_expr(expr, ctx, predicate_schema.as_ref(), extension_codec)
590+
parse_physical_expr(expr, ctx, input.schema().as_ref(), extension_codec)
614591
})
615592
.transpose()?
616593
.ok_or_else(|| {
617594
DataFusionError::Internal(
618595
"filter (FilterExecNode) in PhysicalPlanNode is missing.".to_owned(),
619596
)
620597
})?;
598+
621599
let filter_selectivity = filter.default_filter_selectivity.try_into();
600+
let projection = if !filter.projection.is_empty() {
601+
Some(
602+
filter
603+
.projection
604+
.iter()
605+
.map(|i| *i as usize)
606+
.collect::<Vec<_>>(),
607+
)
608+
} else {
609+
None
610+
};
611+
622612
let filter =
623613
FilterExec::try_new(predicate, input)?.with_projection(projection)?;
624614
match filter_selectivity {

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 33 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2057,6 +2057,39 @@ async fn test_round_trip_date_part_display() -> Result<()> {
20572057
Ok(())
20582058
}
20592059

2060+
#[tokio::test]
2061+
async fn test_tpch_part_in_list_query_with_real_parquet_data() -> Result<()> {
2062+
use datafusion_common::test_util::datafusion_test_data;
2063+
2064+
let ctx = SessionContext::new();
2065+
2066+
// Register the TPC-H part table using the local test data
2067+
let test_data = datafusion_test_data();
2068+
let table_sql = format!(
2069+
"CREATE EXTERNAL TABLE part STORED AS PARQUET LOCATION '{test_data}/tpch_part_small.parquet'"
2070+
);
2071+
ctx.sql(&table_sql).await.map_err(|e| {
2072+
DataFusionError::External(format!("Failed to create part table: {e}").into())
2073+
})?;
2074+
2075+
// Test the exact problematic query
2076+
let sql =
2077+
"SELECT p_size FROM part WHERE p_size IN (14, 6, 5, 31) and p_partkey > 1000";
2078+
2079+
let logical_plan = ctx.sql(sql).await?.into_unoptimized_plan();
2080+
let optimized_plan = ctx.state().optimize(&logical_plan)?;
2081+
let physical_plan = ctx.state().create_physical_plan(&optimized_plan).await?;
2082+
2083+
// Serialize the physical plan - bug may happen here already but not necessarily manifests
2084+
let codec = DefaultPhysicalExtensionCodec {};
2085+
let proto = PhysicalPlanNode::try_from_physical_plan(physical_plan.clone(), &codec)?;
2086+
2087+
// This will fail with the bug, but should succeed when fixed
2088+
let _deserialized_plan =
2089+
proto.try_into_physical_plan(&ctx, ctx.runtime_env().as_ref(), &codec)?;
2090+
Ok(())
2091+
}
2092+
20602093
#[tokio::test]
20612094
/// Tests that we can serialize an unoptimized "analyze" plan and it will work on the other end
20622095
async fn analyze_roundtrip_unoptimized() -> Result<()> {
@@ -2090,6 +2123,5 @@ async fn analyze_roundtrip_unoptimized() -> Result<()> {
20902123
let physical_planner =
20912124
datafusion::physical_planner::DefaultPhysicalPlanner::default();
20922125
physical_planner.optimize_physical_plan(unoptimized, &session_state, |_, _| {})?;
2093-
20942126
Ok(())
20952127
}

0 commit comments

Comments
 (0)