1717
/// Tests for `ExecutionPlan::statistics_by_partition` on a partitioned parquet
/// listing table and on the operators stacked on top of it (projection, sort,
/// filter, union).
#[cfg(test)]
mod test {
    use arrow_schema::{DataType, Field, Schema, SortOptions};
    use datafusion::datasource::listing::ListingTable;
    use datafusion::prelude::SessionContext;
    use datafusion_catalog::TableProvider;
    use datafusion_common::stats::Precision;
    use datafusion_common::{ScalarValue, Statistics};
    use datafusion_execution::config::SessionConfig;
    use datafusion_expr_common::operator::Operator;
    use datafusion_physical_expr::expressions::{binary, lit, Column};
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
    use datafusion_physical_plan::filter::FilterExec;
    use datafusion_physical_plan::projection::ProjectionExec;
    use datafusion_physical_plan::sorts::sort::SortExec;
    use datafusion_physical_plan::union::UnionExec;
    use datafusion_physical_plan::ExecutionPlan;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Hands out a distinct suffix for every fixture directory created by this
    /// process, so tests running on parallel threads never share files.
    static NEXT_FIXTURE_ID: AtomicUsize = AtomicUsize::new(0);

    /// Builds the scan plan of a parquet table `t1 (id INT, date DATE)` that is
    /// partitioned by `date` and holds four rows, one per date, with ids 4, 3,
    /// 2 and 1.
    ///
    /// `target_partition`, when set, overrides the session's target partition
    /// count before the table is created and scanned.
    ///
    /// Each call writes its data into a private directory below the OS temp
    /// directory. The directory is intentionally left in place because the
    /// returned plan still refers to the files in it.
    ///
    /// # Panics
    ///
    /// Panics if creating, populating or scanning the table fails.
    async fn generate_listing_table_with_statistics(
        target_partition: Option<usize>,
    ) -> Arc<dyn ExecutionPlan> {
        let fixture_id = NEXT_FIXTURE_ID.fetch_add(1, Ordering::Relaxed);
        let data_dir = std::env::temp_dir().join(format!(
            "statistics_by_partition_{}_{}",
            std::process::id(),
            fixture_id
        ));
        // Best effort: clear leftovers from an earlier process that happened to
        // use the same pid. A missing directory is the normal case.
        let _ = std::fs::remove_dir_all(&data_dir);

        let mut session_config = SessionConfig::new().with_collect_statistics(true);
        if let Some(partition) = target_partition {
            session_config = session_config.with_target_partitions(partition);
        }
        let ctx = SessionContext::new_with_config(session_config);

        // Create table with partition. The trailing `/` marks the location as a
        // directory even though it does not exist yet.
        let create_table_sql = format!(
            "CREATE EXTERNAL TABLE t1 (id INT not null, date DATE) STORED AS PARQUET LOCATION '{}/' PARTITIONED BY (date) WITH ORDER (id ASC);",
            data_dir.display()
        );
        ctx.sql(&create_table_sql)
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();

        // Insert data into the table, will generate partition files with parquet format
        let insert_data = "INSERT INTO t1 VALUES (4, '2025-03-01'), (3, '2025-3-02'), (2, '2025-03-03'), (1, '2025-03-04');";
        ctx.sql(insert_data).await.unwrap().collect().await.unwrap();

        let table = ctx.table_provider("t1").await.unwrap();
        let listing_table = table
            .as_any()
            .downcast_ref::<ListingTable>()
            .unwrap()
            .clone();
        listing_table
            .scan(&ctx.state(), None, &[], None)
            .await
            .unwrap()
    }

    /// Asserts the row count, byte size, column count and the `id` column
    /// (index 0) null count and min/max bounds of a single partition.
    fn check_partition_statistics(
        stat: &Statistics,
        num_rows: usize,
        total_byte_size: usize,
        min_id: i32,
        max_id: i32,
    ) {
        assert_eq!(stat.num_rows, Precision::Exact(num_rows));
        assert_eq!(stat.total_byte_size, Precision::Exact(total_byte_size));
        assert_eq!(stat.column_statistics.len(), 2);
        let id_stats = &stat.column_statistics[0];
        assert_eq!(id_stats.null_count, Precision::Exact(0));
        assert_eq!(
            id_stats.max_value,
            Precision::Exact(ScalarValue::Int32(Some(max_id)))
        );
        assert_eq!(
            id_stats.min_value,
            Precision::Exact(ScalarValue::Int32(Some(min_id)))
        );
    }

