Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub type PhysicalExprRef = Arc<dyn PhysicalExpr>;
/// [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
/// [`create_physical_expr`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/fn.create_physical_expr.html
/// [`Column`]: https://docs.rs/datafusion/latest/datafusion/physical_expr/expressions/struct.Column.html
pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash {
pub trait PhysicalExpr: Any + Send + Sync + Display + Debug + DynEq + DynHash {
/// Returns the physical expression as [`Any`] so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down
99 changes: 77 additions & 22 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ impl JoinLeftData {
/// Note this structure includes a [`OnceAsync`] that is used to coordinate the
/// loading of the left side with the processing in each output stream.
/// Therefore it can not be [`Clone`]
#[derive(Debug)]
pub struct HashJoinExec {
/// left (build) side which gets hashed
pub left: Arc<dyn ExecutionPlan>,
Expand All @@ -350,7 +349,7 @@ pub struct HashJoinExec {
///
/// Each output stream waits on the `OnceAsync` to signal the completion of
/// the hash table creation.
left_fut: OnceAsync<JoinLeftData>,
left_fut: Arc<OnceAsync<JoinLeftData>>,
/// Shared `RandomState` for the hashing algorithm
random_state: RandomState,
/// Partitioning mode to use
Expand All @@ -366,7 +365,29 @@ pub struct HashJoinExec {
/// Cache holding plan properties like equivalences, output partitioning etc.
cache: PlanProperties,
/// Dynamic filter for pushing down to the probe side
dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
}

/// Manual [`fmt::Debug`] implementation for [`HashJoinExec`].
///
/// Every field is printed except `dynamic_filter`, which is deliberately
/// left out so that its runtime state does not cause differences in
/// debug output between otherwise identical plans (e.g. in tests).
impl fmt::Debug for HashJoinExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("HashJoinExec");

        // Inputs and join definition.
        out.field("left", &self.left);
        out.field("right", &self.right);
        out.field("on", &self.on);
        out.field("filter", &self.filter);
        out.field("join_type", &self.join_type);
        out.field("join_schema", &self.join_schema);

        // Execution-time machinery.
        out.field("left_fut", &self.left_fut);
        out.field("random_state", &self.random_state);
        out.field("mode", &self.mode);
        out.field("metrics", &self.metrics);

        // Output shape and cached plan properties.
        out.field("projection", &self.projection);
        out.field("column_indices", &self.column_indices);
        out.field("null_equality", &self.null_equality);
        out.field("cache", &self.cache);

        // `dynamic_filter` is intentionally not emitted (see the impl docs).
        out.finish()
    }
}

impl HashJoinExec {
Expand Down Expand Up @@ -413,8 +434,6 @@ impl HashJoinExec {
projection.as_ref(),
)?;

let dynamic_filter = Self::create_dynamic_filter(&on);

Ok(HashJoinExec {
left,
right,
Expand All @@ -430,12 +449,13 @@ impl HashJoinExec {
column_indices,
null_equality,
cache,
dynamic_filter,
dynamic_filter: None,
})
}

fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
// Extract the right-side keys from the `on` clauses
// Extract the right-side keys (probe side keys) from the `on` clauses
// Dynamic filter will be created from build side values (left side) and applied to probe side (right side)
let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
// Initialize with a placeholder expression (true) that will be updated when the hash table is built
Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
Expand Down Expand Up @@ -686,11 +706,14 @@ impl DisplayAs for HashJoinExec {
.map(|(c1, c2)| format!("({c1}, {c2})"))
.collect::<Vec<String>>()
.join(", ");
let dynamic_filter_display = match self.dynamic_filter.current() {
Ok(current) if current != lit(true) => {
format!(", filter=[{current}]")
}
_ => "".to_string(),
let dynamic_filter_display = match self.dynamic_filter.as_ref() {
Some(dynamic_filter) => match dynamic_filter.current() {
Ok(current) if current != lit(true) => {
format!(", filter=[{current}]")
}
_ => "".to_string(),
},
None => "".to_string(),
};
write!(
f,
Expand Down Expand Up @@ -794,7 +817,7 @@ impl ExecutionPlan for HashJoinExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_join = HashJoinExec::try_new(
let new_join = HashJoinExec::try_new(
Arc::clone(&children[0]),
Arc::clone(&children[1]),
self.on.clone(),
Expand All @@ -804,8 +827,6 @@ impl ExecutionPlan for HashJoinExec {
self.mode,
self.null_equality,
)?;
// Preserve the dynamic filter if it exists
new_join.dynamic_filter = Arc::clone(&self.dynamic_filter);
Ok(Arc::new(new_join))
}

Expand All @@ -818,15 +839,15 @@ impl ExecutionPlan for HashJoinExec {
filter: self.filter.clone(),
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: OnceAsync::default(),
left_fut: Arc::new(OnceAsync::default()),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
cache: self.cache.clone(),
dynamic_filter: Self::create_dynamic_filter(&self.on),
dynamic_filter: None,
}))
}

Expand Down Expand Up @@ -886,7 +907,8 @@ impl ExecutionPlan for HashJoinExec {
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
enable_dynamic_filter_pushdown
.then_some(Arc::clone(&self.dynamic_filter)),
.then_some(self.dynamic_filter.clone())
.flatten(),
on_right.clone(),
))
})?,
Expand All @@ -906,7 +928,8 @@ impl ExecutionPlan for HashJoinExec {
need_produce_result_in_final(self.join_type),
1,
enable_dynamic_filter_pushdown
.then_some(Arc::clone(&self.dynamic_filter)),
.then_some(self.dynamic_filter.clone())
.flatten(),
on_right.clone(),
))
}
Expand Down Expand Up @@ -1050,8 +1073,7 @@ impl ExecutionPlan for HashJoinExec {
&& config.optimizer.enable_dynamic_filter_pushdown
{
// Add actual dynamic filter to right side (probe side)
let dynamic_filter =
Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
let dynamic_filter = Self::create_dynamic_filter(&self.on);
right_child = right_child.with_self_filter(dynamic_filter);
}

Expand All @@ -1078,7 +1100,40 @@ impl ExecutionPlan for HashJoinExec {
child_pushdown_result,
));
}
Ok(FilterPushdownPropagation::if_any(child_pushdown_result))

