@@ -28,13 +28,14 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 use parquet::basic;
+use parquet::column::reader::*;
+use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::record::{Row, RowAccessor};
 use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, Table};
 use crate::execution::error::{ExecutionError, Result};
-use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
+use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
 
 pub struct ParquetTable {
     filename: String,
@@ -54,7 +55,6 @@ impl ParquetTable {
 }
 
 impl Table for ParquetTable {
-
     fn schema(&self) -> &Arc<Schema> {
         &self.schema
     }
@@ -78,10 +78,13 @@ pub struct ParquetFile {
     projection: Vec<usize>,
     batch_size: usize,
     current_row_group: Option<Box<RowGroupReader>>,
+    column_readers: Vec<ColumnReader>,
 }
 
 impl ParquetFile {
     pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
+        println!("open()");
+
         let reader = SerializedFileReader::new(file).unwrap();
 
         let metadata = reader.metadata();
@@ -103,13 +106,22 @@ impl ParquetFile {
                     }
                 };
 
+                let projected_fields: Vec<Field> = projection
+                    .iter()
+                    .map(|i| schema.fields()[*i].clone())
+                    .collect();
+
+                let projected_schema = Arc::new(Schema::new(projected_fields));
+                println!("projected schema: {:?}", projected_schema);
+
                 Ok(ParquetFile {
                     reader: reader,
                     row_group_index: 0,
-                    schema: Arc::new(schema),
+                    schema: projected_schema,
                     projection,
                     batch_size: 64 * 1024,
                     current_row_group: None,
+                    column_readers: vec![],
                 })
             }
             _ => Err(ExecutionError::General(
@@ -120,7 +132,17 @@ impl ParquetFile {
 
     fn load_next_row_group(&mut self) {
         if self.row_group_index < self.reader.num_row_groups() {
+            //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
             let reader = self.reader.get_row_group(self.row_group_index).unwrap();
+
+            self.column_readers = vec![];
+
+            for i in &self.projection {
+                //TODO validate index in bounds
+                self.column_readers
+                    .push(reader.get_column_reader(*i).unwrap());
+            }
+
             self.current_row_group = Some(reader);
             self.row_group_index += 1;
         } else {
@@ -129,86 +151,134 @@ impl ParquetFile {
     }
 
     fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
+        println!("load_batch()");
         match &self.current_row_group {
             Some(reader) => {
-                // read batch of rows into memory
+                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
+                let mut row_count = 0;
+                for i in 0..self.column_readers.len() {
+                    let array: Arc<Array> = match self.column_readers[i] {
+                        ColumnReader::BoolColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (BOOL)".to_string(),
+                            ));
+                        }
+                        ColumnReader::Int32ColumnReader(ref mut r) => {
+                            let mut read_buffer: Vec<i32> =
+                                Vec::with_capacity(self.batch_size);
 
-                //                let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
+                            for _ in 0..self.batch_size {
+                                read_buffer.push(0);
+                            }
 
-                let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
-                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
-                while let Some(row) = row_iter.next() {
-                    if rows.len() == self.batch_size {
-                        break;
-                    }
-                    rows.push(row);
-                }
-                println!("Loaded {} rows into memory", rows.len());
-
-                // convert to columnar batch
-                let mut batch: Vec<Arc<Array>> =
-                    Vec::with_capacity(self.projection.len());
-                for i in &self.projection {
-                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
-                        DataType::Int32 => {
-                            let mut builder = Int32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_int(*i).unwrap()).unwrap();
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    println!("Read {} rows", count);
+                                    let mut builder = Int32Builder::new(count);
+                                    builder.append_slice(&read_buffer[0..count]).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
                             }
-                            Arc::new(builder.finish())
                         }
-                        DataType::Float32 => {
-                            let mut builder = Float32Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder.append_value(row.get_float(*i).unwrap()).unwrap();
-                            }
-                            Arc::new(builder.finish())
+                        ColumnReader::Int64ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT64)".to_string(),
+                            ));
                         }
-                        DataType::Float64 => {
-                            let mut builder = Float64Builder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                builder
-                                    .append_value(row.get_double(*i).unwrap())
-                                    .unwrap();
-                            }
-                            Arc::new(builder.finish())
+                        ColumnReader::Int96ColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (INT96)".to_string(),
+                            ));
                         }
-                        DataType::Utf8 => {
-                            let mut builder = BinaryBuilder::new(rows.len());
-                            for row in &rows {
-                                //TODO null handling
-                                let bytes = row.get_bytes(*i).unwrap();
-                                builder
-                                    .append_string(
-                                        &String::from_utf8(bytes.data().to_vec())
-                                            .unwrap(),
-                                    )
-                                    .unwrap();
+                        ColumnReader::FloatColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FLOAT)".to_string(),
+                            ));
+                        }
+                        ColumnReader::DoubleColumnReader(ref mut r) => {
+                            let mut builder = Float64Builder::new(self.batch_size);
+                            let mut read_buffer: Vec<f64> =
+                                vec![0.0_f64; self.batch_size]; // read_batch needs a sized slice
+                            match r.read_batch(
+                                self.batch_size,
+                                None,
+                                None,
+                                &mut read_buffer,
+                            ) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    builder.append_slice(&read_buffer[0..count]).unwrap();
+                                    row_count = count;
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
-                            Arc::new(builder.finish())
                         }
-                        other => {
-                            return Err(ExecutionError::NotImplemented(format!(
-                                "unsupported column reader type ({:?})",
-                                other
-                            )));
+                        ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
+                            return Err(ExecutionError::NotImplemented(
+                                "unsupported column reader type (FixedLenByteArray)"
+                                    .to_string(),
+                            ));
+                        }
+                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
+                            let mut b: Vec<ByteArray> =
+                                Vec::with_capacity(self.batch_size);
+                            for _ in 0..self.batch_size {
+                                b.push(ByteArray::default());
+                            }
+                            match r.read_batch(self.batch_size, None, None, &mut b) {
+                                //TODO this isn't handling null values
+                                Ok((count, _)) => {
+                                    row_count = count;
+                                    //TODO this is horribly inefficient
+                                    let mut builder = BinaryBuilder::new(row_count);
+                                    for j in 0..row_count {
+                                        let byte_slice = b[j].slice(0, b[j].len());
+                                        let bytes: &[u8] = byte_slice.data();
+                                        let value =
+                                            String::from_utf8(bytes.to_vec()).unwrap();
+                                        builder.append_string(&value).unwrap();
+                                    }
+                                    Arc::new(builder.finish())
+                                }
+                                _ => {
+                                    return Err(ExecutionError::NotImplemented(format!(
+                                        "Error reading parquet batch (column {})",
+                                        i
+                                    )));
+                                }
+                            }
                         }
                     };
+
+                    println!("Adding array to batch");
                     batch.push(array);
                 }
 
-                println!("Loaded batch of {} rows", rows.len());
+                println!("Loaded batch of {} rows", row_count);
 
-                if rows.len() == 0 {
+                if row_count == 0 {
                     Ok(None)
                 } else {
-                    Ok(Some(RecordBatch::try_new(
-                        self.schema.projection(&self.projection)?,
-                        batch,
-                    )?))
+                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
                 }
             }
             _ => Ok(None),
@@ -342,6 +412,7 @@ mod tests {
     }
 
     fn load_table(name: &str) -> Box<Table> {
+        println!("load_table");
         let testdata = env::var("PARQUET_TEST_DATA").unwrap();
         let filename = format!("{}/{}", testdata, name);
         let table = ParquetTable::new(&filename);