File tree Expand file tree Collapse file tree 11 files changed +31
-40
lines changed
datafusion/physical-expr/src Expand file tree Collapse file tree 11 files changed +31
-40
lines changed Original file line number Diff line number Diff line change @@ -134,10 +134,6 @@ impl AggregateExpr for Avg {
134134 is_row_accumulator_support_dtype ( & self . sum_data_type )
135135 }
136136
137- fn supports_bounded_execution ( & self ) -> bool {
138- true
139- }
140-
141137 fn create_row_accumulator (
142138 & self ,
143139 start_index : usize ,
@@ -263,6 +259,9 @@ impl Accumulator for AvgAccumulator {
263259 ) ) ,
264260 }
265261 }
262+ fn supports_retract_batch ( & self ) -> bool {
263+ true
264+ }
266265
267266 fn size ( & self ) -> usize {
268267 std:: mem:: size_of_val ( self ) - std:: mem:: size_of_val ( & self . sum ) + self . sum . size ( )
Original file line number Diff line number Diff line change @@ -133,10 +133,6 @@ impl AggregateExpr for Count {
133133 true
134134 }
135135
136- fn supports_bounded_execution ( & self ) -> bool {
137- true
138- }
139-
140136 fn create_row_accumulator (
141137 & self ,
142138 start_index : usize ,
@@ -214,6 +210,10 @@ impl Accumulator for CountAccumulator {
214210 Ok ( ScalarValue :: Int64 ( Some ( self . count ) ) )
215211 }
216212
213+ fn supports_retract_batch ( & self ) -> bool {
214+ true
215+ }
216+
217217 fn size ( & self ) -> usize {
218218 std:: mem:: size_of_val ( self )
219219 }
Original file line number Diff line number Diff line change @@ -125,10 +125,6 @@ impl AggregateExpr for Max {
125125 is_row_accumulator_support_dtype ( & self . data_type )
126126 }
127127
128- fn supports_bounded_execution ( & self ) -> bool {
129- true
130- }
131-
132128 fn create_row_accumulator (
133129 & self ,
134130 start_index : usize ,
@@ -699,6 +695,10 @@ impl Accumulator for SlidingMaxAccumulator {
699695 Ok ( self . max . clone ( ) )
700696 }
701697
698+ fn supports_retract_batch ( & self ) -> bool {
699+ true
700+ }
701+
702702 fn size ( & self ) -> usize {
703703 std:: mem:: size_of_val ( self ) - std:: mem:: size_of_val ( & self . max ) + self . max . size ( )
704704 }
@@ -825,10 +825,6 @@ impl AggregateExpr for Min {
825825 is_row_accumulator_support_dtype ( & self . data_type )
826826 }
827827
828- fn supports_bounded_execution ( & self ) -> bool {
829- true
830- }
831-
832828 fn create_row_accumulator (
833829 & self ,
834830 start_index : usize ,
@@ -958,6 +954,10 @@ impl Accumulator for SlidingMinAccumulator {
958954 Ok ( self . min . clone ( ) )
959955 }
960956
957+ fn supports_retract_batch ( & self ) -> bool {
958+ true
959+ }
960+
961961 fn size ( & self ) -> usize {
962962 std:: mem:: size_of_val ( self ) - std:: mem:: size_of_val ( & self . min ) + self . min . size ( )
963963 }
Original file line number Diff line number Diff line change @@ -96,12 +96,6 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
9696 false
9797 }
9898
99- /// Specifies whether this aggregate function can run using bounded memory.
100- /// Any accumulator returning "true" needs to implement `retract_batch`.
101- fn supports_bounded_execution ( & self ) -> bool {
102- false
103- }
104-
10599 /// RowAccumulator to access/update row-based aggregation state in-place.
106100 /// Currently, row accumulator only supports states of fixed-sized type.
107101 ///
Original file line number Diff line number Diff line change @@ -131,10 +131,6 @@ impl AggregateExpr for Sum {
131131 is_row_accumulator_support_dtype ( & self . data_type )
132132 }
133133
134- fn supports_bounded_execution ( & self ) -> bool {
135- true
136- }
137-
138134 fn create_row_accumulator (
139135 & self ,
140136 start_index : usize ,
@@ -361,6 +357,10 @@ impl Accumulator for SumAccumulator {
361357 }
362358 }
363359
360+ fn supports_retract_batch ( & self ) -> bool {
361+ true
362+ }
363+
364364 fn size ( & self ) -> usize {
365365 std:: mem:: size_of_val ( self ) - std:: mem:: size_of_val ( & self . sum ) + self . sum . size ( )
366366 }
Original file line number Diff line number Diff line change @@ -155,8 +155,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
155155 }
156156
157157 fn uses_bounded_memory ( & self ) -> bool {
158- self . aggregate . supports_bounded_execution ( )
159- && !self . window_frame . end_bound . is_unbounded ( )
158+ !self . window_frame . end_bound . is_unbounded ( )
160159 }
161160}
162161
Original file line number Diff line number Diff line change @@ -122,7 +122,7 @@ impl WindowExpr for BuiltInWindowExpr {
122122 } else if evaluator. include_rank ( ) {
123123 let columns = self . sort_columns ( batch) ?;
124124 let sort_partition_points = evaluate_partition_ranges ( num_rows, & columns) ?;
125- evaluator. evaluate_with_rank_all ( num_rows, & sort_partition_points)
125+ evaluator. evaluate_all_with_rank ( num_rows, & sort_partition_points)
126126 } else {
127127 let ( values, _) = self . get_values_orderbys ( batch) ?;
128128 evaluator. evaluate_all ( & values, num_rows)
Original file line number Diff line number Diff line change @@ -70,7 +70,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
7070pub ( crate ) struct CumeDistEvaluator ;
7171
7272impl PartitionEvaluator for CumeDistEvaluator {
73- fn evaluate_with_rank_all (
73+ fn evaluate_all_with_rank (
7474 & self ,
7575 num_rows : usize ,
7676 ranks_in_partition : & [ Range < usize > ] ,
@@ -109,7 +109,7 @@ mod tests {
109109 ) -> Result < ( ) > {
110110 let result = expr
111111 . create_evaluator ( ) ?
112- . evaluate_with_rank_all ( num_rows, & ranks) ?;
112+ . evaluate_all_with_rank ( num_rows, & ranks) ?;
113113 let result = as_float64_array ( & result) ?;
114114 let result = result. values ( ) ;
115115 assert_eq ! ( expected, * result) ;
Original file line number Diff line number Diff line change @@ -69,7 +69,7 @@ use std::ops::Range;
6969///
7070/// # Stateless `PartitionEvaluator`
7171///
72- /// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all `] is called with values for the
72+ /// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_all_with_rank `] is called with values for the
7373/// entire partition.
7474///
7575/// # Stateful `PartitionEvaluator`
@@ -221,7 +221,7 @@ pub trait PartitionEvaluator: Debug + Send {
221221 ) )
222222 }
223223
224- /// [`PartitionEvaluator::evaluate_with_rank_all `] is called for window
224+ /// [`PartitionEvaluator::evaluate_all_with_rank `] is called for window
225225 /// functions that only need the rank of a row within its window
226226 /// frame.
227227 ///
@@ -248,7 +248,7 @@ pub trait PartitionEvaluator: Debug + Send {
248248 /// (3,4),
249249 /// ]
250250 /// ```
251- fn evaluate_with_rank_all (
251+ fn evaluate_all_with_rank (
252252 & self ,
253253 _num_rows : usize ,
254254 _ranks_in_partition : & [ Range < usize > ] ,
@@ -278,7 +278,7 @@ pub trait PartitionEvaluator: Debug + Send {
278278
279279 /// Can this function be evaluated with (only) rank
280280 ///
281- /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all `]
281+ /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_all_with_rank `]
282282 fn include_rank ( & self ) -> bool {
283283 false
284284 }
Original file line number Diff line number Diff line change @@ -159,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator {
159159 }
160160 }
161161
162- fn evaluate_with_rank_all (
162+ fn evaluate_all_with_rank (
163163 & self ,
164164 num_rows : usize ,
165165 ranks_in_partition : & [ Range < usize > ] ,
@@ -236,7 +236,7 @@ mod tests {
236236 ) -> Result < ( ) > {
237237 let result = expr
238238 . create_evaluator ( ) ?
239- . evaluate_with_rank_all ( num_rows, & ranks) ?;
239+ . evaluate_all_with_rank ( num_rows, & ranks) ?;
240240 let result = as_float64_array ( & result) ?;
241241 let result = result. values ( ) ;
242242 assert_eq ! ( expected, * result) ;
@@ -248,7 +248,7 @@ mod tests {
248248 ranks : Vec < Range < usize > > ,
249249 expected : Vec < u64 > ,
250250 ) -> Result < ( ) > {
251- let result = expr. create_evaluator ( ) ?. evaluate_with_rank_all ( 8 , & ranks) ?;
251+ let result = expr. create_evaluator ( ) ?. evaluate_all_with_rank ( 8 , & ranks) ?;
252252 let result = as_uint64_array ( & result) ?;
253253 let result = result. values ( ) ;
254254 assert_eq ! ( expected, * result) ;
You can’t perform that action at this time.
0 commit comments