@@ -171,7 +171,13 @@ impl Partition {
171171 trace ! ( "Listing partition {}" , self . path) ;
172172 let prefix = Some ( & self . path ) . filter ( |p| !p. as_ref ( ) . is_empty ( ) ) ;
173173 let result = store. list_with_delimiter ( prefix) . await ?;
174- self . files = Some ( result. objects ) ;
174+ self . files = Some (
175+ result
176+ . objects
177+ . into_iter ( )
178+ . filter ( |object_meta| object_meta. size > 0 )
179+ . collect ( ) ,
180+ ) ;
175181 Ok ( ( self , result. common_prefixes ) )
176182 }
177183}
@@ -418,6 +424,7 @@ pub async fn pruned_partition_list<'a>(
418424 table_path
419425 . list_all_files ( ctx, store, file_extension)
420426 . await ?
427+ . try_filter ( |object_meta| futures:: future:: ready ( object_meta. size > 0 ) )
421428 . map_ok ( |object_meta| object_meta. into ( ) ) ,
422429 ) ) ;
423430 }
@@ -566,6 +573,7 @@ mod tests {
566573 async fn test_pruned_partition_list_empty ( ) {
567574 let ( store, state) = make_test_store_and_state ( & [
568575 ( "tablepath/mypartition=val1/notparquetfile" , 100 ) ,
576+ ( "tablepath/mypartition=val1/ignoresemptyfile.parquet" , 0 ) ,
569577 ( "tablepath/file.parquet" , 100 ) ,
570578 ] ) ;
571579 let filter = Expr :: eq ( col ( "mypartition" ) , lit ( "val1" ) ) ;
@@ -590,6 +598,7 @@ mod tests {
590598 let ( store, state) = make_test_store_and_state ( & [
591599 ( "tablepath/mypartition=val1/file.parquet" , 100 ) ,
592600 ( "tablepath/mypartition=val2/file.parquet" , 100 ) ,
601+ ( "tablepath/mypartition=val1/ignoresemptyfile.parquet" , 0 ) ,
593602 ( "tablepath/mypartition=val1/other=val3/file.parquet" , 100 ) ,
594603 ] ) ;
595604 let filter = Expr :: eq ( col ( "mypartition" ) , lit ( "val1" ) ) ;
@@ -671,6 +680,107 @@ mod tests {
671680 ) ;
672681 }
673682
683+ /// Describe a partition as a (path, depth, files) tuple for easier assertions
684+ fn describe_partition ( partition : & Partition ) -> ( & str , usize , Vec < & str > ) {
685+ (
686+ partition. path . as_ref ( ) ,
687+ partition. depth ,
688+ partition
689+ . files
690+ . as_ref ( )
691+ . map ( |f| f. iter ( ) . map ( |f| f. location . filename ( ) . unwrap ( ) ) . collect ( ) )
692+ . unwrap_or_default ( ) ,
693+ )
694+ }
695+
696+ #[ tokio:: test]
697+ async fn test_list_partition ( ) {
698+ let ( store, _) = make_test_store_and_state ( & [
699+ ( "tablepath/part1=p1v1/file.parquet" , 100 ) ,
700+ ( "tablepath/part1=p1v2/part2=p2v1/file1.parquet" , 100 ) ,
701+ ( "tablepath/part1=p1v2/part2=p2v1/file2.parquet" , 100 ) ,
702+ ( "tablepath/part1=p1v3/part2=p2v1/file3.parquet" , 100 ) ,
703+ ( "tablepath/part1=p1v2/part2=p2v2/file4.parquet" , 100 ) ,
704+ ( "tablepath/part1=p1v2/part2=p2v2/empty.parquet" , 0 ) ,
705+ ] ) ;
706+
707+ let partitions = list_partitions (
708+ store. as_ref ( ) ,
709+ & ListingTableUrl :: parse ( "file:///tablepath/" ) . unwrap ( ) ,
710+ 0 ,
711+ None ,
712+ )
713+ . await
714+ . expect ( "listing partitions failed" ) ;
715+
716+ assert_eq ! (
717+ & partitions
718+ . iter( )
719+ . map( describe_partition)
720+ . collect:: <Vec <_>>( ) ,
721+ & vec![
722+ ( "tablepath" , 0 , vec![ ] ) ,
723+ ( "tablepath/part1=p1v1" , 1 , vec![ ] ) ,
724+ ( "tablepath/part1=p1v2" , 1 , vec![ ] ) ,
725+ ( "tablepath/part1=p1v3" , 1 , vec![ ] ) ,
726+ ]
727+ ) ;
728+
729+ let partitions = list_partitions (
730+ store. as_ref ( ) ,
731+ & ListingTableUrl :: parse ( "file:///tablepath/" ) . unwrap ( ) ,
732+ 1 ,
733+ None ,
734+ )
735+ . await
736+ . expect ( "listing partitions failed" ) ;
737+
738+ assert_eq ! (
739+ & partitions
740+ . iter( )
741+ . map( describe_partition)
742+ . collect:: <Vec <_>>( ) ,
743+ & vec![
744+ ( "tablepath" , 0 , vec![ ] ) ,
745+ ( "tablepath/part1=p1v1" , 1 , vec![ "file.parquet" ] ) ,
746+ ( "tablepath/part1=p1v2" , 1 , vec![ ] ) ,
747+ ( "tablepath/part1=p1v2/part2=p2v1" , 2 , vec![ ] ) ,
748+ ( "tablepath/part1=p1v2/part2=p2v2" , 2 , vec![ ] ) ,
749+ ( "tablepath/part1=p1v3" , 1 , vec![ ] ) ,
750+ ( "tablepath/part1=p1v3/part2=p2v1" , 2 , vec![ ] ) ,
751+ ]
752+ ) ;
753+
754+ let partitions = list_partitions (
755+ store. as_ref ( ) ,
756+ & ListingTableUrl :: parse ( "file:///tablepath/" ) . unwrap ( ) ,
757+ 2 ,
758+ None ,
759+ )
760+ . await
761+ . expect ( "listing partitions failed" ) ;
762+
763+ assert_eq ! (
764+ & partitions
765+ . iter( )
766+ . map( describe_partition)
767+ . collect:: <Vec <_>>( ) ,
768+ & vec![
769+ ( "tablepath" , 0 , vec![ ] ) ,
770+ ( "tablepath/part1=p1v1" , 1 , vec![ "file.parquet" ] ) ,
771+ ( "tablepath/part1=p1v2" , 1 , vec![ ] ) ,
772+ ( "tablepath/part1=p1v3" , 1 , vec![ ] ) ,
773+ (
774+ "tablepath/part1=p1v2/part2=p2v1" ,
775+ 2 ,
776+ vec![ "file1.parquet" , "file2.parquet" ]
777+ ) ,
778+ ( "tablepath/part1=p1v2/part2=p2v2" , 2 , vec![ "file4.parquet" ] ) ,
779+ ( "tablepath/part1=p1v3/part2=p2v1" , 2 , vec![ "file3.parquet" ] ) ,
780+ ]
781+ ) ;
782+ }
783+
674784 #[ test]
675785 fn test_parse_partitions_for_path ( ) {
676786 assert_eq ! (
0 commit comments