@@ -948,118 +948,133 @@ impl PhysicalPlanner {
                     Arc::new(SortExec::new(LexOrdering::new(exprs?), child).with_fetch(fetch)),
                 ))
             }
-            OpStruct::Scan(scan) => {
+            OpStruct::NativeScan(scan) => {
                 let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
 
-                if scan.source == "CometScan parquet  (unknown)" {
-                    let data_schema = parse_message_type(&scan.data_schema).unwrap();
-                    let required_schema = parse_message_type(&scan.required_schema).unwrap();
-                    println!("data_schema: {:?}", data_schema);
-                    println!("required_schema: {:?}", required_schema);
-
-                    let data_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
-                    let data_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &data_schema_descriptor,
-                            None,
-                        )
-                        .unwrap(),
-                    );
-                    println!("data_schema_arrow: {:?}", data_schema_arrow);
-
-                    let required_schema_descriptor =
-                        parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
-                    let required_schema_arrow = Arc::new(
-                        parquet::arrow::schema::parquet_to_arrow_schema(
-                            &required_schema_descriptor,
-                            None,
-                        )
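+                // Both schemas arrive in the plan as Parquet message-type strings;
+                // parse them, then convert each to an Arrow schema via its descriptor.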
+                println!("NATIVE: SCAN: {:?}", scan);
+                let data_schema = parse_message_type(&*scan.data_schema).unwrap();
+                let required_schema = parse_message_type(&*scan.required_schema).unwrap();
+                println!("data_schema: {:?}", data_schema);
+                println!("required_schema: {:?}", required_schema);
+
+                let data_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(data_schema));
+                let data_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(&data_schema_descriptor, None)
                         .unwrap(),
-                    );
-                    println!("required_schema_arrow: {:?}", required_schema_arrow);
+                );
+                println!("data_schema_arrow: {:?}", data_schema_arrow);
+
+                let required_schema_descriptor =
+                    parquet::schema::types::SchemaDescriptor::new(Arc::new(required_schema));
+                let required_schema_arrow = Arc::new(
+                    parquet::arrow::schema::parquet_to_arrow_schema(
+                        &required_schema_descriptor,
+                        None,
+                    )
+                    .unwrap(),
+                );
+                println!("required_schema_arrow: {:?}", required_schema_arrow);
 
-                    assert!(!required_schema_arrow.fields.is_empty());
+                assert!(!required_schema_arrow.fields.is_empty());
 
-                    let mut projection_vector: Vec<usize> =
-                        Vec::with_capacity(required_schema_arrow.fields.len());
-                    // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
-                    required_schema_arrow.fields.iter().for_each(|field| {
-                        projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
-                    });
-                    println!("projection_vector: {:?}", projection_vector);
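+                // Map each required field to its ordinal in the full data schema;
+                // with_projection() consumes these indices further below.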
+                let mut projection_vector: Vec<usize> =
+                    Vec::with_capacity(required_schema_arrow.fields.len());
+                // TODO: could be faster with a hashmap rather than iterating over data_schema_arrow with index_of.
+                required_schema_arrow.fields.iter().for_each(|field| {
+                    projection_vector.push(data_schema_arrow.index_of(field.name()).unwrap());
+                });
+                println!("projection_vector: {:?}", projection_vector);
 
-                    assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
+                assert_eq!(projection_vector.len(), required_schema_arrow.fields.len());
 
-                    // Convert the Spark expressions to Physical expressions
-                    let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
-                        .data_filters
-                        .iter()
-                        .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
-                        .collect();
-
-                    // Create a conjunctive form of the vector because ParquetExecBuilder takes
-                    // a single expression
-                    let data_filters = data_filters?;
-                    let test_data_filters =
-                        data_filters.clone().into_iter().reduce(|left, right| {
-                            Arc::new(BinaryExpr::new(
-                                left,
-                                datafusion::logical_expr::Operator::And,
-                                right,
-                            ))
-                        });
-
-                    println!("data_filters: {:?}", data_filters);
-                    println!("test_data_filters: {:?}", test_data_filters);
-
-                    let object_store_url = ObjectStoreUrl::local_filesystem();
-                    let paths: Vec<Url> = scan
-                        .path
-                        .iter()
-                        .map(|path| Url::parse(path).unwrap())
-                        .collect();
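+                // The data filters only reference projected columns, so they are bound
+                // against required_schema_arrow rather than the full data schema.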
+                // Convert the Spark expressions to Physical expressions
+                let data_filters: Result<Vec<Arc<dyn PhysicalExpr>>, ExecutionError> = scan
+                    .data_filters
+                    .iter()
+                    .map(|expr| self.create_expr(expr, Arc::clone(&required_schema_arrow)))
+                    .collect();
 
-                    let object_store = object_store::local::LocalFileSystem::new();
-                    // register the object store with the runtime environment
-                    let url = Url::try_from("file://").unwrap();
-                    self.session_ctx
-                        .runtime_env()
-                        .register_object_store(&url, Arc::new(object_store));
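+                // reduce() folds the filters left-to-right into a single AND expression
+                // and yields None when the scan carries no data filters.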
+                // Create a conjunctive form of the vector because ParquetExecBuilder takes
+                // a single expression
+                let data_filters = data_filters?;
+                let test_data_filters = data_filters.clone().into_iter().reduce(|left, right| {
+                    Arc::new(BinaryExpr::new(
+                        left,
+                        datafusion::logical_expr::Operator::And,
+                        right,
+                    ))
+                });
 
-                    let files: Vec<PartitionedFile> = paths
-                        .iter()
-                        .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
-                        .collect();
+                println!("data_filters: {:?}", data_filters);
+                println!("test_data_filters: {:?}", test_data_filters);
 
-                    // partition the files
-                    // TODO really should partition the row groups
+                let object_store_url = ObjectStoreUrl::local_filesystem();
+                let paths: Vec<Url> = scan
+                    .path
+                    .iter()
+                    .map(|path| Url::parse(path).unwrap())
+                    .collect();
 
-                    let mut file_groups = vec![vec![]; partition_count];
-                    files.iter().enumerate().for_each(|(idx, file)| {
-                        file_groups[idx % partition_count].push(file.clone());
-                    });
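+                // The paths above were parsed as file:// URLs, so one LocalFileSystem
+                // store registered under that scheme serves every file in the scan.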
+                let object_store = object_store::local::LocalFileSystem::new();
+                // register the object store with the runtime environment
+                let url = Url::try_from("file://").unwrap();
+                self.session_ctx
+                    .runtime_env()
+                    .register_object_store(&url, Arc::new(object_store));
 
-                    let file_scan_config =
-                        FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
-                            .with_file_groups(file_groups)
-                            .with_projection(Some(projection_vector));
+                let files: Vec<PartitionedFile> = paths
+                    .iter()
+                    .map(|path| PartitionedFile::from_path(path.path().to_string()).unwrap())
+                    .collect();
 
-                    let mut table_parquet_options = TableParquetOptions::new();
-                    table_parquet_options.global.pushdown_filters = true;
-                    table_parquet_options.global.reorder_filters = true;
+                // partition the files
+                // TODO really should partition the row groups
 
-                    let mut builder = ParquetExecBuilder::new(file_scan_config)
-                        .with_table_parquet_options(table_parquet_options);
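+                // Assign whole files round-robin so each of the partition_count
+                // partitions receives roughly the same number of files.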
+                let mut file_groups = vec![vec![]; partition_count];
+                files.iter().enumerate().for_each(|(idx, file)| {
+                    file_groups[idx % partition_count].push(file.clone());
+                });
 
-                    if let Some(filter) = test_data_filters {
-                        builder = builder.with_predicate(filter);
-                    }
+                let file_scan_config =
+                    FileScanConfig::new(object_store_url, Arc::clone(&data_schema_arrow))
+                        .with_file_groups(file_groups)
+                        .with_projection(Some(projection_vector));
 
-                    let scan = builder.build();
-                    return Ok((vec![], Arc::new(scan)));
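+                // pushdown_filters applies the predicate during Parquet decoding rather
+                // than only using it for pruning; reorder_filters lets the heuristically
+                // cheaper predicates run first.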
+                let mut table_parquet_options = TableParquetOptions::new();
+                table_parquet_options.global.pushdown_filters = true;
+                table_parquet_options.global.reorder_filters = true;
+
+                let mut builder = ParquetExecBuilder::new(file_scan_config)
+                    .with_table_parquet_options(table_parquet_options);
+
+                if let Some(filter) = test_data_filters {
+                    builder = builder.with_predicate(filter);
                 }
 
+                let scan = builder.build();
+                return Ok((vec![], Arc::new(scan)));
+            }
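+            // The original Scan arm remains for scans whose input batches are supplied
+            // by the JVM side rather than read natively.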
+            OpStruct::Scan(scan) => {
+                let data_types = scan.fields.iter().map(to_arrow_datatype).collect_vec();
+
                 // If it is not test execution context for unit test, we should have at least one
                 // input source
                 if self.exec_context_id != TEST_EXEC_CONTEXT_ID && inputs.is_empty() {