@@ -28,10 +28,13 @@ use datafusion::prelude::{SessionConfig, SessionContext};
2828use datafusion_common:: tree_node:: { Transformed , TransformedResult , TreeNode } ;
2929use datafusion_common:: DataFusionError ;
3030use datafusion_common:: { ColumnStatistics , ScalarValue } ;
31+ use datafusion_datasource:: file:: FileSource ;
32+ use datafusion_datasource:: file_scan_config:: FileScanConfigBuilder ;
3133use datafusion_datasource:: schema_adapter:: {
3234 DefaultSchemaAdapterFactory , SchemaAdapter , SchemaAdapterFactory , SchemaMapper ,
3335} ;
3436use datafusion_datasource:: ListingTableUrl ;
37+ use datafusion_datasource_parquet:: source:: ParquetSource ;
3538use datafusion_execution:: object_store:: ObjectStoreUrl ;
3639use datafusion_physical_expr:: expressions:: { self , Column } ;
3740use datafusion_physical_expr:: PhysicalExpr ;
@@ -372,3 +375,177 @@ async fn test_custom_schema_adapter_and_custom_expression_adapter() {
372375 ] ;
373376 assert_batches_eq ! ( expected, & batches) ;
374377}
378+
379+ /// A test schema adapter factory that adds prefix to column names
380+ #[ derive( Debug ) ]
381+ struct PrefixAdapterFactory {
382+ prefix : String ,
383+ }
384+
385+ impl SchemaAdapterFactory for PrefixAdapterFactory {
386+ fn create (
387+ & self ,
388+ projected_table_schema : SchemaRef ,
389+ _table_schema : SchemaRef ,
390+ ) -> Box < dyn SchemaAdapter > {
391+ Box :: new ( PrefixAdapter {
392+ input_schema : projected_table_schema,
393+ prefix : self . prefix . clone ( ) ,
394+ } )
395+ }
396+ }
397+
398+ /// A test schema adapter that adds prefix to column names
399+ #[ derive( Debug ) ]
400+ struct PrefixAdapter {
401+ input_schema : SchemaRef ,
402+ prefix : String ,
403+ }
404+
405+ impl SchemaAdapter for PrefixAdapter {
406+ fn map_column_index ( & self , index : usize , file_schema : & Schema ) -> Option < usize > {
407+ let field = self . input_schema . field ( index) ;
408+ file_schema. fields . find ( field. name ( ) ) . map ( |( i, _) | i)
409+ }
410+
411+ fn map_schema (
412+ & self ,
413+ file_schema : & Schema ,
414+ ) -> Result < ( Arc < dyn SchemaMapper > , Vec < usize > ) > {
415+ let mut projection = Vec :: with_capacity ( file_schema. fields ( ) . len ( ) ) ;
416+ for ( file_idx, file_field) in file_schema. fields ( ) . iter ( ) . enumerate ( ) {
417+ if self . input_schema . fields ( ) . find ( file_field. name ( ) ) . is_some ( ) {
418+ projection. push ( file_idx) ;
419+ }
420+ }
421+
422+ // Create a schema mapper that adds a prefix to column names
423+ #[ derive( Debug ) ]
424+ struct PrefixSchemaMapping {
425+ // Keep only the prefix field which is actually used in the implementation
426+ prefix : String ,
427+ }
428+
429+ impl SchemaMapper for PrefixSchemaMapping {
430+ fn map_batch ( & self , batch : RecordBatch ) -> Result < RecordBatch > {
431+ // Create a new schema with prefixed field names
432+ let prefixed_fields: Vec < Field > = batch
433+ . schema ( )
434+ . fields ( )
435+ . iter ( )
436+ . map ( |field| {
437+ Field :: new (
438+ format ! ( "{}{}" , self . prefix, field. name( ) ) ,
439+ field. data_type ( ) . clone ( ) ,
440+ field. is_nullable ( ) ,
441+ )
442+ } )
443+ . collect ( ) ;
444+ let prefixed_schema = Arc :: new ( Schema :: new ( prefixed_fields) ) ;
445+
446+ // Create a new batch with the prefixed schema but the same data
447+ let options = RecordBatchOptions :: default ( ) ;
448+ RecordBatch :: try_new_with_options (
449+ prefixed_schema,
450+ batch. columns ( ) . to_vec ( ) ,
451+ & options,
452+ )
453+ . map_err ( |e| DataFusionError :: ArrowError ( Box :: new ( e) , None ) )
454+ }
455+
456+ fn map_column_statistics (
457+ & self ,
458+ stats : & [ ColumnStatistics ] ,
459+ ) -> Result < Vec < ColumnStatistics > > {
460+ // For testing, just return the input statistics
461+ Ok ( stats. to_vec ( ) )
462+ }
463+ }
464+
465+ Ok ( (
466+ Arc :: new ( PrefixSchemaMapping {
467+ prefix : self . prefix . clone ( ) ,
468+ } ) ,
469+ projection,
470+ ) )
471+ }
472+ }
473+
474+ #[ test]
475+ fn test_apply_schema_adapter_with_factory ( ) {
476+ // Create a schema
477+ let schema = Arc :: new ( Schema :: new ( vec ! [
478+ Field :: new( "id" , DataType :: Int32 , false ) ,
479+ Field :: new( "name" , DataType :: Utf8 , true ) ,
480+ ] ) ) ;
481+
482+ // Create a parquet source
483+ let source = ParquetSource :: default ( ) ;
484+
485+ // Create a file scan config with source that has a schema adapter factory
486+ let factory = Arc :: new ( PrefixAdapterFactory {
487+ prefix : "test_" . to_string ( ) ,
488+ } ) ;
489+
490+ let file_source = source. clone ( ) . with_schema_adapter_factory ( factory) . unwrap ( ) ;
491+
492+ let config = FileScanConfigBuilder :: new (
493+ ObjectStoreUrl :: local_filesystem ( ) ,
494+ schema. clone ( ) ,
495+ file_source,
496+ )
497+ . build ( ) ;
498+
499+ // Apply schema adapter to a new source
500+ let result_source = source. apply_schema_adapter ( & config) . unwrap ( ) ;
501+
502+ // Verify the adapter was applied
503+ assert ! ( result_source. schema_adapter_factory( ) . is_some( ) ) ;
504+
505+ // Create adapter and test it produces expected schema
506+ let adapter_factory = result_source. schema_adapter_factory ( ) . unwrap ( ) ;
507+ let adapter = adapter_factory. create ( schema. clone ( ) , schema. clone ( ) ) ;
508+
509+ // Create a dummy batch to test the schema mapping
510+ let dummy_batch = RecordBatch :: new_empty ( schema. clone ( ) ) ;
511+
512+ // Get the file schema (which is the same as the table schema in this test)
513+ let ( mapper, _) = adapter. map_schema ( & schema) . unwrap ( ) ;
514+
515+ // Apply the mapping to get the output schema
516+ let mapped_batch = mapper. map_batch ( dummy_batch) . unwrap ( ) ;
517+ let output_schema = mapped_batch. schema ( ) ;
518+
519+ // Check the column names have the prefix
520+ assert_eq ! ( output_schema. field( 0 ) . name( ) , "test_id" ) ;
521+ assert_eq ! ( output_schema. field( 1 ) . name( ) , "test_name" ) ;
522+ }
523+
524+ #[ test]
525+ fn test_apply_schema_adapter_without_factory ( ) {
526+ // Create a schema
527+ let schema = Arc :: new ( Schema :: new ( vec ! [
528+ Field :: new( "id" , DataType :: Int32 , false ) ,
529+ Field :: new( "name" , DataType :: Utf8 , true ) ,
530+ ] ) ) ;
531+
532+ // Create a parquet source
533+ let source = ParquetSource :: default ( ) ;
534+
535+ // Convert to Arc<dyn FileSource>
536+ let file_source: Arc < dyn FileSource > = Arc :: new ( source. clone ( ) ) ;
537+
538+ // Create a file scan config without a schema adapter factory
539+ let config = FileScanConfigBuilder :: new (
540+ ObjectStoreUrl :: local_filesystem ( ) ,
541+ schema. clone ( ) ,
542+ file_source,
543+ )
544+ . build ( ) ;
545+
546+ // Apply schema adapter function - should pass through the source unchanged
547+ let result_source = source. apply_schema_adapter ( & config) . unwrap ( ) ;
548+
549+ // Verify no adapter was applied
550+ assert ! ( result_source. schema_adapter_factory( ) . is_none( ) ) ;
551+ }
0 commit comments