Skip to content

Commit

Permalink
Fix avro tests (#2570) (#2571)
Browse files: browse the repository at this point in the history
  • Loading branch information
tustvold authored May 18, 2022
1 parent 5399f1c commit a08d26e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
run: |
export ARROW_TEST_DATA=$(pwd)/testing/data
export PARQUET_TEST_DATA=$(pwd)/parquet-testing/data
cargo test
cargo test --features avro
# test datafusion examples
cd datafusion-examples
cargo run --example csv_sql
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,7 @@ fn flatten_string_values(values: &[&Value]) -> Vec<Option<String>> {
values
.iter()
.flat_map(|row| {
let row = maybe_resolve_union(row);
if let Value::Array(values) = row {
values
.iter()
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ mod tests {

use super::*;
use crate::datasource::listing::local_unpartitioned_file;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
Expand All @@ -99,10 +98,10 @@ mod tests {
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
let ctx = SessionContext::with_config(config);
let task_ctx = session_ctx.task_ctx();
let task_ctx = ctx.task_ctx();
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, None).await?;
let stream = exec.execute(0, task_ctx).await?;
let stream = exec.execute(0, task_ctx)?;

let tt_batches = stream
.map(|batch| {
Expand Down
39 changes: 26 additions & 13 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Execution plan for reading Avro files
#[cfg(feature = "avro")]
use crate::avro_to_arrow;
use crate::error::{DataFusionError, Result};
use crate::error::Result;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
Expand Down Expand Up @@ -98,7 +98,7 @@ impl ExecutionPlan for AvroExec {
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
Err(DataFusionError::NotImplemented(
Err(crate::error::DataFusionError::NotImplemented(
"Cannot execute avro plan without avro feature enabled".to_string(),
))
}
Expand Down Expand Up @@ -165,17 +165,17 @@ impl ExecutionPlan for AvroExec {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
use crate::datasource::object_store::local::{
local_object_reader_stream, LocalFileSystem,
};
use crate::datasource::{
file_format::{avro::AvroFormat, FileFormat},
listing::local_unpartitioned_file,
};
use crate::prelude::SessionContext;
use crate::scalar::ScalarValue;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_data_access::object_store::local::{
local_object_reader_stream, LocalFileSystem,
};
use futures::StreamExt;
use sqlparser::ast::ObjectType::Schema;

use super::*;

Expand All @@ -196,7 +196,11 @@ mod tests {
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let mut results = avro_exec.execute(0).await.expect("plan execution failed");
let ctx = SessionContext::new();
let mut results = avro_exec
.execute(0, ctx.task_ctx())
.expect("plan execution failed");

let batch = results
.next()
.await
Expand Down Expand Up @@ -237,27 +241,32 @@ mod tests {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let actual_schema = AvroFormat {}
.infer_schema(local_object_reader_stream(vec![filename]))
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await?;

let mut fields = actual_schema.fields().clone();
fields.push(Field::new("missing_col", DataType::Int32, true));

let file_schema = Arc::new(Schema::new(fields));
// Include the missing column in the projection
let projection = Some(vec![0, 1, 2, actual_schema.fields().len()]);

let avro_exec = AvroExec::new(FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![local_unpartitioned_file(filename.clone())]],
file_schema,
statistics: Statistics::default(),
// Include the missing column in the projection
projection: Some(vec![0, 1, 2, file_schema.fields().len()]),
projection,
limit: None,
table_partition_cols: vec![],
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let mut results = avro_exec.execute(0).await.expect("plan execution failed");
let ctx = SessionContext::new();
let mut results = avro_exec
.execute(0, ctx.task_ctx())
.expect("plan execution failed");

let batch = results
.next()
.await
Expand Down Expand Up @@ -310,14 +319,18 @@ mod tests {
projection: Some(vec![0, 1, file_schema.fields().len(), 2]),
object_store: Arc::new(LocalFileSystem {}),
file_groups: vec![vec![partitioned_file]],
file_schema: file_schema,
file_schema,
statistics: Statistics::default(),
limit: None,
table_partition_cols: vec!["date".to_owned()],
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

let mut results = avro_exec.execute(0).await.expect("plan execution failed");
let ctx = SessionContext::new();
let mut results = avro_exec
.execute(0, ctx.task_ctx())
.expect("plan execution failed");

let batch = results
.next()
.await
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn avro_single_nan_schema() {
let plan = ctx.create_logical_plan(sql).unwrap();
let plan = ctx.optimize(&plan).unwrap();
let plan = ctx.create_physical_plan(&plan).await.unwrap();
let runtime = ctx.state.lock().runtime_env.clone();
let runtime = ctx.task_ctx();
let results = collect(plan, runtime).await.unwrap();
for batch in results {
assert_eq!(1, batch.num_rows());
Expand Down

0 comments on commit a08d26e

Please sign in to comment.