@@ -22,7 +22,7 @@ mod test {
2222 use datafusion:: prelude:: SessionContext ;
2323 use datafusion_catalog:: TableProvider ;
2424 use datafusion_common:: stats:: Precision ;
25- use datafusion_common:: { ScalarValue , Statistics } ;
25+ use datafusion_common:: { ColumnStatistics , ScalarValue , Statistics } ;
2626 use datafusion_execution:: config:: SessionConfig ;
2727 use datafusion_expr_common:: operator:: Operator ;
2828 use datafusion_physical_expr:: expressions:: { binary, lit, Column } ;
@@ -35,6 +35,18 @@ mod test {
3535 use datafusion_physical_plan:: ExecutionPlan ;
3636 use std:: sync:: Arc ;
3737
38+ /// Creates a test table with statistics from the test data directory.
39+ ///
40+ /// This function:
41+ /// - Creates an external table from './tests/data/test_statistics_per_partition'
42+ /// - If we set the `target_partition` to `2`, the data contains 2 partitions, each with 2 rows
43+ /// - Each partition has an "id" column (INT) with the following values:
44+ /// - First partition: [3, 4]
45+ /// - Second partition: [1, 2]
46+ /// - Each row is 110 bytes in size
47+ ///
48+ /// `target_partition` is an optional parameter that sets the target partitions.
49+ /// Returns the `ExecutionPlan` representing the scan of the table with statistics.
3850 async fn generate_listing_table_with_statistics (
3951 target_partition : Option < usize > ,
4052 ) -> Arc < dyn ExecutionPlan > {
@@ -63,45 +75,52 @@ mod test {
6375 . unwrap ( )
6476 }
6577
66- fn check_unchanged_statistics ( statistics : Vec < Statistics > ) {
67- // Check the statistics of each partition
68- for stat in & statistics {
69- assert_eq ! ( stat. num_rows, Precision :: Exact ( 2 ) ) ;
70- // First column (id) should have non-null values
71- assert_eq ! ( stat. column_statistics[ 0 ] . null_count, Precision :: Exact ( 0 ) ) ;
78+ /// Helper function to create expected statistics for a partition with Int32 column
79+ fn create_partition_statistics (
80+ num_rows : usize ,
81+ total_byte_size : usize ,
82+ min_value : i32 ,
83+ max_value : i32 ,
84+ include_date_column : bool ,
85+ ) -> Statistics {
86+ let mut column_stats = vec ! [ ColumnStatistics {
87+ null_count: Precision :: Exact ( 0 ) ,
88+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( max_value) ) ) ,
89+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( min_value) ) ) ,
90+ sum_value: Precision :: Absent ,
91+ distinct_count: Precision :: Absent ,
92+ } ] ;
93+
94+ if include_date_column {
95+ column_stats. push ( ColumnStatistics {
96+ null_count : Precision :: Absent ,
97+ max_value : Precision :: Absent ,
98+ min_value : Precision :: Absent ,
99+ sum_value : Precision :: Absent ,
100+ distinct_count : Precision :: Absent ,
101+ } ) ;
72102 }
73103
74- // Verify specific id values for each partition
75- assert_eq ! (
76- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
77- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
78- ) ;
79- assert_eq ! (
80- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
81- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
82- ) ;
83- assert_eq ! (
84- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
85- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
86- ) ;
87- assert_eq ! (
88- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
89- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
90- ) ;
104+ Statistics {
105+ num_rows : Precision :: Exact ( num_rows) ,
106+ total_byte_size : Precision :: Exact ( total_byte_size) ,
107+ column_statistics : column_stats,
108+ }
91109 }
92110
93111 #[ tokio:: test]
94112 async fn test_statistics_by_partition_of_data_source ( ) -> datafusion_common:: Result < ( ) >
95113 {
96114 let scan = generate_listing_table_with_statistics ( Some ( 2 ) ) . await ;
97115 let statistics = scan. statistics_by_partition ( ) ?;
116+ let expected_statistic_partition_1 =
117+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
118+ let expected_statistic_partition_2 =
119+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
98120 // Check the statistics of each partition
99121 assert_eq ! ( statistics. len( ) , 2 ) ;
100- for stat in & statistics {
101- assert_eq ! ( stat. column_statistics. len( ) , 2 ) ;
102- assert_eq ! ( stat. total_byte_size, Precision :: Exact ( 110 ) ) ;
103- }
104- check_unchanged_statistics ( statistics) ;
122+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
123+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
105124 Ok ( ( ) )
106125 }
107126
@@ -114,11 +133,14 @@ mod test {
114133 vec ! [ ( Arc :: new( Column :: new( "id" , 0 ) ) , "id" . to_string( ) ) ] ;
115134 let projection = ProjectionExec :: try_new ( exprs, scan) ?;
116135 let statistics = projection. statistics_by_partition ( ) ?;
117- for stat in & statistics {
118- assert_eq ! ( stat. column_statistics. len( ) , 1 ) ;
119- assert_eq ! ( stat. total_byte_size, Precision :: Exact ( 8 ) ) ;
120- }
121- check_unchanged_statistics ( statistics) ;
136+ let expected_statistic_partition_1 =
137+ create_partition_statistics ( 2 , 8 , 3 , 4 , false ) ;
138+ let expected_statistic_partition_2 =
139+ create_partition_statistics ( 2 , 8 , 1 , 2 , false ) ;
140+ // Check the statistics of each partition
141+ assert_eq ! ( statistics. len( ) , 2 ) ;
142+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
143+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
122144 Ok ( ( ) )
123145 }
124146
@@ -138,55 +160,20 @@ mod test {
138160 ) ;
139161 let mut sort_exec = Arc :: new ( sort. clone ( ) ) ;
140162 let statistics = sort_exec. statistics_by_partition ( ) ?;
163+ let expected_statistic_partition =
164+ create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
141165 assert_eq ! ( statistics. len( ) , 1 ) ;
142- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 4 ) ) ;
143- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
144- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 220 ) ) ;
145- assert_eq ! (
146- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
147- Precision :: Exact ( 0 )
148- ) ;
149- assert_eq ! (
150- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
151- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
152- ) ;
153- assert_eq ! (
154- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
155- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
156- ) ;
166+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
167+
157168 sort_exec = Arc :: new ( sort. with_preserve_partitioning ( true ) ) ;
169+ let expected_statistic_partition_1 =
170+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
171+ let expected_statistic_partition_2 =
172+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
158173 let statistics = sort_exec. statistics_by_partition ( ) ?;
159174 assert_eq ! ( statistics. len( ) , 2 ) ;
160- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 2 ) ) ;
161- assert_eq ! ( statistics[ 1 ] . num_rows, Precision :: Exact ( 2 ) ) ;
162- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
163- assert_eq ! ( statistics[ 1 ] . column_statistics. len( ) , 2 ) ;
164- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
165- assert_eq ! ( statistics[ 1 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
166- assert_eq ! (
167- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
168- Precision :: Exact ( 0 )
169- ) ;
170- assert_eq ! (
171- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
172- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
173- ) ;
174- assert_eq ! (
175- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
176- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
177- ) ;
178- assert_eq ! (
179- statistics[ 1 ] . column_statistics[ 0 ] . null_count,
180- Precision :: Exact ( 0 )
181- ) ;
182- assert_eq ! (
183- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
184- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
185- ) ;
186- assert_eq ! (
187- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
188- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
189- ) ;
175+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
176+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
190177 Ok ( ( ) )
191178 }
192179
@@ -202,48 +189,33 @@ mod test {
202189 ) ?;
203190 let filter: Arc < dyn ExecutionPlan > =
204191 Arc :: new ( FilterExec :: try_new ( predicate, scan) ?) ;
205- let _full_statistics = filter. statistics ( ) ?;
206- // The full statistics is invalid, at least, we can improve the selectivity estimation of the filter
207- /*
208- Statistics {
209- num_rows: Inexact(0),
210- total_byte_size: Inexact(0),
211- column_statistics: [
212- ColumnStatistics {
213- null_count: Exact(0),
214- max_value: Exact(NULL),
215- min_value: Exact(NULL),
216- sum_value: Exact(NULL),
217- distinct_count: Exact(0),
218- },
219- ColumnStatistics {
220- null_count: Exact(0),
221- max_value: Exact(NULL),
222- min_value: Exact(NULL),
223- sum_value: Exact(NULL),
224- distinct_count: Exact(0),
225- },
226- ],
227- }
228- */
192+ let full_statistics = filter. statistics ( ) ?;
193+ let expected_full_statistic = Statistics {
194+ num_rows : Precision :: Inexact ( 0 ) ,
195+ total_byte_size : Precision :: Inexact ( 0 ) ,
196+ column_statistics : vec ! [
197+ ColumnStatistics {
198+ null_count: Precision :: Exact ( 0 ) ,
199+ max_value: Precision :: Exact ( ScalarValue :: Null ) ,
200+ min_value: Precision :: Exact ( ScalarValue :: Null ) ,
201+ sum_value: Precision :: Exact ( ScalarValue :: Null ) ,
202+ distinct_count: Precision :: Exact ( 0 ) ,
203+ } ,
204+ ColumnStatistics {
205+ null_count: Precision :: Exact ( 0 ) ,
206+ max_value: Precision :: Exact ( ScalarValue :: Null ) ,
207+ min_value: Precision :: Exact ( ScalarValue :: Null ) ,
208+ sum_value: Precision :: Exact ( ScalarValue :: Null ) ,
209+ distinct_count: Precision :: Exact ( 0 ) ,
210+ } ,
211+ ] ,
212+ } ;
213+ assert_eq ! ( full_statistics, expected_full_statistic) ;
214+
229215 let statistics = filter. statistics_by_partition ( ) ?;
230- // Also the statistics of each partition is also invalid due to above
231- // But we can ensure the current behavior by tests
232216 assert_eq ! ( statistics. len( ) , 2 ) ;
233- for stat in & statistics {
234- assert_eq ! ( stat. column_statistics. len( ) , 2 ) ;
235- assert_eq ! ( stat. total_byte_size, Precision :: Inexact ( 0 ) ) ;
236- assert_eq ! ( stat. num_rows, Precision :: Inexact ( 0 ) ) ;
237- assert_eq ! ( stat. column_statistics[ 0 ] . null_count, Precision :: Exact ( 0 ) ) ;
238- assert_eq ! (
239- stat. column_statistics[ 0 ] . max_value,
240- Precision :: Exact ( ScalarValue :: Null )
241- ) ;
242- assert_eq ! (
243- stat. column_statistics[ 0 ] . min_value,
244- Precision :: Exact ( ScalarValue :: Null )
245- ) ;
246- }
217+ assert_eq ! ( statistics[ 0 ] , expected_full_statistic) ;
218+ assert_eq ! ( statistics[ 1 ] , expected_full_statistic) ;
247219 Ok ( ( ) )
248220 }
249221
@@ -254,63 +226,18 @@ mod test {
254226 let statistics = union_exec. statistics_by_partition ( ) ?;
255227 // Check that we have 4 partitions (2 from each scan)
256228 assert_eq ! ( statistics. len( ) , 4 ) ;
257-
229+ let expected_statistic_partition_1 =
230+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
231+ let expected_statistic_partition_2 =
232+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
258233 // Verify first partition (from first scan)
259- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 2 ) ) ;
260- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
261- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
262- assert_eq ! (
263- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
264- Precision :: Exact ( 0 )
265- ) ;
266- assert_eq ! (
267- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
268- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
269- ) ;
270- assert_eq ! (
271- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
272- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
273- ) ;
274-
234+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
275235 // Verify second partition (from first scan)
276- assert_eq ! ( statistics[ 1 ] . num_rows, Precision :: Exact ( 2 ) ) ;
277- assert_eq ! ( statistics[ 1 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
278- assert_eq ! (
279- statistics[ 1 ] . column_statistics[ 0 ] . null_count,
280- Precision :: Exact ( 0 )
281- ) ;
282- assert_eq ! (
283- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
284- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
285- ) ;
286- assert_eq ! (
287- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
288- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
289- ) ;
290-
236+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
291237 // Verify third partition (from second scan - same as first partition)
292- assert_eq ! ( statistics[ 2 ] . num_rows, Precision :: Exact ( 2 ) ) ;
293- assert_eq ! ( statistics[ 2 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
294- assert_eq ! (
295- statistics[ 2 ] . column_statistics[ 0 ] . max_value,
296- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
297- ) ;
298- assert_eq ! (
299- statistics[ 2 ] . column_statistics[ 0 ] . min_value,
300- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
301- ) ;
302-
238+ assert_eq ! ( statistics[ 2 ] , expected_statistic_partition_1) ;
303239 // Verify fourth partition (from second scan - same as second partition)
304- assert_eq ! ( statistics[ 3 ] . num_rows, Precision :: Exact ( 2 ) ) ;
305- assert_eq ! ( statistics[ 3 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
306- assert_eq ! (
307- statistics[ 3 ] . column_statistics[ 0 ] . max_value,
308- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
309- ) ;
310- assert_eq ! (
311- statistics[ 3 ] . column_statistics[ 0 ] . min_value,
312- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
313- ) ;
240+ assert_eq ! ( statistics[ 3 ] , expected_statistic_partition_2) ;
314241
315242 Ok ( ( ) )
316243 }
0 commit comments