let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2, we have 2 children
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
// We expect 0 or 1 self filters
if let Some(filter) = right_child_self_filters.first() {
// Note that we don't check `PushedDownPredicate::discriminant`: even if no child said
// "yes, I can fully evaluate this filter", it might still be used for statistics, so it's worth updating
let predicate = Arc::clone(&filter.predicate);
if let Ok(dynamic_filter) =
Arc::downcast::<DynamicFilterPhysicalExpr>(predicate)
{
// We successfully pushed down our self filter - we need to make a new node with the dynamic filter
let new_node = Arc::new(HashJoinExec {
left: Arc::clone(&self.left),
right: Arc::clone(&self.right),
on: self.on.clone(),
filter: self.filter.clone(),
join_type: self.join_type,
join_schema: Arc::clone(&self.join_schema),
left_fut: Arc::clone(&self.left_fut),
random_state: self.random_state.clone(),
mode: self.mode,
metrics: ExecutionPlanMetricsSet::new(),
projection: self.projection.clone(),
column_indices: self.column_indices.clone(),
null_equality: self.null_equality,
cache: self.cache.clone(),
dynamic_filter: Some(dynamic_filter),
});
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
}
}
Ok(result)
}
}

Expand Down
32 changes: 32 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -286,5 +286,37 @@ explain select a from t where CAST(a AS string) = '0123';
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/t.parquet]]}, projection=[a], file_type=parquet, predicate=CAST(a@0 AS Utf8View) = 0123


# Test dynamic filter pushdown with swapped join inputs (issue #17196)
# Create tables with different sizes to force join input swapping
statement ok
copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';

statement ok
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';

statement ok
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';

statement ok
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';

# Test that dynamic filter is applied to the correct table after join input swapping
# The small_table should be the build side, large_table should be the probe side with dynamic filter
query TT
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
----
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)]
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet
04)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]
Comment on lines +309 to +313
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the DynamicFilterPhysicalExpr be applied to the left table here, i.e., small_table?

Copy link
Contributor Author

@adriangb adriangb Aug 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that at runtime (i.e. when ExecutionPlan::execute is called and work actually begins) the left side is always the build side and the right side is always the probe side. During optimizer passes which one is left and right may be swapped / re-arranged but all of the dynamic filter stuff happens after this so we can always push the filters into the right side. At least for inner joins, other join types may be more complex and I haven't even begun to wrap my head around those cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tree looks like this:

copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
explain select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐                              |
|               | │    CoalesceBatchesExec    │                              |
|               | │    --------------------   │                              |
|               | │     target_batch_size:    │                              |
|               | │            8192           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   ├──────────────┐               |
|               | │        on: (k = k)        │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │          files: 1         ││     target_batch_size:    │ |
|               | │      format: parquet      ││            8192           │ |
|               | └───────────────────────────┘└─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │         FilterExec        │ |
|               |                              │    --------------------   │ |
|               |                              │     predicate: v >= 50    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │      RepartitionExec      │ |
|               |                              │    --------------------   │ |
|               |                              │ partition_count(in->out): │ |
|               |                              │          1 -> 12          │ |
|               |                              │                           │ |
|               |                              │    partitioning_scheme:   │ |
|               |                              │    RoundRobinBatch(12)    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │       DataSourceExec      │ |
|               |                              │    --------------------   │ |
|               |                              │          files: 1         │ |
|               |                              │      format: parquet      │ |
|               |                              │                           │ |
|               |                              │         predicate:        │ |
|               |                              │      v >= 50 AND true     │ |
|               |                              └───────────────────────────┘ |
|               |                                                            |
+---------------+------------------------------------------------------------+

This makes sense: you always want the small table to be the build side and the large table to be the probe side.

If I change the query to:

explain select * from large_table join small_table on small_table.k = large_table.k where large_table.v >= 50;

Then we'll make a different plan but we swap the join around so that the large table continues to be the probe side:

+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐                              |
|               | │       ProjectionExec      │                              |
|               | │    --------------------   │                              |
|               | │            k: k           │                              |
|               | │            v: v           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │    CoalesceBatchesExec    │                              |
|               | │    --------------------   │                              |
|               | │     target_batch_size:    │                              |
|               | │            8192           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   ├──────────────┐               |
|               | │        on: (k = k)        │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       DataSourceExec      ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │          files: 1         ││     target_batch_size:    │ |
|               | │      format: parquet      ││            8192           │ |
|               | └───────────────────────────┘└─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │         FilterExec        │ |
|               |                              │    --------------------   │ |
|               |                              │     predicate: v >= 50    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │      RepartitionExec      │ |
|               |                              │    --------------------   │ |
|               |                              │ partition_count(in->out): │ |
|               |                              │          1 -> 12          │ |
|               |                              │                           │ |
|               |                              │    partitioning_scheme:   │ |
|               |                              │    RoundRobinBatch(12)    │ |
|               |                              └─────────────┬─────────────┘ |
|               |                              ┌─────────────┴─────────────┐ |
|               |                              │       DataSourceExec      │ |
|               |                              │    --------------------   │ |
|               |                              │          files: 1         │ |
|               |                              │      format: parquet      │ |
|               |                              │                           │ |
|               |                              │         predicate:        │ |
|               |                              │      v >= 50 AND true     │ |
|               |                              └───────────────────────────┘ |
|               |                                                            |
+---------------+------------------------------------------------------------+
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@1 as k, v@2 as v, k@0 as k], metrics=[output_rows=51, elapsed_compute=3.428µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=4.584µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 1 AND k@0 <= 100], metrics=[output_rows=51, elapsed_compute=1.336377ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=983.25µs, join_time=347.668µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |       DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=41.043µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=102.5µs, time_elapsed_processing=742.668µs, time_elapsed_scanning_total=731.75µs, time_elapsed_scanning_until_data=702.584µs] |
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=22.585µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |         FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=152.886µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |           RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.19975ms, repartition_time=1ns, send_time=5.47µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |             DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ k@0 >= 1 AND k@0 <= 100 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50 AND k_null_count@4 != row_count@2 AND k_max@3 >= 1 AND k_null_count@4 != row_count@2 AND k_min@5 <= 100, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=226.251µs, metadata_load_time=204.001µs, page_index_eval_time=143.126µs, row_pushdown_eval_time=2ns, statistics_eval_time=109.96µs, time_elapsed_opening=1.627167ms, time_elapsed_processing=2.125167ms, time_elapsed_scanning_total=581.5µs, time_elapsed_scanning_until_data=546.208µs]                                                                                                                                                       |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DataSourceExec small_table.parquet
DataSourceExec large_table.parquet, predicate=v@1 >= 50 AND DynamicFilterPhysicalExpr [ true ]

But in this case large_table is filtered by both v >= 50 and the dynamic filter. Shouldn't the dynamic filter be applied to small_table instead?

