112 changes: 107 additions & 5 deletions datafusion/src/physical_plan/file_format/parquet.rs
@@ -520,24 +520,126 @@ fn read_partition(

#[cfg(test)]
mod tests {
use crate::datasource::{
use crate::{datasource::{
file_format::{parquet::ParquetFormat, FileFormat},
object_store::local::{
local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
},
};
}, assert_batches_sorted_eq, physical_plan::collect};

use super::*;
use arrow::datatypes::{DataType, Field};
use arrow::{datatypes::{DataType, Field}, array::{StringArray, Int8Array, Int64Array}};
use futures::StreamExt;
use parquet::{
basic::Type as PhysicalType,
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
schema::types::SchemaDescPtr,
file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, properties::WriterProperties},
schema::types::SchemaDescPtr, arrow::ArrowWriter,
};


/// Writes each RecordBatch to its own temporary parquet file and then
/// reads them all back via a ParquetExec, returning the collected RecordBatches.
async fn round_trip_to_parquet(batches: Vec<RecordBatch>, projection: Option<Vec<usize>>) -> Vec<RecordBatch> {
// When vec is dropped, temp files are deleted
let files: Vec<_> = batches.into_iter()
.map(|batch| {
let output = tempfile::NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let file: std::fs::File = output.as_file().try_clone().expect("cloning file descriptor");
let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

let file_names: Vec<_> = files.iter()
.map(|t| t.path().to_string_lossy().to_string())
.collect();

// Now, read the files back in
let file_groups: Vec<_> = file_names.iter()
.map(|name| local_unpartitioned_file(name.clone()))
.collect();

// Infer the schema
let file_schema = ParquetFormat::default()
.infer_schema(local_object_reader_stream(file_names))
.await
.expect("inferring schema");

// prepare the scan
let parquet_exec = ParquetExec::new(
FileScanConfig {
object_store: Arc::new(LocalFileSystem {}),
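// all files are placed in a single file group, so the scan produces one output partition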
file_groups: vec![file_groups],
file_schema,
statistics: Statistics::default(),
projection,
limit: None,
table_partition_cols: vec![],
},
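// no pruning predicate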
None,
);

let runtime = Arc::new(RuntimeEnv::default());
collect(Arc::new(parquet_exec), runtime)
.await
.expect("reading parquet data")
}

// Add a new column with the specified field name to the RecordBatch
fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch {
let mut fields = batch.schema().fields().clone();
fields.push(Field::new(field_name, array.data_type().clone(), true));
let schema = Arc::new(Schema::new(fields));

let mut columns = batch.columns().to_vec();
columns.push(array);
RecordBatch::try_new(schema, columns).expect("error creating record batch")
}

#[tokio::test]
async fn evolved_schema() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
// batch1: c1(string)
let batch1 = add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), "c1", c1);

// batch2: c1(string) and c2(int64)
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch2 = add_to_batch(&batch1, "c2", c2);

// batch3: c1(string) and c3(int8)
let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
let batch3 = add_to_batch(&batch1, "c3", c3);

// write the batches out as parquet files, then read them back in (no projection, so all columns are returned):
let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None).await;
let expected = vec![
"+-----+----+----+",
"| c1  | c2 | c3 |",
"+-----+----+----+",
"|     |    |    |",
"|     |    | 20 |",
"|     | 2  |    |",
"| Foo |    |    |",
"| Foo |    | 10 |",
"| Foo | 1  |    |",
"| bar |    |    |",
"| bar |    |    |",
"| bar |    |    |",
"+-----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}
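
// For illustration only (not part of this change): a minimal sketch of how the
// projection argument of round_trip_to_parquet could be exercised across files
// with different schemas. The test name, column values, and expected output are
// assumptions, not code from this PR; it reuses the helpers and imports above.
#[tokio::test]
async fn evolved_schema_with_projection_sketch() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
// file 1: c1(string) only
let batch1 = add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), "c1", c1);

// file 2: c1(string) and c2(int64)
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let batch2 = add_to_batch(&batch1, "c2", c2);

// project only column 0 (c1) of the merged schema; c2 is never read
let read = round_trip_to_parquet(vec![batch1, batch2], Some(vec![0])).await;

let expected = vec![
"+-----+",
"| c1  |",
"+-----+",
"|     |",
"|     |",
"| Foo |",
"| Foo |",
"| bar |",
"| bar |",
"+-----+",
];
assert_batches_sorted_eq!(expected, &read);
}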


#[tokio::test]
async fn parquet_exec_with_evolved_schema() -> Result<()> {


let runtime = Arc::new(RuntimeEnv::default());
let testdata = crate::test_util::parquet_test_data();
let part1 = format!("{}/schema_evolution/part1.parquet", testdata);