Commit d237d66

add test & fix single quote
1 parent 5516189 commit d237d66

File tree

2 files changed (+77, -33 lines)

datafusion-cli/src/functions.rs

Lines changed: 45 additions & 33 deletions
@@ -30,6 +30,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
+use datafusion::scalar::ScalarValue;
 use parquet::file::reader::FileReader;
 use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics;
@@ -249,11 +250,17 @@ pub struct ParquetMetadataFunc {}
 
 impl TableFunctionImpl for ParquetMetadataFunc {
     fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
-        let Some(Expr::Column(Column { name, .. })) = exprs.get(0) else {
-            return plan_err!("parquet_metadata requires string argument as its input");
+        let filename = match exprs.get(0) {
+            Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => s, // single quote: parquet_metadata('x.parquet')
+            Some(Expr::Column(Column { name, .. })) => name, // double quote: parquet_metadata("x.parquet")
+            _ => {
+                return plan_err!(
+                    "parquet_metadata requires string argument as its input"
+                );
+            }
         };
 
-        let file = File::open(name.clone())?;
+        let file = File::open(filename.clone())?;
         let reader = SerializedFileReader::new(file)?;
         let metadata = reader.metadata();
 
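
This hunk is the quoting fix itself: in DataFusion's SQL dialect a single-quoted argument is parsed as a string literal (Expr::Literal(ScalarValue::Utf8)), while a double-quoted argument is parsed as an identifier and arrives as Expr::Column, which is why the old let-else pattern accepted parquet_metadata("x.parquet") but rejected parquet_metadata('x.parquet'). Below is a minimal standalone sketch of the new argument handling, assuming the same imports functions.rs already uses (Column and plan_err from datafusion::common, Result from datafusion::error); the helper name filename_from_args is illustrative, not part of the commit.

use datafusion::common::{plan_err, Column};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::scalar::ScalarValue;

// Sketch: pull the file path out of the first call argument, accepting either
// quoting style instead of only the Expr::Column form.
fn filename_from_args(exprs: &[Expr]) -> Result<String> {
    match exprs.first() {
        // single quote: parquet_metadata('x.parquet') -> Utf8 literal
        Some(Expr::Literal(ScalarValue::Utf8(Some(s)))) => Ok(s.clone()),
        // double quote: parquet_metadata("x.parquet") -> column identifier
        Some(Expr::Column(Column { name, .. })) => Ok(name.clone()),
        _ => plan_err!("parquet_metadata requires string argument as its input"),
    }
}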
@@ -309,7 +316,7 @@ impl TableFunctionImpl for ParquetMetadataFunc {
         let mut total_uncompressed_size_arr = vec![];
         for (rg_idx, row_group) in metadata.row_groups().iter().enumerate() {
             for (col_idx, column) in row_group.columns().iter().enumerate() {
-                filename_arr.push(name.clone());
+                filename_arr.push(filename.clone());
                 row_group_id_arr.push(rg_idx as i64);
                 row_group_num_rows_arr.push(row_group.num_rows());
                 row_group_num_columns_arr.push(row_group.num_columns() as i64);
@@ -320,38 +327,43 @@ impl TableFunctionImpl for ParquetMetadataFunc {
                 path_in_schema_arr.push(column.column_path().to_string());
                 type_arr.push(column.column_type().to_string());
                 if let Some(s) = column.statistics() {
-                    let (min_val, max_val) = match s {
-                        Statistics::Boolean(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::Int32(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::Int64(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::Int96(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::Float(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::Double(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::ByteArray(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
-                        Statistics::FixedLenByteArray(val) => {
-                            (val.min().to_string(), val.max().to_string())
-                        }
+                    let (min_val, max_val) = if s.has_min_max_set() {
+                        let (min_val, max_val) = match s {
+                            Statistics::Boolean(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::Int32(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::Int64(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::Int96(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::Float(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::Double(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::ByteArray(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                            Statistics::FixedLenByteArray(val) => {
+                                (val.min().to_string(), val.max().to_string())
+                            }
+                        };
+                        (Some(min_val), Some(max_val))
+                    } else {
+                        (None, None)
                     };
-                    stats_min_arr.push(Some(min_val.clone()));
-                    stats_max_arr.push(Some(max_val.clone()));
+                    stats_min_arr.push(min_val.clone());
+                    stats_max_arr.push(max_val.clone());
                     stats_null_count_arr.push(Some(s.null_count() as i64));
                     stats_distinct_count_arr.push(s.distinct_count().map(|c| c as i64));
-                    stats_min_value_arr.push(Some(min_val));
-                    stats_max_value_arr.push(Some(max_val));
+                    stats_min_value_arr.push(min_val);
+                    stats_max_value_arr.push(max_val);
                 } else {
                     stats_min_arr.push(None);
                     stats_max_arr.push(None);
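
The second functions.rs change guards the statistics columns: a column chunk can carry a Statistics object in the footer even when no min/max values were recorded, and the per-type min()/max() accessors are only meaningful when has_min_max_set() reports true, so the new code pushes NULLs in that case instead of formatting unset values. A condensed sketch of that guard follows, written against the parquet API used here; only two of the eight variants are spelled out, and the helper name min_max_strings is illustrative, not from the diff.

use parquet::file::statistics::Statistics;

// Sketch: return (min, max) rendered as strings when the footer statistics
// carry them, or (None, None) so the stats_min/stats_max columns become NULL.
fn min_max_strings(s: &Statistics) -> (Option<String>, Option<String>) {
    if !s.has_min_max_set() {
        return (None, None);
    }
    match s {
        Statistics::Int64(v) => (Some(v.min().to_string()), Some(v.max().to_string())),
        Statistics::Double(v) => (Some(v.min().to_string()), Some(v.max().to_string())),
        // Boolean, Int32, Int96, Float, ByteArray and FixedLenByteArray follow
        // the same pattern in the diff above.
        _ => (None, None),
    }
}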

datafusion-cli/src/main.rs

Lines changed: 32 additions & 0 deletions
@@ -331,6 +331,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
 
 #[cfg(test)]
 mod tests {
+    use datafusion::assert_batches_eq;
+
     use super::*;
 
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -388,4 +390,34 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_parquet_metadata_works() -> Result<(), DataFusionError> {
+        let ctx = SessionContext::new();
+        ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
+
+        // input with single quote
+        let sql =
+            "SELECT * FROM parquet_metadata('../datafusion/core/tests/data/fixed_size_list_array.parquet')";
+        let df = ctx.sql(&sql).await?;
+        let rbs = df.collect().await?;
+
+        let expected = [
"+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
"| filename | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema | type | stats_min | stats_max | stats_null_count | stats_distinct_count | stats_min_value | stats_max_value | compression | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |",
"+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
"| ../datafusion/core/tests/data/fixed_size_list_array.parquet | 0 | 2 | 1 | 123 | 0 | 125 | 4 | \"f0.list.item\" | INT64 | 1 | 4 | 0 | | 1 | 4 | SNAPPY | [RLE_DICTIONARY, PLAIN, RLE] | | 4 | 46 | 121 | 123 |",
"+-------------------------------------------------------------+--------------+--------------------+-----------------------+-----------------+-----------+-------------+------------+----------------+-------+-----------+-----------+------------------+----------------------+-----------------+-----------------+-------------+------------------------------+-------------------+------------------------+------------------+-----------------------+-------------------------+",
+        ];
+        assert_batches_eq!(expected, &rbs);
+
+        // input with double quote
+        let sql =
+            "SELECT * FROM parquet_metadata(\"../datafusion/core/tests/data/fixed_size_list_array.parquet\")";
+        let df = ctx.sql(&sql).await?;
+        let rbs = df.collect().await?;
+        assert_batches_eq!(expected, &rbs);
+
+        Ok(())
+    }
 }
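
With both changes in place, parquet_metadata('path') and parquet_metadata("path") resolve to the same filename, which is why the test asserts the identical expected batch for both quoting styles. Assuming a full checkout with the fixture file present, running cargo test test_parquet_metadata_works from the datafusion-cli directory should exercise just this test; the relative path in the SQL resolves from that directory.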
