If a column's Max/Min statistics do not exist in the parquet file, set Min/Max to None (#2671)

* If a column's Max/Min statistics do not exist in the parquet file, set the Max/Min accumulators to None.

* Add more data types to the parquet max/min test case.
HuSen8891 authored Jun 2, 2022
1 parent 49e072a commit 4b3eb1c
Showing 3 changed files with 123 additions and 5 deletions.
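
For context, here is a minimal, self-contained sketch of the behavior this commit introduces. ColumnStats and summarize_column below are simplified stand-ins for illustration only, not the actual DataFusion or parquet-rs API, and plain i64 stands in for ScalarValue: when a column's file-level statistics are missing or incomplete, the running Max/Min accumulators for that column are reset to None instead of being left at a stale partial value.

// Sketch only: simplified types, not DataFusion's API.
#[derive(Debug)]
struct ColumnStats {
    min: Option<i64>,
    max: Option<i64>,
}

fn summarize_column(
    i: usize,
    file_stats: Option<ColumnStats>,
    max_values: &mut [Option<i64>],
    min_values: &mut [Option<i64>],
) {
    if let Some(stats) = file_stats {
        if let (Some(min), Some(max)) = (stats.min, stats.max) {
            // Statistics exist: fold them into the running accumulators.
            max_values[i] = Some(max_values[i].map_or(max, |acc| acc.max(max)));
            min_values[i] = Some(min_values[i].map_or(min, |acc| acc.min(min)));
            return;
        }
    }
    // No usable statistics for this column: the accumulated min/max can no
    // longer be trusted, so both are reset to None (the fix in this commit).
    max_values[i] = None;
    min_values[i] = None;
}

fn main() {
    let mut max_values = vec![Some(10_i64), Some(10)];
    let mut min_values = vec![Some(1_i64), Some(1)];

    // Column 0 has statistics in the file, column 1 does not.
    let c0 = ColumnStats { min: Some(0), max: Some(42) };
    summarize_column(0, Some(c0), &mut max_values, &mut min_values);
    summarize_column(1, None, &mut max_values, &mut min_values);

    assert_eq!(max_values, vec![Some(42), None]);
    assert_eq!(min_values, vec![Some(0), None]);
    println!("max: {:?}  min: {:?}", max_values, min_values);
}

The real summarize_min_max applies the same rule per ParquetStatistics variant (Int32, Int64, Float, Double, ...), which is why each match arm in the diff below gains a return on the success path and a None fallback at the end.
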
24 changes: 23 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -161,8 +161,11 @@ fn summarize_min_max(
}
}
}
return;
}
}
max_values[i] = None;
min_values[i] = None;
}
ParquetStatistics::Int32(s) => {
if let DataType::Int32 = fields[i].data_type() {
@@ -189,8 +192,11 @@ fn summarize_min_max(
}
}
}
return;
}
}
max_values[i] = None;
min_values[i] = None;
}
ParquetStatistics::Int64(s) => {
if let DataType::Int64 = fields[i].data_type() {
@@ -217,8 +223,11 @@ fn summarize_min_max(
}
}
}
return;
}
}
max_values[i] = None;
min_values[i] = None;
}
ParquetStatistics::Float(s) => {
if let DataType::Float32 = fields[i].data_type() {
@@ -243,8 +252,11 @@ fn summarize_min_max(
}
}
}
return;
}
}
max_values[i] = None;
min_values[i] = None;
}
ParquetStatistics::Double(s) => {
if let DataType::Float64 = fields[i].data_type() {
@@ -269,10 +281,16 @@ fn summarize_min_max(
}
}
}
return;
}
}
max_values[i] = None;
min_values[i] = None;
}
-            _ => {}
+            _ => {
+                max_values[i] = None;
+                min_values[i] = None;
+            }
}
}

@@ -344,6 +362,10 @@ fn fetch_statistics(
table_idx,
stats,
)
} else {
// If no statistics exist for the current column, set the Max/Min accumulators to None.
max_values[table_idx] = None;
min_values[table_idx] = None;
}
} else {
*null_cnt += num_rows as usize;
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/mod.rs
@@ -78,6 +78,8 @@ pub async fn get_statistics_with_limit(
max_values[i] = None;
}
}
} else {
max_values[i] = None;
}
}

@@ -89,6 +91,8 @@ pub async fn get_statistics_with_limit(
min_values[i] = None;
}
}
} else {
min_values[i] = None;
}
}
}
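
For context, a minimal sketch of the merge rule the datasource/mod.rs change enforces in get_statistics_with_limit (plain i64 values stand in for DataFusion's ScalarValue, and merge_max is an illustrative helper, not the actual API): a merged max or min survives only while every file contributes a value, and a single file without statistics makes the table-level value None.

// Sketch only: illustrative helper, not DataFusion's API.
fn merge_max(acc: Option<i64>, file_max: Option<i64>) -> Option<i64> {
    match (acc, file_max) {
        (Some(a), Some(b)) => Some(a.max(b)),
        // Unknown on either side makes the merged value unknown.
        _ => None,
    }
}

fn main() {
    // Per-file max values for one column; the third file has no statistics.
    let per_file_max = [Some(10_i64), Some(25), None];

    let mut merged = per_file_max[0];
    for &file_max in &per_file_max[1..] {
        merged = merge_max(merged, file_max);
    }

    // One missing file-level max is enough to make the table-level max unknown.
    assert_eq!(merged, None);
    println!("merged max: {:?}", merged);
}

This is what the new else branches in the hunks above do: when a file contributes no max (or min) for a column, the accumulated value is dropped to None rather than kept.
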
100 changes: 96 additions & 4 deletions datafusion/core/tests/sql/parquet.rs
@@ -219,13 +219,105 @@ async fn schema_merge_ignores_metadata() {
// (no errors)
let ctx = SessionContext::new();
let df = ctx
-        .read_parquet(
-            table_dir.to_str().unwrap().to_string(),
-            ParquetReadOptions::default(),
-        )
+        .read_parquet(table_dir.to_str().unwrap(), ParquetReadOptions::default())
.await
.unwrap();
let result = df.collect().await.unwrap();

assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
}

#[tokio::test]
async fn parquet_query_with_max_min() {
let tmp_dir = TempDir::new().unwrap();
let table_dir = tmp_dir.path().join("parquet_test");
let table_path = Path::new(&table_dir);

let fields = vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
Field::new("c3", DataType::Int64, true),
Field::new("c4", DataType::Date32, true),
];

let schema = Arc::new(Schema::new(fields.clone()));

if let Ok(()) = fs::create_dir(table_path) {
let filename = "foo.parquet";
let path = table_path.join(&filename);
let file = fs::File::create(path).unwrap();
let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
.unwrap();

// create mock record batch
let c1s = Arc::new(Int32Array::from_slice(&[1, 2, 3]));
let c2s = Arc::new(StringArray::from_slice(&["aaa", "bbb", "ccc"]));
let c3s = Arc::new(Int64Array::from_slice(&[100, 200, 300]));
let c4s = Arc::new(Date32Array::from(vec![Some(1), Some(2), Some(3)]));
let rec_batch =
RecordBatch::try_new(schema.clone(), vec![c1s, c2s, c3s, c4s]).unwrap();

writer.write(&rec_batch).unwrap();
writer.close().unwrap();
}

// query parquet
let ctx = SessionContext::new();

ctx.register_parquet(
"foo",
&format!("{}/foo.parquet", table_dir.to_str().unwrap()),
ParquetReadOptions::default(),
)
.await
.unwrap();

let sql = "SELECT max(c1) FROM foo";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------+",
"| MAX(foo.c1) |",
"+-------------+",
"| 3 |",
"+-------------+",
];

assert_batches_eq!(expected, &actual);

let sql = "SELECT min(c2) FROM foo";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------+",
"| MIN(foo.c2) |",
"+-------------+",
"| aaa |",
"+-------------+",
];

assert_batches_eq!(expected, &actual);

let sql = "SELECT max(c3) FROM foo";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------+",
"| MAX(foo.c3) |",
"+-------------+",
"| 300 |",
"+-------------+",
];

assert_batches_eq!(expected, &actual);

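    // Date32 stores days since the Unix epoch, so the stored minimum of 1 prints as 1970-01-02.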
let sql = "SELECT min(c4) FROM foo";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
"+-------------+",
"| MIN(foo.c4) |",
"+-------------+",
"| 1970-01-02 |",
"+-------------+",
];

assert_batches_eq!(expected, &actual);
}
