diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4a4e32f90a16..ce1bc32f8feb 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -115,7 +115,7 @@ jobs: run: | export ARROW_TEST_DATA=$(pwd)/testing/data export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data - cargo test + cargo test --features avro # test datafusion examples cd datafusion-examples cargo run --example csv_sql diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 9d5552954f53..dfaa98555a6e 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -861,6 +861,7 @@ fn flatten_string_values(values: &[&Value]) -> Vec> { values .iter() .flat_map(|row| { + let row = maybe_resolve_union(row); if let Value::Array(values) = row { values .iter() diff --git a/datafusion/core/src/datasource/file_format/avro.rs b/datafusion/core/src/datasource/file_format/avro.rs index 954c28a77326..8820662063bc 100644 --- a/datafusion/core/src/datasource/file_format/avro.rs +++ b/datafusion/core/src/datasource/file_format/avro.rs @@ -87,7 +87,6 @@ mod tests { use super::*; use crate::datasource::listing::local_unpartitioned_file; - use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::prelude::{SessionConfig, SessionContext}; use arrow::array::{ BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array, @@ -99,10 +98,10 @@ mod tests { async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); let ctx = SessionContext::with_config(config); - let task_ctx = session_ctx.task_ctx(); + let task_ctx = ctx.task_ctx(); let projection = None; let exec = get_exec("alltypes_plain.avro", &projection, None).await?; - let stream = exec.execute(0, task_ctx).await?; + let stream = exec.execute(0, task_ctx)?; let tt_batches = stream .map(|batch| { diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index eed0161a2ea7..a25ad60e1483 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -18,7 +18,7 @@ //! Execution plan for reading line-delimited Avro files #[cfg(feature = "avro")] use crate::avro_to_arrow; -use crate::error::{DataFusionError, Result}; +use crate::error::Result; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -98,7 +98,7 @@ impl ExecutionPlan for AvroExec { _partition: usize, _context: Arc, ) -> Result { - Err(DataFusionError::NotImplemented( + Err(crate::error::DataFusionError::NotImplemented( "Cannot execute avro plan without avro feature enabled".to_string(), )) } @@ -165,17 +165,17 @@ impl ExecutionPlan for AvroExec { #[cfg(test)] #[cfg(feature = "avro")] mod tests { - use crate::datasource::object_store::local::{ - local_object_reader_stream, LocalFileSystem, - }; use crate::datasource::{ file_format::{avro::AvroFormat, FileFormat}, listing::local_unpartitioned_file, }; + use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_data_access::object_store::local::{ + local_object_reader_stream, LocalFileSystem, + }; use futures::StreamExt; - use sqlparser::ast::ObjectType::Schema; use super::*; @@ -196,7 +196,11 @@ mod tests { }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); - let mut results = avro_exec.execute(0).await.expect("plan execution failed"); + let ctx = SessionContext::new(); + let mut results = avro_exec + .execute(0, ctx.task_ctx()) + .expect("plan execution failed"); + let batch = results .next() .await @@ -237,27 +241,32 @@ mod tests { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let actual_schema = AvroFormat {} - .infer_schema(local_object_reader_stream(vec![filename])) + .infer_schema(local_object_reader_stream(vec![filename.clone()])) .await?; let mut fields = actual_schema.fields().clone(); fields.push(Field::new("missing_col", DataType::Int32, true)); let file_schema = Arc::new(Schema::new(fields)); + // Include the missing column in the projection + let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]); let avro_exec = AvroExec::new(FileScanConfig { object_store: Arc::new(LocalFileSystem {}), file_groups: vec![vec![local_unpartitioned_file(filename.clone())]], file_schema, statistics: Statistics::default(), - // Include the missing column in the projection - projection: Some(vec![0, 1, 2, file_schema.fields().len()]), + projection, limit: None, table_partition_cols: vec![], }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); - let mut results = avro_exec.execute(0).await.expect("plan execution failed"); + let ctx = SessionContext::new(); + let mut results = avro_exec + .execute(0, ctx.task_ctx()) + .expect("plan execution failed"); + let batch = results .next() .await @@ -310,14 +319,18 @@ mod tests { projection: Some(vec![0, 1, file_schema.fields().len(), 2]), object_store: Arc::new(LocalFileSystem {}), file_groups: vec![vec![partitioned_file]], - file_schema: file_schema, + file_schema, statistics: Statistics::default(), limit: None, table_partition_cols: vec!["date".to_owned()], }); assert_eq!(avro_exec.output_partitioning().partition_count(), 1); - let mut results = avro_exec.execute(0).await.expect("plan execution failed"); + let ctx = SessionContext::new(); + let mut results = avro_exec + .execute(0, ctx.task_ctx()) + .expect("plan execution failed"); + let batch = results .next() .await diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs index f5c25dbadce0..0102cf3b7cb7 100644 --- a/datafusion/core/tests/sql/avro.rs +++ b/datafusion/core/tests/sql/avro.rs @@ -124,7 +124,7 @@ async fn avro_single_nan_schema() { let plan = ctx.create_logical_plan(sql).unwrap(); let plan = ctx.optimize(&plan).unwrap(); let plan = ctx.create_physical_plan(&plan).await.unwrap(); - let runtime = ctx.state.lock().runtime_env.clone(); + let runtime = ctx.task_ctx(); let results = collect(plan, runtime).await.unwrap(); for batch in results { assert_eq!(1, batch.num_rows());