@@ -42,7 +42,7 @@ use arrow::datatypes::{
     Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
     Int16Type as ArrowInt16Type, Int32Type as ArrowInt32Type,
     Int64Type as ArrowInt64Type, Int8Type as ArrowInt8Type, IntervalUnit, Schema,
-    Time32MillisecondType as ArrowTime32MillisecondType,
+    SchemaRef, Time32MillisecondType as ArrowTime32MillisecondType,
     Time32SecondType as ArrowTime32SecondType,
     Time64MicrosecondType as ArrowTime64MicrosecondType,
     Time64NanosecondType as ArrowTime64NanosecondType, TimeUnit as ArrowTimeUnit,
@@ -91,7 +91,7 @@ pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
 
 /// Array reader reads parquet data into arrow array.
-pub trait ArrayReader {
+pub trait ArrayReader: Send {
     fn as_any(&self) -> &dyn Any;
 
     /// Returns the arrow type of this array reader.
@@ -117,6 +117,26 @@ pub trait ArrayReader {
     fn get_rep_levels(&self) -> Option<&[i16]>;
 }
 
+/// A collection of row groups
+pub trait RowGroupCollection {
+    /// Get schema of parquet file.
+    fn schema(&self) -> Result<SchemaDescPtr>;
+
+    /// Returns an iterator over the column chunks for particular column
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
+}
+
+impl RowGroupCollection for Arc<dyn FileReader> {
+    fn schema(&self) -> Result<SchemaDescPtr> {
+        Ok(self.metadata().file_metadata().schema_descr_ptr())
+    }
+
+    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
+        let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
+        Ok(Box::new(iterator))
+    }
+}
+
 /// Uses `record_reader` to read up to `batch_size` records from `pages`
 ///
 /// Returns the number of records read, which can be less than batch_size if
@@ -478,7 +498,7 @@ where
 impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
 where
     T: DataType,
-    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
 {
     fn as_any(&self) -> &dyn Any {
         self
@@ -1311,9 +1331,9 @@ impl ArrayReader for StructArrayReader {
 /// Create array reader from parquet schema, column indices, and parquet file reader.
 pub fn build_array_reader<T>(
     parquet_schema: SchemaDescPtr,
-    arrow_schema: Schema,
+    arrow_schema: SchemaRef,
     column_indices: T,
-    file_reader: Arc<dyn FileReader>,
+    row_groups: Box<dyn RowGroupCollection>,
 ) -> Result<Box<dyn ArrayReader>>
 where
     T: IntoIterator<Item = usize>,
@@ -1351,13 +1371,8 @@ where
         fields: filtered_root_fields,
     };
 
-    ArrayReaderBuilder::new(
-        Arc::new(proj),
-        Arc::new(arrow_schema),
-        Arc::new(leaves),
-        file_reader,
-    )
-    .build_array_reader()
+    ArrayReaderBuilder::new(Arc::new(proj), arrow_schema, Arc::new(leaves), row_groups)
+        .build_array_reader()
 }
 
 /// Used to build array reader.
@@ -1367,7 +1382,7 @@ struct ArrayReaderBuilder {
     // Key: columns that need to be included in final array builder
     // Value: column index in schema
     columns_included: Arc<HashMap<*const Type, usize>>,
-    file_reader: Arc<dyn FileReader>,
+    row_groups: Box<dyn RowGroupCollection>,
 }
 
 /// Used in type visitor.
@@ -1667,13 +1682,13 @@ impl<'a> ArrayReaderBuilder {
         root_schema: TypePtr,
         arrow_schema: Arc<Schema>,
         columns_included: Arc<HashMap<*const Type, usize>>,
-        file_reader: Arc<dyn FileReader>,
+        file_reader: Box<dyn RowGroupCollection>,
     ) -> Self {
         Self {
             root_schema,
             arrow_schema,
             columns_included,
-            file_reader,
+            row_groups: file_reader,
         }
     }
 
@@ -1707,10 +1722,10 @@ impl<'a> ArrayReaderBuilder {
             context.rep_level,
             context.path.clone(),
         ));
-        let page_iterator = Box::new(FilePageIterator::new(
-            self.columns_included[&(cur_type.as_ref() as *const Type)],
-            self.file_reader.clone(),
-        )?);
+
+        let page_iterator = self
+            .row_groups
+            .column_chunks(self.columns_included[&(cur_type.as_ref() as *const Type)])?;
 
         let arrow_type: Option<ArrowType> = self
             .get_arrow_field(&cur_type, context)
@@ -2823,7 +2838,8 @@ mod tests {
     #[test]
     fn test_create_array_reader() {
         let file = get_test_file("nulls.snappy.parquet");
-        let file_reader = Arc::new(SerializedFileReader::new(file).unwrap());
+        let file_reader: Arc<dyn FileReader> =
+            Arc::new(SerializedFileReader::new(file).unwrap());
 
         let file_metadata = file_reader.metadata().file_metadata();
         let arrow_schema = parquet_to_arrow_schema(
@@ -2834,9 +2850,9 @@ mod tests {
 
         let array_reader = build_array_reader(
             file_reader.metadata().file_metadata().schema_descr_ptr(),
-            arrow_schema,
+            Arc::new(arrow_schema),
             vec![0usize].into_iter(),
-            file_reader,
+            Box::new(file_reader),
         )
         .unwrap();
 
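Since `RowGroupCollection` only requires a schema and a per-column `PageIterator`, `build_array_reader` can now be driven by any source of pages rather than only an `Arc<dyn FileReader>`. Below is a minimal sketch, not part of this commit, of an alternative implementor; `LoggingRowGroups` is a hypothetical wrapper that simply delegates to the `Arc<dyn FileReader>` impl added in the diff.

```rust
// Hypothetical illustration only: a wrapper that implements the new
// `RowGroupCollection` trait by delegating to the `Arc<dyn FileReader>`
// impl shown above. Assumes the items defined in this module
// (`RowGroupCollection`, `FileReader`, `PageIterator`, `SchemaDescPtr`,
// `Result`) and `std::sync::Arc` are in scope.
struct LoggingRowGroups {
    inner: Arc<dyn FileReader>,
}

impl RowGroupCollection for LoggingRowGroups {
    fn schema(&self) -> Result<SchemaDescPtr> {
        // Forward to `RowGroupCollection::schema` on the inner file reader.
        self.inner.schema()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        // Log which column is being read, then forward the request.
        println!("requesting page iterator for column {}", i);
        self.inner.column_chunks(i)
    }
}
```

A boxed `LoggingRowGroups` could then be passed to `build_array_reader` in place of the boxed file reader used in the updated test above.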