|
20 | 20 | use std::fmt; |
21 | 21 | use std::mem::size_of; |
22 | 22 | use std::sync::atomic::{AtomicUsize, Ordering}; |
23 | | -use std::sync::Arc; |
| 23 | +use std::sync::{Arc, OnceLock}; |
24 | 24 | use std::task::Poll; |
25 | 25 | use std::{any::Any, vec}; |
26 | 26 |
|
@@ -517,7 +517,8 @@ pub struct HashJoinExec { |
517 | 517 | /// Dynamic filter for pushing down to the probe side |
518 | 518 | dynamic_filter: Arc<DynamicFilterPhysicalExpr>, |
519 | 519 | /// Shared bounds accumulator for coordinating dynamic filter updates across partitions |
520 | | - bounds_accumulator: Arc<SharedBoundsAccumulator>, |
| 520 | + /// Lazily initialized at execution time to use actual runtime partition counts |
| 521 | + bounds_accumulator: Arc<OnceLock<Arc<SharedBoundsAccumulator>>>, |
521 | 522 | } |
522 | 523 |
|
523 | 524 | impl HashJoinExec { |
@@ -566,12 +567,7 @@ impl HashJoinExec { |
566 | 567 |
|
567 | 568 | let dynamic_filter = Self::create_dynamic_filter(&on); |
568 | 569 |
|
569 | | - let bounds_accumulator = |
570 | | - Arc::new(SharedBoundsAccumulator::new_from_partition_mode( |
571 | | - partition_mode, |
572 | | - left.as_ref(), |
573 | | - right.as_ref(), |
574 | | - )); |
| 570 | + let bounds_accumulator = Arc::new(OnceLock::new()); |
575 | 571 |
|
576 | 572 | Ok(HashJoinExec { |
577 | 573 | left, |
@@ -953,14 +949,7 @@ impl ExecutionPlan for HashJoinExec { |
953 | 949 | /// |
954 | 950 | /// This method is called during query optimization when the optimizer creates new |
955 | 951 | /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new` |
956 | | - /// rather than cloning the existing one, because: |
957 | | - /// |
958 | | - /// 1. The new child plans may have different partition counts, requiring a new |
959 | | - /// bounds_accumulator with the correct total_partitions count |
960 | | - /// 2. The accumulator contains execution state (completed_partitions, bounds) |
961 | | - /// that should be reset for the new execution context |
962 | | - /// 3. The dynamic_filter is preserved separately to maintain filter state |
963 | | - /// across plan transformations |
| 952 | + /// rather than cloning the existing one because partitioning may have changed. |
964 | 953 | fn with_new_children( |
965 | 954 | self: Arc<Self>, |
966 | 955 | children: Vec<Arc<dyn ExecutionPlan>>, |
@@ -998,13 +987,7 @@ impl ExecutionPlan for HashJoinExec { |
998 | 987 | null_equality: self.null_equality, |
999 | 988 | cache: self.cache.clone(), |
1000 | 989 | dynamic_filter: Self::create_dynamic_filter(&self.on), |
1001 | | - bounds_accumulator: Arc::new( |
1002 | | - SharedBoundsAccumulator::new_from_partition_mode( |
1003 | | - self.mode, |
1004 | | - self.left.as_ref(), |
1005 | | - self.right.as_ref(), |
1006 | | - ), |
1007 | | - ), |
| 990 | + bounds_accumulator: Arc::new(OnceLock::new()), |
1008 | 991 | })) |
1009 | 992 | } |
1010 | 993 |
|
@@ -1094,6 +1077,15 @@ impl ExecutionPlan for HashJoinExec { |
1094 | 1077 |
|
1095 | 1078 | let batch_size = context.session_config().batch_size(); |
1096 | 1079 |
|
| 1080 | + // Initialize bounds_accumulator lazily with runtime partition counts |
| 1081 | + let bounds_accumulator = Arc::clone(self.bounds_accumulator.get_or_init(|| { |
| 1082 | + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( |
| 1083 | + self.mode, |
| 1084 | + self.left.as_ref(), |
| 1085 | + self.right.as_ref(), |
| 1086 | + )) |
| 1087 | + })); |
| 1088 | + |
1097 | 1089 | // we have the batches and the hash map with their keys. We can how create a stream |
1098 | 1090 | // over the right that uses this information to issue new batches. |
1099 | 1091 | let right_stream = self.right.execute(partition, context)?; |
@@ -1122,7 +1114,7 @@ impl ExecutionPlan for HashJoinExec { |
1122 | 1114 | batch_size, |
1123 | 1115 | hashes_buffer: vec![], |
1124 | 1116 | right_side_ordered: self.right.output_ordering().is_some(), |
1125 | | - bounds_accumulator: Arc::clone(&self.bounds_accumulator), |
| 1117 | + bounds_accumulator, |
1126 | 1118 | dynamic_filter: enable_dynamic_filter_pushdown |
1127 | 1119 | .then_some(Arc::clone(&self.dynamic_filter)), |
1128 | 1120 | })) |
|
0 commit comments