@@ -621,8 +621,10 @@ fn get_batch_reader<'a>(handle: jlong) -> Result<&'a mut ParquetRecordBatchReade
621621    Ok ( & mut  get_batch_context ( handle) ?. batch_reader ) 
622622} 
623623
624+ /// # Safety 
625+ /// The raw JNI handles must be valid references from the JVM; e.g. `required_columns` is passed to `JObjectArray::from_raw`. 
624626#[ no_mangle]  
625- pub  extern  "system"  fn  Java_org_apache_comet_parquet_Native_initRecordBatchReader ( 
627+ pub  unsafe   extern  "system"  fn  Java_org_apache_comet_parquet_Native_initRecordBatchReader ( 
626628    e :  JNIEnv , 
627629    _jclass :  JClass , 
628630    file_path :  jstring , 
@@ -646,62 +648,66 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReade
646648            . unwrap ( ) 
647649            . with_batch_size ( 8192 ) ;  // TODO: (ARROW NATIVE) Use batch size configured in JVM 
648650
651+         let  num_row_groups; 
652+         let  mut  total_rows:  i64  = 0 ; 
649653        //TODO: (ARROW NATIVE) if we can get the ParquetMetadata serialized, we need not do this. 
650-         let  metadata = builder. metadata ( ) . clone ( ) ; 
651- 
652-         let  mut  columns_to_read:  Vec < usize >  = Vec :: new ( ) ; 
653-         let  columns_to_read_array = JObjectArray :: from_raw ( required_columns) ; 
654-         let  array_len = env. get_array_length ( & columns_to_read_array) ?; 
655-         let  mut  required_columns:  Vec < String >  = Vec :: new ( ) ; 
656-         for  i in  0 ..array_len { 
657-             let  p:  JString  = env
658-                 . get_object_array_element ( & columns_to_read_array,  i) ?
659-                 . into ( ) ; 
660-             required_columns. push ( env. get_string ( & p) ?. into ( ) ) ; 
661-         } 
662-         for  ( i,  col)  in  metadata
663-             . file_metadata ( ) 
664-             . schema_descr ( ) 
665-             . columns ( ) 
666-             . iter ( ) 
667-             . enumerate ( ) 
668654        { 
669-             for  ( _,  required)  in  required_columns. iter ( ) . enumerate ( )  { 
670-                 if  col. name ( ) . to_uppercase ( ) . eq ( & required. to_uppercase ( ) )  { 
671-                     columns_to_read. push ( i) ; 
672-                     break ; 
655+             let  metadata = builder. metadata ( ) ; 
656+ 
657+             let  mut  columns_to_read:  Vec < usize >  = Vec :: new ( ) ; 
658+             let  columns_to_read_array = JObjectArray :: from_raw ( required_columns) ; 
659+             let  array_len = env. get_array_length ( & columns_to_read_array) ?; 
660+             let  mut  required_columns:  Vec < String >  = Vec :: new ( ) ; 
661+             for  i in  0 ..array_len { 
662+                 let  p:  JString  = env
663+                     . get_object_array_element ( & columns_to_read_array,  i) ?
664+                     . into ( ) ; 
665+                 required_columns. push ( env. get_string ( & p) ?. into ( ) ) ; 
666+             } 
667+             for  ( i,  col)  in  metadata
668+                 . file_metadata ( ) 
669+                 . schema_descr ( ) 
670+                 . columns ( ) 
671+                 . iter ( ) 
672+                 . enumerate ( ) 
673+             { 
674+                 for  required in  required_columns. iter ( )  { 
675+                     if  col. name ( ) . to_uppercase ( ) . eq ( & required. to_uppercase ( ) )  { 
676+                         columns_to_read. push ( i) ; 
677+                         break ; 
678+                     } 
673679                } 
674680            } 
675-         } 
676-         //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs) 
677-         let  mask = ProjectionMask :: leaves ( metadata. file_metadata ( ) . schema_descr ( ) ,  columns_to_read) ; 
678-         // Set projection mask to read only root columns 1 and 2. 
679-         builder = builder. with_projection ( mask) ; 
680- 
681-         let  mut  row_groups_to_read:  Vec < usize >  = Vec :: new ( ) ; 
682-         let  mut  total_rows:  i64  = 0 ; 
683-         // get row groups - 
684-         for  ( i,  rg)  in  metadata. row_groups ( ) . into_iter ( ) . enumerate ( )  { 
685-             let  rg_start = rg. file_offset ( ) . unwrap ( ) ; 
686-             let  rg_end = rg_start + rg. compressed_size ( ) ; 
687-             if  rg_start >= start && rg_end <= start + length { 
688-                 row_groups_to_read. push ( i) ; 
689-                 total_rows += rg. num_rows ( ) ; 
681+             //TODO: (ARROW NATIVE) make this work for complex types (especially deeply nested structs) 
682+             let  mask =
683+                 ProjectionMask :: leaves ( metadata. file_metadata ( ) . schema_descr ( ) ,  columns_to_read) ; 
684+             // The mask keeps only the matched leaf columns; it is applied to the builder below. 
685+ 
686+             let  mut  row_groups_to_read:  Vec < usize >  = Vec :: new ( ) ; 
687+             // Select the row groups whose byte range lies entirely within `start..start + length`. 
688+             for  ( i,  rg)  in  metadata. row_groups ( ) . iter ( ) . enumerate ( )  { 
689+                 let  rg_start = rg. file_offset ( ) . unwrap ( ) ; 
690+                 let  rg_end = rg_start + rg. compressed_size ( ) ; 
691+                 if  rg_start >= start && rg_end <= start + length { 
692+                     row_groups_to_read. push ( i) ; 
693+                     total_rows += rg. num_rows ( ) ; 
694+                 } 
690695            } 
696+             num_row_groups = row_groups_to_read. len ( ) ; 
697+             builder = builder
698+                 . with_projection ( mask) 
699+                 . with_row_groups ( row_groups_to_read. clone ( ) ) 
691700        } 
692701
693702        // Build a sync parquet reader. 
694-         let  batch_reader = builder
695-             . with_row_groups ( row_groups_to_read. clone ( ) ) 
696-             . build ( ) 
697-             . unwrap ( ) ; 
703+         let  batch_reader = builder. build ( ) . unwrap ( ) ; 
698704
699705        let  ctx = BatchContext  { 
700706            batch_reader, 
701707            current_batch :  None , 
702708            reader_state :  ParquetReaderState :: Init , 
703-             num_row_groups :  row_groups_to_read . len ( )  as  i32 , 
704-             total_rows :  total_rows , 
709+             num_row_groups :  num_row_groups  as  i32 , 
710+             total_rows, 
705711        } ; 
706712        let  res = Box :: new ( ctx) ; 
707713        Ok ( Box :: into_raw ( res)  as  i64 ) 
0 commit comments