Skip to content

Commit

Permalink
Make INT32/64/96 handling consistent with C++ implementation
Browse files: browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent 9b1308f commit 25d34ac
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 38 deletions.
17 changes: 9 additions & 8 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::string::String;
use std::sync::{Arc, Mutex};

use arrow::array::{Array, PrimitiveArray};
use arrow::builder::{BinaryBuilder, Int64Builder, PrimitiveBuilder};
use arrow::builder::{BinaryBuilder, PrimitiveBuilder};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;

Expand All @@ -33,6 +33,7 @@ use parquet::reader::schema::parquet_to_arrow_schema;

use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::TimestampNanosecondBuilder;

pub struct ParquetTable {
filename: String,
Expand Down Expand Up @@ -283,7 +284,8 @@ impl ParquetFile {
&mut read_buffer,
)?;

let mut builder = Int64Builder::new(levels_read);
let mut builder =
TimestampNanosecondBuilder::new(levels_read);
let mut value_index = 0;
for i in 0..levels_read {
if def_levels[i] > 0 {
Expand Down Expand Up @@ -378,11 +380,10 @@ impl RecordBatchIterator for ParquetFile {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::BooleanArray;
use arrow::array::Float32Array;
use arrow::array::Float64Array;
use arrow::array::Int64Array;
use arrow::array::{BinaryArray, Int32Array};
use arrow::array::{
BinaryArray, BooleanArray, Float32Array, Float64Array, Int32Array,
TimestampNanosecondArray,
};
use std::env;

#[test]
Expand Down Expand Up @@ -466,7 +467,7 @@ mod tests {
let array = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
let mut values: Vec<i64> = vec![];
for i in 0..batch.num_rows() {
Expand Down
38 changes: 8 additions & 30 deletions rust/parquet/src/reader/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,20 +176,20 @@ impl ParquetTypeConverter {
fn to_primitive_type_inner(&self) -> Result<DataType> {
match self.schema.get_physical_type() {
PhysicalType::BOOLEAN => Ok(DataType::Boolean),
PhysicalType::INT32 => self.to_int32(),
PhysicalType::INT64 => self.to_int64(),
PhysicalType::INT96 => self.to_int96(),
PhysicalType::INT32 => self.from_int32(),
PhysicalType::INT64 => self.from_int64(),
PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond)),
PhysicalType::FLOAT => Ok(DataType::Float32),
PhysicalType::DOUBLE => Ok(DataType::Float64),
PhysicalType::BYTE_ARRAY => self.to_byte_array(),
PhysicalType::BYTE_ARRAY => self.from_byte_array(),
other => Err(ArrowError(format!(
"Unable to convert parquet physical type {}",
other
))),
}
}

fn to_int32(&self) -> Result<DataType> {
fn from_int32(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Int32),
LogicalType::UINT_8 => Ok(DataType::UInt8),
Expand All @@ -199,7 +199,6 @@ impl ParquetTypeConverter {
LogicalType::INT_16 => Ok(DataType::Int16),
LogicalType::INT_32 => Ok(DataType::Int32),
LogicalType::DATE => Ok(DataType::Date32(DateUnit::Millisecond)),
LogicalType::TIME_MICROS => Ok(DataType::Time32(TimeUnit::Microsecond)),
LogicalType::TIME_MILLIS => Ok(DataType::Time32(TimeUnit::Millisecond)),
other => Err(ArrowError(format!(
"Unable to convert parquet INT32 logical type {}",
Expand All @@ -208,14 +207,12 @@ impl ParquetTypeConverter {
}
}

fn to_int64(&self) -> Result<DataType> {
fn from_int64(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Int64),
LogicalType::INT_64 => Ok(DataType::Int64),
LogicalType::UINT_64 => Ok(DataType::UInt64),
LogicalType::DATE => Ok(DataType::Date64(DateUnit::Millisecond)),
LogicalType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
LogicalType::TIME_MILLIS => Ok(DataType::Time64(TimeUnit::Millisecond)),
LogicalType::TIMESTAMP_MICROS => {
Ok(DataType::Timestamp(TimeUnit::Microsecond))
}
Expand All @@ -229,31 +226,12 @@ impl ParquetTypeConverter {
}
}

fn to_int96(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Int64),
LogicalType::DATE => Ok(DataType::Date64(DateUnit::Millisecond)),
LogicalType::TIME_MICROS => Ok(DataType::Time64(TimeUnit::Microsecond)),
LogicalType::TIME_MILLIS => Ok(DataType::Time64(TimeUnit::Millisecond)),
LogicalType::TIMESTAMP_MICROS => {
Ok(DataType::Timestamp(TimeUnit::Microsecond))
}
LogicalType::TIMESTAMP_MILLIS => {
Ok(DataType::Timestamp(TimeUnit::Millisecond))
}
other => Err(ArrowError(format!(
"Unable to convert parquet INT96 logical type {}",
other
))),
}
}

fn to_byte_array(&self) -> Result<DataType> {
fn from_byte_array(&self) -> Result<DataType> {
match self.schema.get_basic_info().logical_type() {
LogicalType::NONE => Ok(DataType::Utf8),
LogicalType::UTF8 => Ok(DataType::Utf8),
other => Err(ArrowError(format!(
"Unable to convert parquet logical type {}",
"Unable to convert parquet BYTE_ARRAY logical type {}",
other
))),
}
Expand Down

0 comments on commit 25d34ac

Please sign in to comment.