
ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source #3851

Closed
wants to merge 38 commits

Conversation

andygrove (Member):

I'm sure I'll need some guidance on this one from @sunchao or @liurenjie1024, but I am keen to get Parquet support added for primitive types so that I can actually use DataFusion and Arrow in production at some point.

andygrove (Member Author):

@liurenjie1024 should I use the row reader for now?

andygrove (Member Author):

row_iter seems like the safest path for now, but I don't know how to check for null values.

andygrove (Member Author):

Switched back to column readers and fixed bugs. Ready for a first review (this is still WIP).

andygrove changed the title to [WIP] ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source on Mar 10, 2019
andygrove requested a review from kszucs on Mar 10, 2019
@@ -751,6 +752,22 @@ impl Schema {
"fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
})
}

/// Create a new schema by applying a projection to this schema's fields
pub fn projection(&self, projection: &Vec<usize>) -> Result<Arc<Schema>> {
Member:

nit: can we use &[usize] instead of &Vec<usize>?


impl ParquetFile {
pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
println!("open()");
Member:

let's remove these printlns?


impl ParquetTable {
pub fn new(filename: &str) -> Self {
let file = File::open(filename).unwrap();
Member:

is it safe to unwrap here? what if the input file doesn't exist?
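
The PR later resolves this with a fallible try_new constructor; a minimal sketch of that shape (with a stand-in struct) might look like:

use std::fs::File;

// Stand-in; the real ParquetTable holds more state.
struct ParquetTable {
    file: File,
}

impl ParquetTable {
    // Propagate the io::Error instead of unwrapping, so a missing input
    // file surfaces as an Err rather than a panic.
    pub fn try_new(filename: &str) -> std::io::Result<Self> {
        let file = File::open(filename)?;
        Ok(ParquetTable { file })
    }
}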

physical_type,
..
} => {
let arrow_type = match physical_type {
Member:

We should look at the logical type instead of the physical type here.

Member Author:

Do you mean basic_info.logical_type()? If so, the value is NONE for every column in the test Parquet file.

Member:

In the NONE case it should be converted to the corresponding type, e.g., PhysicalType::INT32 -> int, PhysicalType::INT64 -> long, etc. Again you can check here or here for reference.
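
A sketch of that fallback mapping (the function name is illustrative, following the PhysicalType::INT32 -> int, PhysicalType::INT64 -> long correspondence above):

use arrow::datatypes::DataType;
use parquet::basic::Type as PhysicalType;

// Fallback for a NONE logical type: map the physical type directly to the
// corresponding Arrow type, rejecting anything that needs logical-type info.
fn physical_to_arrow(t: PhysicalType) -> Result<DataType, String> {
    match t {
        PhysicalType::BOOLEAN => Ok(DataType::Boolean),
        PhysicalType::INT32 => Ok(DataType::Int32),
        PhysicalType::INT64 => Ok(DataType::Int64),
        PhysicalType::FLOAT => Ok(DataType::Float32),
        PhysicalType::DOUBLE => Ok(DataType::Float64),
        other => Err(format!("no default Arrow type for {:?}", other)),
    }
}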

Reply:

Fixed. Now using parquet::reader::schema::parquet_to_arrow_schema


Ok(Field::new(basic_info.name(), arrow_type, false))
}
Type::GroupType { basic_info, fields } => Ok(Field::new(
Member:

Group type can also represent list, map, etc. We should not convert them all to struct.

Reply:

Fixed. Now using parquet::reader::schema::parquet_to_arrow_schema, and also added checks for non-primitive types, returning an Err in those cases.

}
}

fn load_next_row_group(&mut self) {
Member:

Maybe we should return a Result from this function? Also, in the else branch we should not just panic.
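
A toy sketch of the suggested shape, with hypothetical field names: return a Result and surface an Err from the else branch instead of panicking.

// Hypothetical stand-in for the reader's row-group state.
struct RowGroupState {
    row_group_index: usize,
    num_row_groups: usize,
}

impl RowGroupState {
    fn load_next_row_group(&mut self) -> Result<(), String> {
        if self.row_group_index < self.num_row_groups {
            // ... create the column readers for this row group here ...
            self.row_group_index += 1;
            Ok(())
        } else {
            Err("no more row groups to load".to_string())
        }
    }
}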


match r.read_batch(
self.batch_size,
None,
Member:

We should not ignore definition levels and repetition levels; otherwise nulls and nested data types may not be handled properly.

Member Author:

I don't really understand what I need to do here. For now I think we should limit support to simple types so I'm not worried about nesting yet, but I do want to support null values.

sunchao (Member), Mar 10, 2019:

In that case, you should only pass in None for def_levels when you know that the column is required. Otherwise, you should pass in a mutable slice of batch_size length, which will be filled up with the def_levels for the values. Note that the number of values filled in by this method will always be equal to or less than the batch_size.

For instance, if batch_size is 10 and there are 3 null values, then def_levels will contain 10 entries while values will only contain 7 entries (they occupy the first 7 slots of the input values slice; the rest will just be the default value).

After calling this method, you'll need to inspect the def_levels vector to find the null values. A value is non-null iff the corresponding def_level equals max_def_level.

An example can be found here.
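
A minimal sketch of this recipe for a nullable INT32 column, assuming the read_batch signature described above and the arrow builder API used elsewhere in this PR (the function name and String error type are illustrative):

use std::sync::Arc;
use arrow::array::Int32Array;
use arrow::builder::Int32Builder;
use parquet::column::reader::ColumnReaderImpl;
use parquet::data_type::Int32Type;

// Read one batch from a possibly-nullable INT32 column, expanding the
// packed values into an Arrow array with nulls. `max_def_level` comes
// from the column descriptor.
fn read_nullable_i32(
    reader: &mut ColumnReaderImpl<Int32Type>,
    batch_size: usize,
    max_def_level: i16,
) -> Result<Arc<Int32Array>, String> {
    let mut values: Vec<i32> = vec![0; batch_size];
    let mut def_levels: Vec<i16> = vec![0; batch_size];

    // values_read < levels_read whenever nulls are present
    let (values_read, levels_read) = reader
        .read_batch(batch_size, Some(&mut def_levels[..]), None, &mut values)
        .map_err(|e| format!("read_batch failed: {:?}", e))?;

    let mut builder = Int32Builder::new(levels_read);
    let mut next_value = 0;
    for i in 0..levels_read {
        if def_levels[i] == max_def_level {
            // non-null: take the next packed value
            builder
                .append_value(values[next_value])
                .map_err(|e| format!("append failed: {:?}", e))?;
            next_value += 1;
        } else {
            builder
                .append_null()
                .map_err(|e| format!("append failed: {:?}", e))?;
        }
    }
    // next_value should now equal values_read
    Ok(Arc::new(builder.finish()))
}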

Contributor:

For a simple POC, I think it's better to ignore nulls and nested types. For null elements you need a spaced reader.

Member Author:

Thanks for the help. This makes sense. I am going to implement this using generics; otherwise the code will be too verbose.

Member Author:

Generics are too hard because the Parquet and Arrow crates have different data types... I tried:

trait ArrowReader<T> where T: ArrowPrimitiveType {
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<T,P> ArrowReader<T> for ColumnReaderImpl<P> where T: ArrowPrimitiveType, P: parquet::data_type::DataType {
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>> {

        // create read buffer
        let mut read_buffer: Vec<P::T> =
            Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            read_buffer.push(T::default_value());
        }

        let (values_read, _) = self.read_batch(
            batch_size,
            None,
            None,
            &mut read_buffer,
        )?;

        let mut builder = PrimitiveBuilder::<T>::new(values_read);
        builder.append_slice(&read_buffer[0..values_read])?;
        Ok(Arc::new(builder.finish()))
    }
}

but I get errors like this:

    = note: expected type `&[<T as arrow::datatypes::ArrowPrimitiveType>::Native]`
               found type `&[<P as parquet::data_type::DataType>::T]`

I guess I'll do macros for now.

nevi-me (Contributor), Mar 13, 2019:

Hi @andygrove,

You could rewrite the above as:

trait ArrowReader<T> where T: ArrowPrimitiveType {
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<A,P> ArrowReader<A> for ColumnReaderImpl<P> 
where
    A: ArrowPrimitiveType, 
    P: parquet::data_type::DataType,
    // the problem is that we didn't have trait bounds that allow converting between Parquet native and Arrow native types
    P::T: std::convert::From<A::Native>,
    A::Native: std::convert::From<P::T>,
{
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<A>>> {

        // create read buffer
        let mut read_buffer: Vec<P::T> =
            Vec::with_capacity(batch_size);

        for _ in 0..batch_size {
            // convert from Arrow native to Parquet native
            read_buffer.push(A::default_value().into());
        }

        let (values_read, _) = self.read_batch(
            batch_size,
            None,
            None,
            &mut read_buffer,
        )?;

        let mut builder = PrimitiveBuilder::<A>::new(values_read);
        // need to convert the vec of Parquet native types to Arrow native types
        // alternatively we could implement std::convert::From<Vec<P::T>> for Vec<A::Native>
        let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
        builder.append_slice(&converted_buffer[0..values_read])?;
        Ok(Arc::new(builder.finish()))
    }
}

This compiles, but I don't know where in your code I'd put this so that I can test if it's doing the right thing.

@sunchao @liurenjie1024 might it benefit us to change the DataType::T below to DataType::Native like we have with Arrow? I had to change the T in the above code to A because P::T was confusing, where P::Native would have been easier to read.

// parquet/src/data_type.rs
macro_rules! make_type {
    ($name:ident, $physical_ty:path, $native_ty:ty, $size:expr) => {
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_physical_type() -> Type {
                $physical_ty
            }

            fn get_type_size() -> usize {
                $size
            }
        }
    };
}

Contributor:

@andygrove here's the implementation as a PR against your fork: andygrove#1

Member Author:

Wow, thanks @nevi-me !

andygrove (Member Author):

@sunchao I fixed the nits but I need some guidance on checking against logical types and how definition/repetition levels work when you have some time. Maybe you can point me to some code I can learn from?

sunchao (Member), Mar 10, 2019:

@andygrove Yes. To convert group type, you can check here, which actually implements the conversion from Parquet to Arrow schema. For an introduction on definition/repetition levels, you can read this article, which I found pretty helpful.

liurenjie1024 (Contributor) left a review comment:

Sorry for the late reply. I left some comments; for a simple implementation without nulls and nested types, you can refer to my previous implementation: https://github.com/liurenjie1024/zeus/blob/arrow/zeus-server/src/storage/blizard_storage/reader.rs


match r.read_batch(
self.batch_size,
None,
Contributor:

For a simple POC, I think it's better to ignore nulls and nested types. For null elements you need a spaced reader.

}
}

fn to_arrow(t: &Type) -> Result<Field> {
Contributor:

Maybe you can use the schema converter in parquet::reader::schema::parquet_to_arrow_schema?

Member Author:

done

@@ -177,6 +177,7 @@ impl ParquetTypeConverter {
PhysicalType::BOOLEAN => Ok(DataType::Boolean),
PhysicalType::INT32 => self.to_int32(),
PhysicalType::INT64 => self.to_int64(),
PhysicalType::INT96 => self.to_int64(),
Member Author:

INT96 is a timestamp in nanoseconds, and when reading to Arrow I am converting to a timestamp in milliseconds.
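
For context, a sketch of that conversion under the usual INT96 layout (two little-endian u32 words of nanoseconds-of-day followed by a u32 Julian day, where 2_440_588 is the Julian day of the Unix epoch). The name int96_to_millis is illustrative; the PR's actual helper, convert_int96_timestamp, appears further down:

const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; // Julian day of 1970-01-01
const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_SECOND: i64 = 1_000;

// Convert an INT96 value (three u32 words) to milliseconds since the epoch.
fn int96_to_millis(v: &[u32; 3]) -> i64 {
    let nanos_of_day = (v[0] as u64) | ((v[1] as u64) << 32);
    let days_since_epoch = v[2] as i64 - JULIAN_DAY_OF_EPOCH;
    let seconds = days_since_epoch * SECONDS_PER_DAY;
    seconds * MILLIS_PER_SECOND + (nanos_of_day / 1_000_000) as i64
}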

andygrove (Member Author):

@sunchao @liurenjie1024 This is ready for another review. I am checking if the field is nullable, and if so, I pass def levels.

If values_read == levels_read then there are no null values and I read as usual.

If there are null values, I return an Err, so I think this makes it safe for the first pass; we can add null support in a follow-up PR.

I would also like to work with you both on a generic arrow reader.

fn create_binary_array(b: &Vec<ByteArray>, row_count: usize) -> Result<Arc<BinaryArray>> {
let mut builder = BinaryBuilder::new(b.len());
for j in 0..row_count {
let slice = b[j].slice(0, b[j].len());
Member:

Why do we need to call slice here?

Member Author:

removed

None,
&mut read_buffer,
)?;
let mut builder = $BUILDER::new(levels_read);
Member:

Can we extract this common code out of the if statement:

let mut builder = $BUILDER::new(values_read);
builder.append_slice(&read_buffer[0..values_read])?;
Arc::new(builder.finish())

Member Author:

Refactored this to remove duplication; I was prematurely optimizing.

macro_rules! read_column {
($SELF:ident, $R:ident, $INDEX:expr, $BUILDER:ident, $TY:ident, $DEFAULT:expr) => {{
//TODO: should be able to get num_rows in row group instead of defaulting to batch size
let mut read_buffer: Vec<$TY> = Vec::with_capacity($SELF.batch_size);
Member:

you can replace this with:

let mut read_buffer: Vec<$TY> = vec![$DEFAULT; $SELF.batch_size];

/// Execute query and return result set as tab delimited string
fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
let plan = ctx.create_logical_plan(&sql).unwrap();
println!("Plan: {:?}", plan);
Member:

nit: remove the println?

andygrove (Member Author):

@sunchao @nevi-me @liurenjie1024 I cleaned the code up to remove duplication, added null support, and addressed other feedback.

I have been testing null support manually so far since there are no suitable test files in this repo. I will create a separate PR for creating a suitable file to test with.


use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::{BinaryBuilder, Int64Builder};
Contributor:

nit: we could condense some of these imports into fewer lines

Member Author:

done

&mut read_buffer,
)?;

let mut builder = Int64Builder::new(levels_read);
nevi-me (Contributor), Mar 14, 2019:

EDIT: You can ignore the below; it doesn't work.

What do you think of using TimestampMillisecondBuilder for millisecond timestamps? It should be a drop-in replacement for Int64Builder.

}

/// convert a parquet timestamp in nanoseconds to a timestamp with milliseconds
fn convert_int96_timestamp(v: &[u32]) -> i64 {
Contributor:

As DataType::Timestamp supports nanosecond precision, might it not be better to keep the resolution at nanosecond level and use a TimestampNanosecond? @xhochy do we have to worry about int96 conversion semantics from apache/parquet-format#49?

Contributor:

@andygrove I'll submit another PR that addresses this; please see https://gist.github.com/nevi-me/574038fdec8e9c207f661813789d58fb

Contributor:

Here is the fix: andygrove#2

Member:

Yes, INT96 is that weird. Please ensure that you don't write them by default ;)

Member Author:

Thanks @nevi-me and @xhochy !

andygrove (Member Author):

@sunchao @liurenjie1024 @nevi-me Ready for re-review. I compared the schema converter with the C++ implementation and made it consistent.

)?;

let mut builder =
TimestampNanosecondBuilder::new(levels_read);
Contributor:

Sorry about the back and forth @andygrove; the builder should be TimestampMillisecondBuilder in this instance, as convert_int96_timestamp still returns milliseconds. Alternatively we could change seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000 to seconds * MILLIS_PER_SECOND * 1_000_000 + nanoseconds to keep nano precision (which I prefer).

The original JIRA ticket that deprecated INT96 (https://issues.apache.org/jira/browse/PARQUET-323) based this on the reasoning that nanosecond precision is 'rarely a real requirement'.

Member Author:

Good catch. Thanks. Since the schema converter says that INT96 is a timestamp in nanoseconds (just like the C++ impl), we should convert to nanoseconds, so I updated as you suggested.

let array = batch
.column(0)
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
Contributor:

This will downcast to milliseconds per the related comment. As an aside, the nice thing with temporal arrays now is that we can do the below (which isn't necessary for this test):

let mut values = vec![];
for i in 0..batch.num_rows() {
    values.push(array.datetime(i));
}

assert_eq!("2001-03-31 12:00:00, ...", format!("{:?}", values));

@@ -751,6 +752,22 @@ impl Schema {
"fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
})
}

/// Create a new schema by applying a projection to this schema's fields
pub fn projection(&self, projection: &[usize]) -> Result<Arc<Schema>> {
Contributor:

Does this belong in datafusion as a free function? It doesn't seem like this will be used within the arrow sub-crate?

Member Author:

I'm 50/50 on this but I moved it to datafusion for now.
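
A minimal sketch of the projection helper as a free function, with a String error standing in for DataFusion's own error type:

use std::sync::Arc;
use arrow::datatypes::{Field, Schema};

// Build a new schema containing only the fields selected by `projection`,
// failing on an out-of-range column index instead of panicking.
fn schema_projection(schema: &Schema, projection: &[usize]) -> Result<Arc<Schema>, String> {
    let mut fields: Vec<Field> = Vec::with_capacity(projection.len());
    for &i in projection {
        if i >= schema.fields().len() {
            return Err(format!("projection index {} out of bounds", i));
        }
        fields.push(schema.field(i).clone());
    }
    Ok(Arc::new(Schema::new(fields)))
}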

}

impl ParquetTable {
pub fn try_new(filename: &str) -> Result<Self> {
Contributor:

nit: Maybe filename should be of type AsRef<Path> just like File::open? (probably does not need to be included in this PR)

Member Author:

Sure. I'd prefer to look at that as a separate PR as you suggested.
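
The suggested signature would mirror std::fs::File::open; a sketch with a stand-in struct:

use std::fs::File;
use std::path::Path;

// Stand-in; the real ParquetTable holds more state.
struct ParquetTable {
    file: File,
}

impl ParquetTable {
    // Accept anything path-like (&str, String, &Path, PathBuf), just as
    // File::open does.
    pub fn try_new<P: AsRef<Path>>(filename: P) -> std::io::Result<Self> {
        let file = File::open(filename)?;
        Ok(ParquetTable { file })
    }
}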

nevi-me (Contributor) left a review comment:

Thanks @andygrove

andygrove (Member Author):

Thanks @nevi-me.

@sunchao @paddyhoran I think this is ready to go now?

sunchao (Member) left a review comment:

Thanks @andygrove. Looks much better!

is_nullable: bool,
) -> Result<Arc<PrimitiveArray<A>>> {
// create read buffer
let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];
Member:

nit: can replace A::default_value().into() with P::T::default() to save an into() call.

Member Author:

I had problems making this change. Will look at this as a separate PR too.

if self.row_group_index < self.reader.num_row_groups() {
let reader = self.reader.get_row_group(self.row_group_index)?;

self.column_readers = Vec::with_capacity(self.projection.len());
Member:

nit: can we call self.column_readers.clear() here?

row_group_index: 0,
projection_schema: projected_schema,
projection,
batch_size: 64 * 1024,
Member:

nit: extract this as a constant.

is_nullable,
)?
}
ColumnReader::Int64ColumnReader(ref mut r) => {
Member:

Should we look at the logical type for the int64? It may need to be converted to a timestamp or decimal. Same for int32.

Member Author:

Yes, that's a good point.

Member:

Thanks. It's fine to do it in a separate PR though.

Member Author:

How would you feel about doing this as separate PRs after this is merged?

Member:

Yes, that's fine with me.

Member Author:

Cool. Once you and/or Paddy have approved this one, I'll merge and start on the date/time support.


fn next(&mut self) -> Result<Option<RecordBatch>> {
// advance the row group reader if necessary
if self.current_row_group.is_none() {
Member:

At some point we may need to think about small row groups versus a big batch size; we may need to read across row group boundaries.

})
}

fn load_next_row_group(&mut self) -> Result<()> {
Member:

Can we have a test for this case as well, i.e., loading multiple row groups?

Member Author:

done

sunchao (Member), Mar 15, 2019:

LGTM, pending CI.
