ARROW-4466: [Rust] [DataFusion] Add support for Parquet data source #3851
Conversation
@liurenjie1024 should I use the row reader for now?
Switched back to column readers ... fixed bugs ... ready for first review (this is still WIP).
rust/arrow/src/datatypes.rs
Outdated
@@ -751,6 +752,22 @@ impl Schema {
        "fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
    })
}

/// Create a new schema by applying a projection to this schema's fields
pub fn projection(&self, projection: &Vec<usize>) -> Result<Arc<Schema>> {
nit: can we use `&[usize]` instead of `&Vec<usize>`?
impl ParquetFile {
    pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
        println!("open()");
let's remove these `println`s?
impl ParquetTable {
    pub fn new(filename: &str) -> Self {
        let file = File::open(filename).unwrap();
is it safe to unwrap here? what if the input file doesn't exist?
        physical_type,
        ..
    } => {
        let arrow_type = match physical_type {
we should look at logical type instead of physical type here.
Do you mean `basic_info.logical_type()`? If so, the value is `NONE` for every column in the test parquet file.
Fixed. Now using `parquet::reader::schema::parquet_to_arrow_schema`.
        Ok(Field::new(basic_info.name(), arrow_type, false))
    }
    Type::GroupType { basic_info, fields } => Ok(Field::new(
Group type can also represent list, map, etc. We should not convert them all to struct.
Fixed. Now using `parquet::reader::schema::parquet_to_arrow_schema`, and also added checks for non-primitive types, returning an `Err` in those cases.
    }
}

fn load_next_row_group(&mut self) {
Maybe we should return a `Result` from this function? Also, in the `else` branch we should not just `panic`.
match r.read_batch(
    self.batch_size,
    None,
We should not ignore definition levels and repetition levels; otherwise nulls and nested data types may not be handled properly.
I don't really understand what I need to do here. For now I think we should limit support to simple types so I'm not worried about nesting yet, but I do want to support null values.
In that case, you should only pass in `None` for `def_levels` when you know that the column is required. Otherwise, you should pass in a mutable slice of `batch_size` length, which will be filled with the `def_levels` for the values. Note that the number of values filled in by this method will always be equal to or less than `batch_size`.

For instance, if `batch_size` is 10 and there are 3 null values, then `def_levels` will contain 10 entries while `values` will only contain 7 entries (occupying the first 7 slots of the input `values` slice; the rest will just be the default value).

After calling this method, you'll need to inspect the `def_levels` vector to find the null values. A value is non-null iff the corresponding `def_level` equals `max_def_level`.

An example can be found here.
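To make that contract concrete, here is a minimal sketch of reading one batch of a nullable INT64 column. The helper name is hypothetical; `read_batch` and the builder calls follow the shapes shown elsewhere in this PR.

use std::sync::Arc;

use arrow::array::Int64Array;
use arrow::builder::Int64Builder;
use parquet::column::reader::ColumnReaderImpl;
use parquet::data_type::Int64Type;
use parquet::errors::Result;

// Hypothetical helper: read up to batch_size values from a nullable column,
// using def_levels to distinguish nulls from real values.
fn read_nullable_i64(
    r: &mut ColumnReaderImpl<Int64Type>,
    batch_size: usize,
    max_def_level: i16,
) -> Result<Arc<Int64Array>> {
    let mut values: Vec<i64> = vec![0; batch_size];
    let mut def_levels: Vec<i16> = vec![0; batch_size];
    // values_read <= levels_read: a null occupies a def_level slot but no value slot
    let (values_read, levels_read) =
        r.read_batch(batch_size, Some(&mut def_levels), None, &mut values)?;
    let mut builder = Int64Builder::new(levels_read);
    let mut next_value = 0;
    for i in 0..levels_read {
        if def_levels[i] == max_def_level {
            // non-null iff def_level == max_def_level
            builder.append_value(values[next_value]).unwrap();
            next_value += 1;
        } else {
            builder.append_null().unwrap();
        }
    }
    debug_assert_eq!(next_value, values_read);
    Ok(Arc::new(builder.finish()))
}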
For a simple POC, I think it's better to ignore nulls and nested types. For null elements you need a spaced reader.
Thanks for the help. This makes sense. I am going to implement this using generics, otherwise the code will be too verbose.
Generics are too hard because the Parquet and Arrow crates have different data types... I tried:
trait ArrowReader<T> where T: ArrowPrimitiveType {
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<T, P> ArrowReader<T> for ColumnReaderImpl<P>
where
    T: ArrowPrimitiveType,
    P: parquet::data_type::DataType,
{
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>> {
        // create read buffer
        let mut read_buffer: Vec<P::T> = Vec::with_capacity(batch_size);
        for _ in 0..batch_size {
            read_buffer.push(T::default_value());
        }
        let (values_read, _) =
            self.read_batch(batch_size, None, None, &mut read_buffer)?;
        let mut builder = PrimitiveBuilder::<T>::new(values_read);
        builder.append_slice(&read_buffer[0..values_read])?;
        Ok(Arc::new(builder.finish()))
    }
}
but get errors like this:

= note: expected type `&[<T as arrow::datatypes::ArrowPrimitiveType>::Native]`
           found type `&[<P as parquet::data_type::DataType>::T]`
I guess I'll do macros for now.
Hi @andygrove,
You could rewrite the above as:
trait ArrowReader<T> where T: ArrowPrimitiveType {
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<A, P> ArrowReader<A> for ColumnReaderImpl<P>
where
    A: ArrowPrimitiveType,
    P: parquet::data_type::DataType,
    // the problem is that we didn't have a trait bound that allows for
    // converting between Parquet native and Arrow native
    P::T: std::convert::From<A::Native>,
    A::Native: std::convert::From<P::T>,
{
    fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<A>>> {
        // create read buffer
        let mut read_buffer: Vec<P::T> = Vec::with_capacity(batch_size);
        for _ in 0..batch_size {
            // convert from Arrow native to Parquet native
            read_buffer.push(A::default_value().into());
        }
        let (values_read, _) =
            self.read_batch(batch_size, None, None, &mut read_buffer)?;
        let mut builder = PrimitiveBuilder::<A>::new(values_read);
        // need to convert the vec of Parquet native types to Arrow native types;
        // otherwise we could create std::convert::From<Vec<P::T>> to Vec<A::Native>
        let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
        builder.append_slice(&converted_buffer[0..values_read])?;
        Ok(Arc::new(builder.finish()))
    }
}
This compiles, but I don't know where in your code I'd put this so that I can test if it's doing the right thing.
@sunchao @liurenjie1024 might it benefit us to change the `DataType::T` below to `DataType::Native` like we have with Arrow? I had to change the `T` in the above code to `A` because `P::T` was confusing, where `P::Native` would have been easier to read.
// parquet/src/data_type.rs
macro_rules! make_type {
    ($name:ident, $physical_ty:path, $native_ty:ty, $size:expr) => {
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_physical_type() -> Type {
                $physical_ty
            }

            fn get_type_size() -> usize {
                $size
            }
        }
    };
}
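For illustration, the suggested rename would only touch the associated type in that macro. A hypothetical sketch, not what the crate currently has:

// parquet/src/data_type.rs (hypothetical, with the suggested rename applied)
macro_rules! make_type {
    ($name:ident, $physical_ty:path, $native_ty:ty, $size:expr) => {
        pub struct $name {}

        impl DataType for $name {
            type Native = $native_ty; // was `type T = $native_ty;`

            fn get_physical_type() -> Type {
                $physical_ty
            }

            fn get_type_size() -> usize {
                $size
            }
        }
    };
}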
@andygrove here's the implementation as a PR in your fork: andygrove#1
Wow, thanks @nevi-me!
@sunchao I fixed the nits, but I need some guidance on checking against logical types and how definition/repetition levels work, when you have some time. Maybe you can point me to some code I can learn from?
@andygrove Yes. To convert group type, you can check here, which actually implements the conversion from Parquet to Arrow schema. For an introduction to definition/repetition levels, you can read this article, which I found pretty helpful.
Sorry for the late reply. Left some comments; for a simple implementation without nulls and nested types, you can refer to my previous implementation: https://github.com/liurenjie1024/zeus/blob/arrow/zeus-server/src/storage/blizard_storage/reader.rs
    }
}

fn to_arrow(t: &Type) -> Result<Field> {
Maybe you can use the schema converter in `parquet::reader::schema::parquet_to_arrow_schema`?
done
rust/parquet/src/reader/schema.rs
Outdated
@@ -177,6 +177,7 @@ impl ParquetTypeConverter {
    PhysicalType::BOOLEAN => Ok(DataType::Boolean),
    PhysicalType::INT32 => self.to_int32(),
    PhysicalType::INT64 => self.to_int64(),
    PhysicalType::INT96 => self.to_int64(),
INT96 is a timestamp in nanoseconds, and when reading to Arrow I am converting to a timestamp in milliseconds.
@sunchao @liurenjie1024 This is ready for another review. I am checking if the field is nullable, and if so, I pass def levels. If `values_read == levels_read` then there are no null values and I read as usual. If there are null values I return an [...]

I would also like to work with you both on a generic arrow reader.
fn create_binary_array(b: &Vec<ByteArray>, row_count: usize) -> Result<Arc<BinaryArray>> {
    let mut builder = BinaryBuilder::new(b.len());
    for j in 0..row_count {
        let slice = b[j].slice(0, b[j].len());
Why do we need to call `slice` here?
removed
    None,
    &mut read_buffer,
)?;
let mut builder = $BUILDER::new(levels_read);
can we extract this common code out of the `if` statement:
let mut builder = $BUILDER::new(values_read);
builder.append_slice(&read_buffer[0..values_read])?;
Arc::new(builder.finish())
refactored this to remove duplication - I was prematurely optimizing
macro_rules! read_column {
    ($SELF:ident, $R:ident, $INDEX:expr, $BUILDER:ident, $TY:ident, $DEFAULT:expr) => {{
        // TODO: should be able to get num_rows in row group instead of defaulting to batch size
        let mut read_buffer: Vec<$TY> = Vec::with_capacity($SELF.batch_size);
you can replace this with:
let mut read_buffer: Vec<$TY> = vec![$DEFAULT; $SELF.batch_size];
rust/datafusion/tests/sql.rs
Outdated
/// Execute query and return result set as tab delimited string
fn execute(ctx: &mut ExecutionContext, sql: &str) -> String {
    let results = ctx.sql(&sql, DEFAULT_BATCH_SIZE).unwrap();
    let plan = ctx.create_logical_plan(&sql).unwrap();
    println!("Plan: {:?}", plan);
nit: remove the println?
@sunchao @nevi-me @liurenjie1024 I cleaned the code up to remove duplication, added null support, and addressed other feedback. I have been testing null support manually so far since there are no suitable test files in this repo. I will create a separate PR for creating a suitable file to test with.
use crate::datasource::{RecordBatchIterator, ScanResult, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::{BinaryBuilder, Int64Builder};
nit: we could condense some of these imports into fewer lines
done
    &mut read_buffer,
)?;

let mut builder = Int64Builder::new(levels_read);
EDIT: You can ignore the below, it doesn't work.

What do you think of using `TimestampMillisecondBuilder` for millisecond timestamps? It should be a drop-in replacement for `Int64Builder`.
}

/// convert a parquet timestamp in nanoseconds to a timestamp with milliseconds
fn convert_int96_timestamp(v: &[u32]) -> i64 {
As `DataType::Timestamp` supports nanosecond precision, might it not be better to keep the resolution at the nanosecond level and use a `TimestampNanosecond`? @xhochy do we have to worry about `int96` conversion semantics from apache/parquet-format#49?
@andygrove I'll submit another PR that addresses this, please see (https://gist.github.com/nevi-me/574038fdec8e9c207f661813789d58fb)
Here is the fix: andygrove#2
Yes, INT96 is that weird. Please ensure that you don't write them by default ;)
fix int96 conversion to read timestamps correctly
@sunchao @liurenjie1024 @nevi-me Ready for re-review. I compared the schema converter with the C++ implementation and made it consistent.
)?;

let mut builder =
    TimestampNanosecondBuilder::new(levels_read);
Sorry about the back and forth @andygrove, the builder should be `TimestampMillisecondBuilder` in this instance, as `convert_int96_timestamp` still returns milliseconds. Alternatively, we could change `seconds * MILLIS_PER_SECOND + nanoseconds / 1_000_000` to `seconds * MILLIS_PER_SECOND * 1_000_000 + nanoseconds` to keep nanosecond precision (which I prefer).

The original JIRA ticket that deprecated INT96 (https://issues.apache.org/jira/browse/PARQUET-323) based it on the reasoning that nanosecond precision is 'rarely a real requirement'.
Good catch. Thanks. Since the schema converter says that INT96 is timestamp in nanoseconds (just like the C++ impl) we should convert to nanoseconds, so I updated as you suggested.
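For reference, a sketch of the nanosecond-precision version agreed on here. The constant names are assumptions; an INT96 timestamp stores two little-endian u32 words of nanoseconds-of-day followed by a Julian day number:

const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588; // Julian day of 1970-01-01
const SECONDS_PER_DAY: i64 = 86_400;
const MILLIS_PER_SECOND: i64 = 1_000;

/// convert a parquet INT96 timestamp to nanoseconds since the Unix epoch
fn convert_int96_timestamp(v: &[u32]) -> i64 {
    // first two words: nanoseconds within the day (little-endian i64)
    let nanoseconds = ((v[1] as i64) << 32) | (v[0] as i64);
    // third word: Julian day number
    let days_since_epoch = (v[2] as i64) - JULIAN_DAY_OF_EPOCH;
    let seconds = days_since_epoch * SECONDS_PER_DAY;
    // keep nanosecond precision, per the suggestion above
    seconds * MILLIS_PER_SECOND * 1_000_000 + nanoseconds
}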
let array = batch
    .column(0)
    .as_any()
    .downcast_ref::<TimestampNanosecondArray>()
This will downcast to millisecond per the related comment. As an aside, the nice thing with temporal arrays now is that we can do the below (which isn't necessary for this test):

let mut values = vec![];
for i in 0..batch.num_rows() {
    values.push(array.datetime(i));
}
assert_eq!("2001-03-31 12:00:00, ...", format!("{:?}", values));
rust/arrow/src/datatypes.rs
Outdated
@@ -751,6 +752,22 @@ impl Schema {
        "fields": self.fields.iter().map(|field| field.to_json()).collect::<Vec<Value>>(),
    })
}

/// Create a new schema by applying a projection to this schema's fields
pub fn projection(&self, projection: &[usize]) -> Result<Arc<Schema>> {
Does this belong in `datafusion` as a free function? It doesn't seem like this will be used within the `arrow` sub-crate?
I'm 50/50 on this but I moved it to datafusion for now.
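For illustration, a free-function version in datafusion might look like the sketch below (using `Option` instead of DataFusion's `Result` to keep the example self-contained; the function name is hypothetical):

use std::sync::Arc;

use arrow::datatypes::Schema;

/// Sketch: create a new schema containing only the fields at the given indices
fn projection(schema: &Schema, indices: &[usize]) -> Option<Arc<Schema>> {
    let mut fields = Vec::with_capacity(indices.len());
    for &i in indices {
        if i >= schema.fields().len() {
            return None; // projection index out of bounds
        }
        fields.push(schema.field(i).clone());
    }
    Some(Arc::new(Schema::new(fields)))
}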
}

impl ParquetTable {
    pub fn try_new(filename: &str) -> Result<Self> {
nit: Maybe `filename` should be of type `AsRef<Path>`, just like `File::open`? (Probably does not need to be included in this PR.)
Sure. I'd prefer to look at that as a separate PR as you suggested.
Thanks @andygrove
Thanks @nevi-me. @sunchao @paddyhoran I think this is ready to go now?
Thanks @andygrove. Looks much better!
    is_nullable: bool,
) -> Result<Arc<PrimitiveArray<A>>> {
    // create read buffer
    let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];
nit: can replace `A::default_value().into()` with `P::T::default()` - saves an `into()` call.
I had problems making this change. Will look at this as separate PR too.
if self.row_group_index < self.reader.num_row_groups() {
    let reader = self.reader.get_row_group(self.row_group_index)?;

    self.column_readers = Vec::with_capacity(self.projection.len());
nit: can we call `self.column_readers.clear()` here?
row_group_index: 0,
projection_schema: projected_schema,
projection,
batch_size: 64 * 1024,
nit: extract this as a constant.
        is_nullable,
    )?
}
ColumnReader::Int64ColumnReader(ref mut r) => {
should we look at the logical type for the `int64`? It may need to be converted to timestamp or decimal. Same for `int32`.
Yes, that's a good point.
Thanks. It's fine to do it in a separate PR though.
How would you feel about doing this as separate PRs after this is merged?
Yes. That's fine for me.
Cool. Once you and/or Paddy have approved this one, I'll merge and start on the date/time support.
fn next(&mut self) -> Result<Option<RecordBatch>> {
    // advance the row group reader if necessary
    if self.current_row_group.is_none() {
at some point we may need to think about small row groups versus big batch size, so that we may need to read across row group boundaries.
    })
}

fn load_next_row_group(&mut self) -> Result<()> {
can we have a test for this case as well? i.e., loading multiple row groups.
done
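For reference, the shape of such a test might be roughly as follows. The fixture path is hypothetical; `ParquetFile::open` and `next()` are the APIs shown in this diff:

#[test]
fn read_multiple_row_groups() {
    // hypothetical fixture containing more than one row group
    let file = File::open("test/data/multi_row_group.parquet").unwrap();
    let mut reader = ParquetFile::open(file, None).unwrap();
    let mut batches = 0;
    while let Some(batch) = reader.next().unwrap() {
        assert!(batch.num_rows() > 0);
        batches += 1;
    }
    // with more than one row group we expect more than one batch
    assert!(batches > 1);
}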
LGTM - pending on CI.
I'm sure I'll need some guidance on this one from @sunchao or @liurenjie1024 but I am keen to get parquet support added for primitive types so that I can actually use DataFusion and Arrow in production at some point.