
Commit 41945b9

Merge pull request #1 from alamb/alamb/merging_test

Add round trip parquet testing

2 parents: e81f1df + ad335ed

1 file changed: datafusion/src/physical_plan/file_format/parquet.rs (+107 −5 lines)
@@ -520,24 +520,126 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::{
+    use crate::{datasource::{
         file_format::{parquet::ParquetFormat, FileFormat},
         object_store::local::{
             local_object_reader_stream, local_unpartitioned_file, LocalFileSystem,
         },
-    };
+    }, assert_batches_sorted_eq, physical_plan::collect};
 
     use super::*;
-    use arrow::datatypes::{DataType, Field};
+    use arrow::{datatypes::{DataType, Field}, array::{StringArray, Int8Array, Int64Array}};
     use futures::StreamExt;
     use parquet::{
         basic::Type as PhysicalType,
-        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics},
-        schema::types::SchemaDescPtr,
+        file::{metadata::RowGroupMetaData, statistics::Statistics as ParquetStatistics, properties::WriterProperties},
+        schema::types::SchemaDescPtr, arrow::ArrowWriter,
     };
 
+
+    /// Writes each RecordBatch as an individual parquet file and then
+    /// reads them back in, returning the resulting record batches.
+    async fn round_trip_to_parquet(batches: Vec<RecordBatch>, projection: Option<Vec<usize>>) -> Vec<RecordBatch> {
+        // When the Vec is dropped, the temp files are deleted
+        let files: Vec<_> = batches.into_iter()
+            .map(|batch| {
+                let output = tempfile::NamedTempFile::new().expect("creating temp file");
+
+                let props = WriterProperties::builder().build();
+                let file: std::fs::File = (*output.as_file()).try_clone().expect("cloning file descriptor");
+                let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).expect("creating writer");
+
+                writer.write(&batch).expect("writing batch");
+                writer.close().unwrap();
+                output
+            })
+            .collect();
+
+        let file_names: Vec<_> = files.iter()
+            .map(|t| t.path().to_string_lossy().to_string())
+            .collect();
+
+        // Now, read the files back in
+        let file_groups: Vec<_> = file_names.iter()
+            .map(|name| local_unpartitioned_file(name.clone()))
+            .collect();
+
+        // Infer the schema
+        let file_schema = ParquetFormat::default()
+            .infer_schema(local_object_reader_stream(file_names))
+            .await
+            .expect("inferring schema");
+
+        // Prepare the scan
+        let parquet_exec = ParquetExec::new(
+            FileScanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                file_groups: vec![file_groups],
+                file_schema,
+                statistics: Statistics::default(),
+                projection,
+                limit: None,
+                table_partition_cols: vec![],
+            },
+            None,
+        );
+
+        let runtime = Arc::new(RuntimeEnv::default());
+        collect(Arc::new(parquet_exec), runtime)
+            .await
+            .expect("reading parquet data")
+    }
+
+    // Add a new column with the specified field name to the RecordBatch
+    fn add_to_batch(batch: &RecordBatch, field_name: &str, array: ArrayRef) -> RecordBatch {
+        let mut fields = batch.schema().fields().clone();
+        fields.push(Field::new(field_name, array.data_type().clone(), true));
+        let schema = Arc::new(Schema::new(fields));
+
+        let mut columns = batch.columns().to_vec();
+        columns.push(array);
+        RecordBatch::try_new(schema, columns).expect("creating record batch")
+    }
+
+    #[tokio::test]
+    async fn evolved_schema() {
+        let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+        // batch1: c1(string)
+        let batch1 = add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::new(vec![]))), "c1", c1);
+
+        // batch2: c1(string) and c2(int64)
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+        let batch2 = add_to_batch(&batch1, "c2", c2);
+
+        // batch3: c1(string) and c3(int8)
+        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
+        let batch3 = add_to_batch(&batch1, "c3", c3);
+
+        // write the batches to parquet files and read them back in
+        let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None).await;
+        let expected = vec![
+            "+-----+----+----+",
+            "| c1  | c2 | c3 |",
+            "+-----+----+----+",
+            "|     |    |    |",
+            "|     |    | 20 |",
+            "|     | 2  |    |",
+            "| Foo |    |    |",
+            "| Foo |    | 10 |",
+            "| Foo | 1  |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "| bar |    |    |",
+            "+-----+----+----+",
+        ];
+        assert_batches_sorted_eq!(expected, &read);
+    }
+
+
     #[tokio::test]
     async fn parquet_exec_with_evolved_schema() -> Result<()> {
+
+
         let runtime = Arc::new(RuntimeEnv::default());
         let testdata = crate::test_util::parquet_test_data();
         let part1 = format!("{}/schema_evolution/part1.parquet", testdata);
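
The new evolved_schema test passes None for the projection, so every merged column comes back. A minimal sketch of a companion test (hypothetical, not part of this commit) that exercises the projection argument of round_trip_to_parquet by reading back only column 0; the expected table assumes arrow's pretty-printed output format as compared by assert_batches_sorted_eq!:

#[tokio::test]
async fn evolved_schema_with_projection() {
    // Hypothetical test: same single-column batch as in `evolved_schema`
    let c1: ArrayRef = Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
    let batch = add_to_batch(
        &RecordBatch::new_empty(Arc::new(Schema::new(vec![]))),
        "c1",
        c1,
    );

    // Read back only column 0 (`c1`) via the projection argument
    let read = round_trip_to_parquet(vec![batch], Some(vec![0])).await;
    let expected = vec![
        "+-----+",
        "| c1  |",
        "+-----+",
        "|     |",
        "| Foo |",
        "| bar |",
        "+-----+",
    ];
    assert_batches_sorted_eq!(expected, &read);
}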