If the query uses `where v = 50` instead, the dynamic filter now appears on small_table:

DataSourceExec large_table.parquet, predicate=v@1 = 50
DataSourceExec small_table.parquet, predicate=DynamicFilterPhysicalExpr [ k@0 >= 50 AND k@0 <= 50 ]

Copy link
Contributor Author

@adriangb adriangb Aug 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dynamic filter is always applied to the probe side. In the examples above, it seems the join sides change, probably because v = 50 is highly selective, so the large table effectively becomes ~1 row and can thus be used as the build side. You can see that, even without dynamic filters, changing the query like that flips the build/probe sides:

copy (select i as k from generate_series(1, 100) t(i)) to 'test_files/scratch/push_down_filter/small_table.parquet';
copy (select i as k, i as v from generate_series(1, 1000) t(i)) to 'test_files/scratch/push_down_filter/large_table.parquet';
set datafusion.optimizer.enable_dynamic_filter_pushdown = false;
create external table small_table stored as parquet location 'test_files/scratch/push_down_filter/small_table.parquet';
create external table large_table stored as parquet location 'test_files/scratch/push_down_filter/large_table.parquet';
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v >= 50;
explain analyze select * from small_table join large_table on small_table.k = large_table.k where large_table.v = 50;
DataFusion CLI v49.0.1
+-------+
| count |
+-------+
| 100   |
+-------+
1 row(s) fetched. 
Elapsed 0.027 seconds.

+-------+
| count |
+-------+
| 1000  |
+-------+
1 row(s) fetched. 
Elapsed 0.006 seconds.

0 row(s) fetched. 
Elapsed 0.001 seconds.

0 row(s) fetched. 
Elapsed 0.002 seconds.

0 row(s) fetched. 
Elapsed 0.002 seconds.

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=51, elapsed_compute=5.546µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |   HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=51, elapsed_compute=1.976173ms, build_input_batches=1, build_input_rows=100, input_batches=1, input_rows=951, output_batches=1, build_mem_used=3032, build_time=1.452836ms, join_time=520.127µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
|                   |     DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=56.793µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=136.292µs, time_elapsed_processing=1.070917ms, time_elapsed_scanning_total=978.833µs, time_elapsed_scanning_until_data=940.417µs] |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=951, elapsed_compute=24.711µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |       FilterExec: v@1 >= 50, metrics=[output_rows=951, elapsed_compute=263.386µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=2.629708ms, repartition_time=1ns, send_time=8.553µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |           DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 50, pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 50, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=292.459µs, metadata_load_time=146.834µs, page_index_eval_time=123.543µs, row_pushdown_eval_time=2ns, statistics_eval_time=135.417µs, time_elapsed_opening=1.7685ms, time_elapsed_processing=2.520792ms, time_elapsed_scanning_total=854.999µs, time_elapsed_scanning_until_data=810.208µs]                                                                                                                                                        |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.013 seconds.

