@@ -699,15 +699,13 @@ impl LogicalPlan {
699699 } ) )
700700 }
701701 LogicalPlan :: Union ( Union { inputs, schema } ) => {
702- let input_schema = inputs[ 0 ] . schema ( ) ;
703- // If inputs are not pruned do not change schema
704- // TODO this seems wrong (shouldn't we always use the schema of the input?)
705- let schema = if schema. fields ( ) . len ( ) == input_schema. fields ( ) . len ( ) {
706- Arc :: clone ( & schema)
702+ let first_input_schema = inputs[ 0 ] . schema ( ) ;
703+ if schema. fields ( ) . len ( ) == first_input_schema. fields ( ) . len ( ) {
704+ // If inputs are not pruned do not change schema
705+ Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
707706 } else {
708- Arc :: clone ( input_schema)
709- } ;
710- Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
707+ Ok ( LogicalPlan :: Union ( Union :: try_new ( inputs) ?) )
708+ }
711709 }
712710 LogicalPlan :: Distinct ( distinct) => {
713711 let distinct = match distinct {
@@ -2645,6 +2643,107 @@ pub struct Union {
26452643 pub schema : DFSchemaRef ,
26462644}
26472645
2646+ impl Union {
2647+ /// Constructs new Union instance deriving schema from inputs.
2648+ fn try_new ( inputs : Vec < Arc < LogicalPlan > > ) -> Result < Self > {
2649+ let schema = Self :: derive_schema_from_inputs ( & inputs, false ) ?;
2650+ Ok ( Union { inputs, schema } )
2651+ }
2652+
2653+ /// Constructs new Union instance deriving schema from inputs.
2654+ /// Inputs do not have to have matching types and produced schema will
2655+ /// take type from the first input.
2656+ // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
2657+ pub fn try_new_with_loose_types ( inputs : Vec < Arc < LogicalPlan > > ) -> Result < Self > {
2658+ let schema = Self :: derive_schema_from_inputs ( & inputs, true ) ?;
2659+ Ok ( Union { inputs, schema } )
2660+ }
2661+
2662+ /// Constructs new Union instance deriving schema from inputs.
2663+ ///
2664+ /// `loose_types` if true, inputs do not have to have matching types and produced schema will
2665+ /// take type from the first input. TODO (<https://github.com/apache/datafusion/issues/14380>) this is not necessarily reasonable behavior.
2666+ fn derive_schema_from_inputs (
2667+ inputs : & [ Arc < LogicalPlan > ] ,
2668+ loose_types : bool ,
2669+ ) -> Result < DFSchemaRef > {
2670+ if inputs. len ( ) < 2 {
2671+ return plan_err ! ( "UNION requires at least two inputs" ) ;
2672+ }
2673+ let first_schema = inputs[ 0 ] . schema ( ) ;
2674+ let fields_count = first_schema. fields ( ) . len ( ) ;
2675+ for input in inputs. iter ( ) . skip ( 1 ) {
2676+ if fields_count != input. schema ( ) . fields ( ) . len ( ) {
2677+ return plan_err ! (
2678+ "UNION queries have different number of columns: \
2679+ left has {} columns whereas right has {} columns",
2680+ fields_count,
2681+ input. schema( ) . fields( ) . len( )
2682+ ) ;
2683+ }
2684+ }
2685+
2686+ let union_fields = ( 0 ..fields_count)
2687+ . map ( |i| {
2688+ let fields = inputs
2689+ . iter ( )
2690+ . map ( |input| input. schema ( ) . field ( i) )
2691+ . collect :: < Vec < _ > > ( ) ;
2692+ let first_field = fields[ 0 ] ;
2693+ let name = first_field. name ( ) ;
2694+ let data_type = if loose_types {
2695+ // TODO apply type coercion here, or document why it's better to defer
2696+ // temporarily use the data type from the left input and later rely on the analyzer to
2697+ // coerce the two schemas into a common one.
2698+ first_field. data_type ( )
2699+ } else {
2700+ fields. iter ( ) . skip ( 1 ) . try_fold (
2701+ first_field. data_type ( ) ,
2702+ |acc, field| {
2703+ if acc != field. data_type ( ) {
2704+ return plan_err ! (
2705+ "UNION field {i} have different type in inputs: \
2706+ left has {} whereas right has {}",
2707+ first_field. data_type( ) ,
2708+ field. data_type( )
2709+ ) ;
2710+ }
2711+ Ok ( acc)
2712+ } ,
2713+ ) ?
2714+ } ;
2715+ let nullable = fields. iter ( ) . any ( |field| field. is_nullable ( ) ) ;
2716+ let mut field = Field :: new ( name, data_type. clone ( ) , nullable) ;
2717+ let field_metadata =
2718+ intersect_maps ( fields. iter ( ) . map ( |field| field. metadata ( ) ) ) ;
2719+ field. set_metadata ( field_metadata) ;
2720+ // TODO reusing table reference from the first schema is probably wrong
2721+ let table_reference = first_schema. qualified_field ( i) . 0 . cloned ( ) ;
2722+ Ok ( ( table_reference, Arc :: new ( field) ) )
2723+ } )
2724+ . collect :: < Result < _ > > ( ) ?;
2725+ let union_schema_metadata =
2726+ intersect_maps ( inputs. iter ( ) . map ( |input| input. schema ( ) . metadata ( ) ) ) ;
2727+
2728+ // Functional Dependencies doesn't preserve after UNION operation
2729+ let schema = DFSchema :: new_with_metadata ( union_fields, union_schema_metadata) ?;
2730+ let schema = Arc :: new ( schema) ;
2731+
2732+ Ok ( schema)
2733+ }
2734+ }
2735+
/// Returns the key/value pairs that are present, with equal values, in every
/// map yielded by `inputs`.
///
/// An empty `inputs` produces an empty map; a single map is returned as a copy.
fn intersect_maps<'a>(
    inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
) -> HashMap<String, String> {
    let mut remaining = inputs.into_iter();
    let seed = match remaining.next() {
        Some(first) => first.clone(),
        None => HashMap::new(),
    };
    remaining.fold(seed, |mut common, other| {
        // Drop every entry the next map lacks or maps to a different value.
        common.retain(|key, value| other.get(key) == Some(&*value));
        common
    })
}
2746+
26482747// Manual implementation needed because of `schema` field. Comparison excludes this field.
26492748impl PartialOrd for Union {
26502749 fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
0 commit comments