From 25d34ac9f7d735dd8d1fe051cd552a7ac17757f4 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 14 Mar 2019 07:37:07 -0600
Subject: [PATCH] Make INT32/64/96 handling consistent with C++ implementation

---
 rust/datafusion/src/datasource/parquet.rs | 17 +++++-----
 rust/parquet/src/reader/schema.rs         | 38 +++++------------------
 2 files changed, 17 insertions(+), 38 deletions(-)

diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index e74645f67d20c..a5804ac91964f 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,7 +22,7 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use arrow::array::{Array, PrimitiveArray};
-use arrow::builder::{BinaryBuilder, Int64Builder, PrimitiveBuilder};
+use arrow::builder::{BinaryBuilder, PrimitiveBuilder};
 use arrow::datatypes::*;
 use arrow::record_batch::RecordBatch;
 
@@ -33,6 +33,7 @@ use parquet::reader::schema::parquet_to_arrow_schema;
 
 use crate::datasource::{RecordBatchIterator, ScanResult, Table};
 use crate::execution::error::{ExecutionError, Result};
+use arrow::builder::TimestampNanosecondBuilder;
 
 pub struct ParquetTable {
     filename: String,
@@ -283,7 +284,8 @@ impl ParquetFile {
                                 &mut read_buffer,
                             )?;
 
-                            let mut builder = Int64Builder::new(levels_read);
+                            let mut builder =
+                                TimestampNanosecondBuilder::new(levels_read);
                             let mut value_index = 0;
                             for i in 0..levels_read {
                                 if def_levels[i] > 0 {
@@ -378,11 +380,10 @@ impl RecordBatchIterator for ParquetFile {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use arrow::array::BooleanArray;
-    use arrow::array::Float32Array;
-    use arrow::array::Float64Array;
-    use arrow::array::Int64Array;
-    use arrow::array::{BinaryArray, Int32Array};
+    use arrow::array::{
+        BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
+        TimestampNanosecondArray,
+    };
     use std::env;
 
     #[test]
@@ -466,7 +467,7 @@ mod tests {
         let array = batch
             .column(0)
             .as_any()
-            .downcast_ref::<Int64Array>()
+            .downcast_ref::<TimestampNanosecondArray>()
             .unwrap();
         let mut values: Vec<i64> = vec![];
         for i in 0..batch.num_rows() {
diff --git a/rust/parquet/src/reader/schema.rs b/rust/parquet/src/reader/schema.rs
index 2e02c6c8476c7..5af07be7460f6 100644
--- a/rust/parquet/src/reader/schema.rs
+++ b/rust/parquet/src/reader/schema.rs
@@ -176,12 +176,12 @@ impl ParquetTypeConverter {
     fn to_primitive_type_inner(&self) -> Result<DataType> {
         match self.schema.get_physical_type() {
             PhysicalType::BOOLEAN => Ok(DataType::Boolean),
-            PhysicalType::INT32 => self.to_int32(),
-            PhysicalType::INT64 => self.to_int64(),
-            PhysicalType::INT96 => self.to_int96(),
+            PhysicalType::INT32 => self.from_int32(),
+            PhysicalType::INT64 => self.from_int64(),
+            PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond)),
             PhysicalType::FLOAT => Ok(DataType::Float32),
             PhysicalType::DOUBLE => Ok(DataType::Float64),
-            PhysicalType::BYTE_ARRAY => self.to_byte_array(),
+            PhysicalType::BYTE_ARRAY => self.from_byte_array(),
             other => Err(ArrowError(format!(
                 "Unable to convert parquet physical type {}",
                 other
@@ -189,7 +189,7 @@ impl ParquetTypeConverter {
         }
     }
 
-    fn to_int32(&self) -> Result<DataType> {
+    fn from_int32(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int32),
             LogicalType::UINT_8 => Ok(DataType::UInt8),
@@ -199,7 +199,6 @@ impl ParquetTypeConverter {
             LogicalType::INT_16 => Ok(DataType::Int16),
             LogicalType::INT_32 => Ok(DataType::Int32),
             LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
-            LogicalType::TIME_MICROS => Ok(DataType::Time32(TimeUnit::Microsecond)),
             LogicalType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
             other => Err(ArrowError(format!(
                 "Unable to convert parquet INT32 logical type {}",
@@ -208,14 +207,12 @@ impl ParquetTypeConverter {
         }
     }
 
-    fn to_int64(&self) -> Result<DataType> {
+    fn from_int64(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Int64),
             LogicalType::INT_64 => Ok(DataType::Int64),
             LogicalType::UINT_64 => Ok(DataType::UInt64),
-            LogicalType::DATE => Ok(DataType::Date64(DateUnit::Millisecond)),
             LogicalType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
-            LogicalType::TIME_MILLIS => Ok(DataType::Time64(TimeUnit::Millisecond)),
             LogicalType::TIMESTAMP_MICROS => {
                 Ok(DataType::Timestamp(TimeUnit::Microsecond))
             }
@@ -229,31 +226,12 @@ impl ParquetTypeConverter {
         }
     }
 
-    fn to_int96(&self) -> Result<DataType> {
-        match self.schema.get_basic_info().logical_type() {
-            LogicalType::NONE => Ok(DataType::Int64),
-            LogicalType::DATE => Ok(DataType::Date64(DateUnit::Millisecond)),
-            LogicalType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
-            LogicalType::TIME_MILLIS => Ok(DataType::Time64(TimeUnit::Millisecond)),
-            LogicalType::TIMESTAMP_MICROS => {
-                Ok(DataType::Timestamp(TimeUnit::Microsecond))
-            }
-            LogicalType::TIMESTAMP_MILLIS => {
-                Ok(DataType::Timestamp(TimeUnit::Millisecond))
-            }
-            other => Err(ArrowError(format!(
-                "Unable to convert parquet INT96 logical type {}",
-                other
-            ))),
-        }
-    }
-
-    fn to_byte_array(&self) -> Result<DataType> {
+    fn from_byte_array(&self) -> Result<DataType> {
         match self.schema.get_basic_info().logical_type() {
             LogicalType::NONE => Ok(DataType::Utf8),
             LogicalType::UTF8 => Ok(DataType::Utf8),
             other => Err(ArrowError(format!(
-                "Unable to convert parquet logical type {}",
+                "Unable to convert parquet BYTE_ARRAY logical type {}",
                 other
             ))),
         }