    /// Checks the per-partition statistics that must survive operators which do
    /// not change the rows: four single-row partitions whose `id` column has no
    /// nulls and whose maximum is 4, 3, 2 and 1 respectively.
    fn check_unchanged_statistics(statistics: Vec<Statistics>) {
        const EXPECTED_MAX_IDS: [i32; 4] = [4, 3, 2, 1];

        // Fail with a readable message instead of an index-out-of-bounds panic
        // when the partition count is off.
        assert_eq!(
            statistics.len(),
            EXPECTED_MAX_IDS.len(),
            "unexpected number of partitions"
        );

        for (stat, expected_max) in statistics.iter().zip(EXPECTED_MAX_IDS.iter()) {
            assert_eq!(stat.num_rows, Precision::Exact(1));
            // First column (id) should have non-null values
            assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0));
            assert_eq!(
                stat.column_statistics[0].max_value,
                Precision::Exact(ScalarValue::Int32(Some(*expected_max)))
            );
        }
    }

    /// The bare scan exposes one statistics entry per partition file.
    #[tokio::test]
    async fn test_statistics_by_partition_of_data_source() -> datafusion_common::Result<()> {
        let scan = generate_listing_table_with_statistics(None).await;
        let statistics = scan.statistics_by_partition()?;
        // Check the statistics of each partition
        assert_eq!(statistics.len(), 4);
        for stat in &statistics {
            assert_eq!(stat.column_statistics.len(), 2);
            assert_eq!(stat.total_byte_size, Precision::Exact(55));
        }
        check_unchanged_statistics(statistics);
        Ok(())
    }

    /// Projecting only `id` narrows the column statistics and byte size but
    /// keeps the per-partition row statistics.
    #[tokio::test]
    async fn test_statistics_by_partition_of_projection() -> datafusion_common::Result<()> {
        let scan = generate_listing_table_with_statistics(None).await;
        // Add projection execution plan
        let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
            vec![(Arc::new(Column::new("id", 0)), "id".to_string())];
        let projection = ProjectionExec::try_new(exprs, scan)?;
        let statistics = projection.statistics_by_partition()?;
        for stat in &statistics {
            assert_eq!(stat.column_statistics.len(), 1);
            assert_eq!(stat.total_byte_size, Precision::Exact(4));
        }
        check_unchanged_statistics(statistics);
        Ok(())
    }

    /// A sort reports a single merged partition by default and mirrors its
    /// input partitions when partitioning is preserved.
    #[tokio::test]
    async fn test_statistics_by_partition_of_sort() -> datafusion_common::Result<()> {
        let scan = generate_listing_table_with_statistics(Some(2)).await;
        // Add sort execution plan
        let sort = SortExec::new(
            LexOrdering::new(vec![PhysicalSortExpr {
                expr: Arc::new(Column::new("id", 0)),
                options: SortOptions {
                    descending: false,
                    nulls_first: false,
                },
            }]),
            scan,
        );

        // Without preserved partitioning: one partition covering all four rows.
        let statistics = sort.statistics_by_partition()?;
        assert_eq!(statistics.len(), 1);
        check_partition_statistics(&statistics[0], 4, 220, 1, 4);

        // With preserved partitioning: the two input partitions, unchanged.
        let sort = sort.with_preserve_partitioning(true);
        let statistics = sort.statistics_by_partition()?;
        assert_eq!(statistics.len(), 2);
        check_partition_statistics(&statistics[0], 2, 110, 3, 4);
        check_partition_statistics(&statistics[1], 2, 110, 1, 2);
        Ok(())
    }

    /// Pins the statistics currently reported by a filter whose predicate
    /// matches no row.
    #[tokio::test]
    async fn test_statistics_by_partition_of_filter() -> datafusion_common::Result<()> {
        let scan = generate_listing_table_with_statistics(None).await;
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        // `id < 1` selects nothing: the inserted ids are 1 through 4.
        let predicate = binary(
            Arc::new(Column::new("id", 0)),
            Operator::Lt,
            lit(1i32),
            &schema,
        )?;
        let filter: Arc<dyn ExecutionPlan> =
            Arc::new(FilterExec::try_new(predicate, scan)?);

        // The whole-plan statistics collapse to an "empty" estimate with NULL
        // bounds; the filter's selectivity estimation could be improved here:
        //
        // Statistics {
        //     num_rows: Inexact(0),
        //     total_byte_size: Inexact(0),
        //     column_statistics: [
        //         ColumnStatistics {
        //             null_count: Exact(0),
        //             max_value: Exact(NULL),
        //             min_value: Exact(NULL),
        //             sum_value: Exact(NULL),
        //             distinct_count: Exact(0),
        //         },
        //         ColumnStatistics {
        //             null_count: Exact(0),
        //             max_value: Exact(NULL),
        //             min_value: Exact(NULL),
        //             sum_value: Exact(NULL),
        //             distinct_count: Exact(0),
        //         },
        //     ],
        // }
        let _full_statistics = filter.statistics()?;

        // The per-partition statistics inherit the same shortcoming. Until the
        // estimation improves, assert the current behavior so that any change
        // is noticed.
        let statistics = filter.statistics_by_partition()?;
        assert_eq!(statistics.len(), 4);
        for stat in &statistics {
            assert_eq!(stat.column_statistics.len(), 2);
            assert_eq!(stat.total_byte_size, Precision::Inexact(0));
            assert_eq!(stat.num_rows, Precision::Inexact(0));
            assert_eq!(stat.column_statistics[0].null_count, Precision::Exact(0));
            assert_eq!(
                stat.column_statistics[0].max_value,
                Precision::Exact(ScalarValue::Null)
            );
            assert_eq!(
                stat.column_statistics[0].min_value,
                Precision::Exact(ScalarValue::Null)
            );
        }
        Ok(())
    }

    /// A union concatenates the partitions of its inputs, so unioning the scan
    /// with itself yields each of its two partitions twice.
    #[tokio::test]
    async fn test_statistic_by_partition_of_union() -> datafusion_common::Result<()> {
        let scan = generate_listing_table_with_statistics(Some(2)).await;
        let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
        let statistics = union_exec.statistics_by_partition()?;

        // (min id, max id) per output partition: two from the first scan,
        // followed by the same two from the second scan.
        const EXPECTED_ID_BOUNDS: [(i32, i32); 4] = [(3, 4), (1, 2), (3, 4), (1, 2)];

        // Check that we have 4 partitions (2 from each scan)
        assert_eq!(statistics.len(), EXPECTED_ID_BOUNDS.len());
        for (stat, (min_id, max_id)) in statistics.iter().zip(EXPECTED_ID_BOUNDS.iter()) {
            check_partition_statistics(stat, 2, 110, *min_id, *max_id);
        }

        Ok(())
    }
}
0 commit comments