+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | ProjectionExec: expr=[k@2 as k, k@0 as k, v@1 as v], metrics=[output_rows=1, elapsed_compute=3.75µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=6.459µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |     HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(k@0, k@0)], metrics=[output_rows=1, elapsed_compute=194.128µs, build_input_batches=1, build_input_rows=1, input_batches=1, input_rows=100, output_batches=1, build_mem_used=89, build_time=95.668µs, join_time=54.624µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |       CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=32.583µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=14.793µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |           FilterExec: v@1 = 50, metrics=[output_rows=1, elapsed_compute=111.469µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |             RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=1.516833ms, repartition_time=1ns, send_time=5.594µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |               DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/large_table.parquet]]}, projection=[k, v], file_type=parquet, predicate=v@1 = 50, pruning_predicate=v_null_count@2 != row_count@3 AND v_min@0 <= 50 AND 50 <= v_max@1, required_guarantees=[v in (50)]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   | , metrics=[output_rows=1000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=5888, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=1000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=146.792µs, metadata_load_time=189.126µs, page_index_eval_time=66.084µs, row_pushdown_eval_time=2ns, statistics_eval_time=67.584µs, time_elapsed_opening=850.75µs, time_elapsed_processing=1.408834ms, time_elapsed_scanning_total=641.791µs, time_elapsed_scanning_until_data=616.5µs]                                                                                                                                                            |
|                   |       DataSourceExec: file_groups={1 group: [[Users/adrian/GitHub/datafusion-clone/test_files/scratch/push_down_filter/small_table.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=100, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=285, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=45.376µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=92.333µs, time_elapsed_processing=260.126µs, time_elapsed_scanning_total=290.167µs, time_elapsed_scanning_until_data=214.041µs] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
+-------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.008 seconds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so the filter is always applied from the build side to the probe side, independently of the query. Maybe I misunderstood, but I thought that was the original issue (#17196): that the filter was always applied to the same side, even when it made sense to do the opposite.

For example:

copy (select i as k from generate_series(1, 1000000) t(i)) to 't1.parquet';
copy (select i as k, i as v from generate_series(1, 10000000) t(i)) to 't2.parquet';
create external table t1 stored as parquet location 't1.parquet';
create external table t2 stored as parquet location 't2.parquet';

explain analyze select *
from t1
join t2 on t1.k = t2.k
where t2.v >= 1000000;

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=71.205µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(k@0, k@0)], filter=[k@0 >= 14 AND k@0 <= 999997], metrics=[output_rows=1, elapsed_compute=740.259412ms, build_input_batches=120, build_input_rows=1000000, input_batches=12, input_rows=11713, output_batches=12, build_mem_used=34742816, build_time=703.6063ms, join_time=7.0188ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1000000, elapsed_compute=15.225303ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
|                   |       RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=1.6347235s, repartition_time=105.8625ms, send_time=8.7999ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |         RepartitionExec: partitioning=RoundRobinBatch(12), input_partitions=1, metrics=[fetch_time=135.86ms, repartition_time=1ns, send_time=1.4348ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
|                   |           DataSourceExec: file_groups={1 group: [[t1.parquet]]}, projection=[k], file_type=parquet, metrics=[output_rows=1000000, elapsed_compute=1ns, batches_splitted=0, bytes_scanned=1310405, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=2ns, metadata_load_time=120.801µs, page_index_eval_time=2ns, row_pushdown_eval_time=2ns, statistics_eval_time=2ns, time_elapsed_opening=232.8µs, time_elapsed_processing=134.8576ms, time_elapsed_scanning_total=145.4359ms, time_elapsed_scanning_until_data=17.3163ms]                                            |
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=244.4µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |       RepartitionExec: partitioning=Hash([k@0], 12), input_partitions=12, metrics=[fetch_time=55.1884ms, repartition_time=1.360111ms, send_time=123.432µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         |
|                   |         CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=11713, elapsed_compute=107.3µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
|                   |           FilterExec: v@1 >= 1000000, metrics=[output_rows=11713, elapsed_compute=681.411µs]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
|                   |             DataSourceExec: file_groups={12 groups: [[t2.parquet:0..2133322], [t2.parquet:2133322..4266644], [t2.parquet:4266644..6399966], [t2.parquet:6399966..8533288], [t2.parquet:8533288..10666610], ...]}, projection=[k, v], file_type=parquet, predicate=v@1 >= 1000000 AND DynamicFilterPhysicalExpr [ k@0 >= 14 AND k@0 <= 999997 ], pruning_predicate=v_null_count@1 != row_count@2 AND v_max@0 >= 1000000 AND k_null_count@4 != row_count@2 AND k_max@3 >= 14 AND k_null_count@4 != row_count@2 AND k_min@5 <= 999997, required_guarantees=[]                                                                                                                                                                                                                                                                                                                                                                         |
|                   | , metrics=[output_rows=20480, elapsed_compute=12ns, batches_splitted=0, bytes_scanned=379500, file_open_errors=0, file_scan_errors=0, files_ranges_pruned_statistics=0, num_predicate_creation_errors=0, page_index_rows_matched=20480, page_index_rows_pruned=1028096, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=9, bloom_filter_eval_time=180.612µs, metadata_load_time=7.630712ms, page_index_eval_time=317.512µs, row_pushdown_eval_time=24ns, statistics_eval_time=1.603512ms, time_elapsed_opening=16.959ms, time_elapsed_processing=53.3003ms, time_elapsed_scanning_total=39.3134ms, time_elapsed_scanning_until_data=36.7037ms]                                                                                                                  |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

In this example, t2 is filtered by v and by a dynamic filter on k, while in theory it would be faster to apply a dynamic filter from t2 to t1 (maybe we would need some heuristic to determine which approach is best?).


statement ok
drop table small_table;

statement ok
drop table large_table;

statement ok
drop table t;