@@ -85,6 +85,13 @@ use datafusion::physical_expr::window::WindowExpr;
 use datafusion::physical_expr::LexOrdering;
 
 use crate::parquet::parquet_exec::init_datasource_exec;
+use arrow::array::{
+    BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
+    Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder,
+    TimestampMicrosecondArray,
+};
+use arrow::buffer::BooleanBuffer;
+use datafusion::common::utils::SingleRowListArrayBuilder;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::filter::FilterExec as DataFusionFilterExec;
 use datafusion_comet_proto::spark_operator::SparkFilePartition;
@@ -474,6 +481,125 @@ impl PhysicalPlanner {
                                     )))
                                 }
                             }
+                        },
+                        Value::ListVal(values) => {
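+                            // The protobuf list literal carries one value vector per element type plus a
+                            // null_mask marking which entries are valid; each arm below picks the vector
+                            // matching the list's element type and wraps it into a single-row ListArray scalar.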
+                            if let DataType::List(f) = data_type {
+                                match f.data_type() {
+                                    DataType::Null => {
+                                        SingleRowListArrayBuilder::new(Arc::new(NullArray::new(values.clone().null_mask.len())))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Boolean => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(BooleanArray::new(BooleanBuffer::from(vals.boolean_values), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
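+                                    // Protobuf has no 8- or 16-bit integer types, so byte and short values
+                                    // arrive widened and are narrowed back to i8/i16 here.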
+                                    DataType::Int8 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Int8Array::new(vals.byte_values.iter().map(|&x| x as i8).collect::<Vec<_>>().into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Int16 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Int16Array::new(vals.short_values.iter().map(|&x| x as i16).collect::<Vec<_>>().into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Int32 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Int32Array::new(vals.int_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Int64 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Int64Array::new(vals.long_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Float32 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Float32Array::new(vals.float_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Float64 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Float64Array::new(vals.double_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
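+                                    // Timestamps reuse long_values (microseconds since the Unix epoch); the
+                                    // second arm attaches the timezone carried by the Arrow data type.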
+                                    DataType::Timestamp(TimeUnit::Microsecond, None) => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())).with_timezone(Arc::clone(tz))))
+                                            .build_list_scalar()
+                                    }
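+                                    // Date32 reuses int_values (days since the Unix epoch).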
+                                    DataType::Date32 => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Date32Array::new(vals.int_values.into(), Some(vals.null_mask.into()))))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Binary => {
+                                        // Using a builder as it is cumbersome to create BinaryArray from a vector with nulls
+                                        // and calculate correct offsets
+                                        let vals = values.clone();
+                                        let item_capacity = vals.bytes_values.len();
+                                        let data_capacity = vals.bytes_values.first().map(|b| b.len() * item_capacity).unwrap_or(0);
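+                                        // The data capacity is only a hint; it assumes every element is roughly
+                                        // the size of the first one.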
+                                        let mut arr = BinaryBuilder::with_capacity(item_capacity, data_capacity);
+
+                                        for (i, v) in vals.bytes_values.into_iter().enumerate() {
+                                            if vals.null_mask[i] {
+                                                arr.append_value(v);
+                                            } else {
+                                                arr.append_null();
+                                            }
+                                        }
+
+                                        SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
+                                            .build_list_scalar()
+                                    }
+                                    DataType::Utf8 => {
+                                        // Using a builder as it is cumbersome to create StringArray from a vector with nulls
+                                        // and calculate correct offsets
+                                        let vals = values.clone();
+                                        let item_capacity = vals.string_values.len();
+                                        let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0);
+                                        let mut arr = StringBuilder::with_capacity(item_capacity, data_capacity);
+
+                                        for (i, v) in vals.string_values.into_iter().enumerate() {
+                                            if vals.null_mask[i] {
+                                                arr.append_value(v);
+                                            } else {
+                                                arr.append_null();
+                                            }
+                                        }
+
+                                        SingleRowListArrayBuilder::new(Arc::new(arr.finish()))
+                                            .build_list_scalar()
+                                    }
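+                                    // Decimal values arrive as big-endian two's-complement bytes; decode them
+                                    // through BigInt into the i128 representation Decimal128Array expects.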
+                                    DataType::Decimal128(p, s) => {
+                                        let vals = values.clone();
+                                        SingleRowListArrayBuilder::new(Arc::new(Decimal128Array::new(vals.decimal_values.into_iter().map(|v| {
+                                            let big_integer = BigInt::from_signed_bytes_be(&v);
+                                            big_integer.to_i128().ok_or_else(|| {
+                                                GeneralError(format!(
+                                                    "Cannot parse {big_integer:?} as i128 for Decimal literal"
+                                                ))
+                                            }).unwrap()
+                                        }).collect::<Vec<_>>().into(), Some(vals.null_mask.into())).with_precision_and_scale(*p, *s)?))
+                                            .build_list_scalar()
+                                    }
+                                    dt => {
+                                        return Err(GeneralError(format!(
+                                            "DataType::List literal does not support {dt:?} type"
+                                        )))
+                                    }
+                                }
+
+                            } else {
+                                return Err(GeneralError(format!(
+                                    "Expected DataType::List but got {data_type:?}"
+                                )))
+                            }
                         }
                     }
                 };
@@ -1300,6 +1426,7 @@ impl PhysicalPlanner {
             // The `ScanExec` operator will take actual arrays from Spark during execution
             let scan =
                 ScanExec::new(self.exec_context_id, input_source, &scan.source, data_types)?;
+
             Ok((
                 vec![scan.clone()],
                 Arc::new(SparkPlan::new(spark_plan.plan_id, Arc::new(scan), vec![])),
@@ -2322,7 +2449,6 @@ impl PhysicalPlanner {
             other => other,
         };
         let func = self.session_ctx.udf(fun_name)?;
-
         let coerced_types = func
             .coerce_types(&input_expr_types)
             .unwrap_or_else(|_| input_expr_types.clone());