Skip to content

Commit

Permalink
implement more parquet column types and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Mar 14, 2019
1 parent 5ce3086 commit b4981ed
Showing 1 changed file with 222 additions and 34 deletions.
256 changes: 222 additions & 34 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use parquet::schema::types::Type;

use crate::datasource::{RecordBatchIterator, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
use arrow::builder::BooleanBuilder;
use arrow::builder::Int64Builder;
use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};

pub struct ParquetTable {
filename: String,
Expand Down Expand Up @@ -112,7 +114,6 @@ impl ParquetFile {
.collect();

let projected_schema = Arc::new(Schema::new(projected_fields));
println!("projected schema: {:?}", projected_schema);

Ok(ParquetFile {
reader: reader,
Expand All @@ -132,7 +133,6 @@ impl ParquetFile {

fn load_next_row_group(&mut self) {
if self.row_group_index < self.reader.num_row_groups() {
//println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
let reader = self.reader.get_row_group(self.row_group_index).unwrap();

self.column_readers = vec![];
Expand All @@ -151,17 +151,40 @@ impl ParquetFile {
}

fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
println!("load_batch()");
match &self.current_row_group {
Some(reader) => {
let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
let mut row_count = 0;
for i in 0..self.column_readers.len() {
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (BOOL)".to_string(),
));
ColumnReader::BoolColumnReader(ref mut r) => {
let mut read_buffer: Vec<bool> =
Vec::with_capacity(self.batch_size);

for _ in 0..self.batch_size {
read_buffer.push(false);
}

match r.read_batch(
self.batch_size,
None,
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
let mut builder = BooleanBuilder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
Arc::new(builder.finish())
}
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
}
ColumnReader::Int32ColumnReader(ref mut r) => {
let mut read_buffer: Vec<i32> =
Expand All @@ -179,39 +202,87 @@ impl ParquetFile {
) {
//TODO this isn't handling null values
Ok((count, _)) => {
println!("Read {} rows", count);
let mut builder = Int32Builder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
Arc::new(builder.finish())
}
_ => {
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {})",
i
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
}
ColumnReader::Int64ColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT64)".to_string(),
));
ColumnReader::Int64ColumnReader(ref mut r) => {
let mut read_buffer: Vec<i64> =
Vec::with_capacity(self.batch_size);

for _ in 0..self.batch_size {
read_buffer.push(0);
}

match r.read_batch(
self.batch_size,
None,
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
let mut builder = Int64Builder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
Arc::new(builder.finish())
}
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
}
ColumnReader::Int96ColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT96)".to_string(),
));
}
ColumnReader::FloatColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FLOAT)".to_string(),
));
ColumnReader::FloatColumnReader(ref mut r) => {
let mut builder = Float32Builder::new(self.batch_size);
let mut read_buffer: Vec<f32> =
Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
read_buffer.push(0.0);
}
match r.read_batch(
self.batch_size,
None,
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
Arc::new(builder.finish())
}
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
}
ColumnReader::DoubleColumnReader(ref mut r) => {
let mut builder = Float64Builder::new(self.batch_size);
let mut read_buffer: Vec<f64> =
Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
read_buffer.push(0.0);
}
match r.read_batch(
self.batch_size,
None,
Expand All @@ -220,14 +291,14 @@ impl ParquetFile {
) {
//TODO this isn't handling null values
Ok((count, _)) => {
builder.append_slice(&read_buffer).unwrap();
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
Arc::new(builder.finish())
}
_ => {
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {})",
i
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
Expand All @@ -251,11 +322,13 @@ impl ParquetFile {
//TODO this is horribly inefficient
let mut builder = BinaryBuilder::new(row_count);
for j in 0..row_count {
let foo = b[j].slice(0, b[j].len());
let bytes: &[u8] = foo.data();
let str =
String::from_utf8(bytes.to_vec()).unwrap();
builder.append_string(&str).unwrap();
let slice = b[j].slice(0, b[j].len());
builder
.append_string(
&String::from_utf8(slice.data().to_vec())
.unwrap(),
)
.unwrap();
}
Arc::new(builder.finish())
}
Expand Down Expand Up @@ -354,11 +427,43 @@ impl RecordBatchIterator for ParquetFile {
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::BooleanArray;
use arrow::array::Float32Array;
use arrow::array::Float64Array;
use arrow::array::Int64Array;
use arrow::array::{BinaryArray, Int32Array};
use std::env;

#[test]
fn read_read_i32_column() {
fn read_bool_alltypes_plain_parquet() {
    // Scan only column index 1 of the file, asking for batches of up to 1024 rows.
    let parquet_table = load_table("alltypes_plain.parquet");
    let columns = Some(vec![1]);
    let scan = parquet_table.scan(&columns, 1024).unwrap();
    let mut batches = scan.borrow_mut();
    let first_batch = batches.next().unwrap().unwrap();

    // The projection keeps a single column; the first batch holds 8 rows.
    assert_eq!(1, first_batch.num_columns());
    assert_eq!(8, first_batch.num_rows());

    // The projected column must have been materialised as a BooleanArray.
    let column = first_batch.column(0);
    let bools = column.as_any().downcast_ref::<BooleanArray>().unwrap();
    let values: Vec<bool> = (0..first_batch.num_rows())
        .map(|row| bools.value(row))
        .collect();

    assert_eq!(
        "[true, false, true, false, true, false, true, false]",
        format!("{:?}", values)
    );
}

#[test]
fn read_i32_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");

let projection = Some(vec![0]);
Expand All @@ -383,7 +488,63 @@ mod tests {
}

#[test]
fn read_read_string_column() {
fn read_f32_alltypes_plain_parquet() {
    // Scan only column index 6 of the file, asking for batches of up to 1024 rows.
    let parquet_table = load_table("alltypes_plain.parquet");
    let columns = Some(vec![6]);
    let scan = parquet_table.scan(&columns, 1024).unwrap();
    let mut batches = scan.borrow_mut();
    let first_batch = batches.next().unwrap().unwrap();

    // The projection keeps a single column; the first batch holds 8 rows.
    assert_eq!(1, first_batch.num_columns());
    assert_eq!(8, first_batch.num_rows());

    // The projected column must have been materialised as a Float32Array.
    let column = first_batch.column(0);
    let floats = column.as_any().downcast_ref::<Float32Array>().unwrap();
    let values: Vec<f32> = (0..first_batch.num_rows())
        .map(|row| floats.value(row))
        .collect();

    // Compare through the Debug rendering rather than float equality.
    assert_eq!(
        "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
        format!("{:?}", values)
    );
}

/// Reads column index 7 of `alltypes_plain.parquet` and checks that it comes
/// back as a `Float64Array` with the expected eight values.
#[test]
fn read_f64_alltypes_plain_parquet() {
    let parquet_table = load_table("alltypes_plain.parquet");
    let columns = Some(vec![7]);
    let scan = parquet_table.scan(&columns, 1024).unwrap();
    let mut batches = scan.borrow_mut();
    let first_batch = batches.next().unwrap().unwrap();

    // The projection keeps a single column; the first batch holds 8 rows.
    assert_eq!(1, first_batch.num_columns());
    assert_eq!(8, first_batch.num_rows());

    let column = first_batch.column(0);
    let doubles = column.as_any().downcast_ref::<Float64Array>().unwrap();
    let values: Vec<f64> = (0..first_batch.num_rows())
        .map(|row| doubles.value(row))
        .collect();

    // Compare through the Debug rendering rather than float equality.
    assert_eq!(
        "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
        format!("{:?}", values)
    );
}

#[test]
fn read_utf8_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");

let projection = Some(vec![9]);
Expand All @@ -400,7 +561,7 @@ mod tests {
.downcast_ref::<BinaryArray>()
.unwrap();
let mut values: Vec<String> = vec![];
for i in 0..8 {
for i in 0..batch.num_rows() {
let str: String = String::from_utf8(array.value(i).to_vec()).unwrap();
values.push(str);
}
Expand All @@ -411,12 +572,39 @@ mod tests {
);
}

/// Reads column index 0 of `nullable.impala.parquet` and checks that it comes
/// back as an `Int64Array` holding the values 1 through 7.
#[test]
fn read_int64_nullable_impala_parquet() {
    let parquet_table = load_table("nullable.impala.parquet");
    let columns = Some(vec![0]);
    let scan = parquet_table.scan(&columns, 1024).unwrap();
    let mut batches = scan.borrow_mut();
    let first_batch = batches.next().unwrap().unwrap();

    // The projection keeps a single column; the first batch holds 7 rows.
    assert_eq!(1, first_batch.num_columns());
    assert_eq!(7, first_batch.num_rows());

    let column = first_batch.column(0);
    let longs = column.as_any().downcast_ref::<Int64Array>().unwrap();
    let values: Vec<i64> = (0..first_batch.num_rows())
        .map(|row| longs.value(row))
        .collect();

    assert_eq!("[1, 2, 3, 4, 5, 6, 7]", format!("{:?}", values));
}

/// Opens the parquet test file `name` as a boxed `Table`.
///
/// The file is looked up inside the directory named by the
/// `PARQUET_TEST_DATA` environment variable. The schema of the loaded table
/// is printed one field per line to help diagnose failing tests.
///
/// # Panics
///
/// Panics if `PARQUET_TEST_DATA` is not set or is not valid Unicode.
fn load_table(name: &str) -> Box<Table> {
    // Name the missing variable in the panic so a misconfigured environment
    // is obvious from the test output.
    let testdata = env::var("PARQUET_TEST_DATA").expect(
        "PARQUET_TEST_DATA must be set to the directory containing the parquet test files",
    );
    let filename = format!("{}/{}", testdata, name);
    let table = ParquetTable::new(&filename);

    // One line per field; a single Debug dump of the whole schema is hard to read.
    println!("Loading file {} with schema:", name);
    for field in table.schema().fields() {
        println!("\t{:?}", field);
    }

    Box::new(table)
}
}

0 comments on commit b4981ed

Please sign in to comment.