@@ -21,21 +21,25 @@ use crate::arrow::record_reader::RecordReader;
2121use crate :: arrow:: schema:: parquet_to_arrow_field;
2222use crate :: basic:: Type as PhysicalType ;
2323use crate :: column:: page:: PageIterator ;
24- use crate :: data_type:: DataType ;
24+ use crate :: data_type:: { DataType , Int96 } ;
2525use crate :: errors:: { ParquetError , Result } ;
2626use crate :: schema:: types:: ColumnDescPtr ;
27- use arrow:: array:: { Array , ArrayDataBuilder , ArrayRef , BooleanArray , BooleanBufferBuilder , Decimal128Array , Float32Array , Float64Array , Int32Array , Int64Array } ;
27+ use arrow:: array:: {
28+ Array , ArrayDataBuilder , ArrayRef , BooleanArray , BooleanBufferBuilder ,
29+ Decimal128Array , Float32Array , Float64Array , Int32Array , Int64Array ,
30+ TimestampNanosecondArray , TimestampNanosecondBufferBuilder ,
31+ } ;
2832use arrow:: buffer:: Buffer ;
29- use arrow:: datatypes:: DataType as ArrowType ;
33+ use arrow:: datatypes:: { DataType as ArrowType , TimeUnit } ;
3034use std:: any:: Any ;
3135use std:: sync:: Arc ;
3236
3337/// Primitive array readers are leaves of array reader tree. They accept page iterator
3438/// and read them into primitive arrays.
3539pub struct PrimitiveArrayReader < T >
36- where
37- T : DataType ,
38- T :: T : ScalarValue ,
40+ where
41+ T : DataType ,
42+ T :: T : ScalarValue ,
3943{
4044 data_type : ArrowType ,
4145 pages : Box < dyn PageIterator > ,
@@ -45,9 +49,9 @@ pub struct PrimitiveArrayReader<T>
4549}
4650
4751impl < T > PrimitiveArrayReader < T >
48- where
49- T : DataType ,
50- T :: T : ScalarValue ,
52+ where
53+ T : DataType ,
54+ T :: T : ScalarValue ,
5155{
5256 /// Construct primitive array reader.
5357 pub fn new (
@@ -77,9 +81,9 @@ impl<T> PrimitiveArrayReader<T>
7781
7882/// Implementation of primitive array reader.
7983impl < T > ArrayReader for PrimitiveArrayReader < T >
80- where
81- T : DataType ,
82- T :: T : ScalarValue ,
84+ where
85+ T : DataType ,
86+ T :: T : ScalarValue ,
8387{
8488 fn as_any ( & self ) -> & dyn Any {
8589 self
@@ -95,7 +99,7 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
9599 }
96100
97101 fn consume_batch ( & mut self ) -> Result < ArrayRef > {
98- let target_type = self . get_data_type ( ) . clone ( ) ;
102+ let target_type = & self . data_type ;
99103 let arrow_data_type = match T :: get_physical_type ( ) {
100104 PhysicalType :: BOOLEAN => ArrowType :: Boolean ,
101105 PhysicalType :: INT32 => {
@@ -120,9 +124,11 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
120124 }
121125 PhysicalType :: FLOAT => ArrowType :: Float32 ,
122126 PhysicalType :: DOUBLE => ArrowType :: Float64 ,
123- PhysicalType :: INT96
124- | PhysicalType :: BYTE_ARRAY
125- | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
127+ PhysicalType :: INT96 => match target_type {
128+ ArrowType :: Timestamp ( TimeUnit :: Nanosecond , _) => target_type. clone ( ) ,
129+ _ => unreachable ! ( "INT96 must be timestamp nanosecond" ) ,
130+ } ,
131+ PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
126132 unreachable ! (
127133 "PrimitiveArrayReaders don't support complex physical types"
128134 ) ;
@@ -132,16 +138,31 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
132138 // Convert to arrays by using the Parquet physical type.
133139 // The physical types are then cast to Arrow types if necessary
134140
135- let mut record_data = self . record_reader . consume_record_data ( ) ;
141+ let record_data = self . record_reader . consume_record_data ( ) ;
142+ let record_data = match T :: get_physical_type ( ) {
143+ PhysicalType :: BOOLEAN => {
144+ let mut boolean_buffer = BooleanBufferBuilder :: new ( record_data. len ( ) ) ;
136145
137- if T :: get_physical_type ( ) == PhysicalType :: BOOLEAN {
138- let mut boolean_buffer = BooleanBufferBuilder :: new ( record_data. len ( ) ) ;
146+ for e in record_data. as_slice ( ) {
147+ boolean_buffer. append ( * e > 0 ) ;
148+ }
149+ boolean_buffer. finish ( )
150+ }
151+ PhysicalType :: INT96 => {
152+ // SAFETY - record_data is an aligned buffer of Int96
153+ let ( prefix, slice, suffix) =
154+ unsafe { record_data. as_slice ( ) . align_to :: < Int96 > ( ) } ;
155+ assert ! ( prefix. is_empty( ) && suffix. is_empty( ) ) ;
156+
157+ let mut builder = TimestampNanosecondBufferBuilder :: new ( slice. len ( ) ) ;
158+ for v in slice {
159+ builder. append ( v. to_nanos ( ) )
160+ }
139161
140- for e in record_data. as_slice ( ) {
141- boolean_buffer. append ( * e > 0 ) ;
162+ builder. finish ( )
142163 }
143- record_data = boolean_buffer . finish ( ) ;
144- }
164+ _ => record_data ,
165+ } ;
145166
146167 let array_data = ArrayDataBuilder :: new ( arrow_data_type)
147168 . len ( self . record_reader . num_values ( ) )
@@ -155,9 +176,10 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
155176 PhysicalType :: INT64 => Arc :: new ( Int64Array :: from ( array_data) ) as ArrayRef ,
156177 PhysicalType :: FLOAT => Arc :: new ( Float32Array :: from ( array_data) ) as ArrayRef ,
157178 PhysicalType :: DOUBLE => Arc :: new ( Float64Array :: from ( array_data) ) as ArrayRef ,
158- PhysicalType :: INT96
159- | PhysicalType :: BYTE_ARRAY
160- | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
179+ PhysicalType :: INT96 => {
180+ Arc :: new ( TimestampNanosecondArray :: from ( array_data) ) as ArrayRef
181+ }
182+ PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
161183 unreachable ! (
162184 "PrimitiveArrayReaders don't support complex physical types"
163185 ) ;
@@ -178,7 +200,7 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
178200 ArrowType :: Date64 => {
179201 // this is cheap as it internally reinterprets the data
180202 let a = arrow:: compute:: cast ( & array, & ArrowType :: Date32 ) ?;
181- arrow:: compute:: cast ( & a, & target_type) ?
203+ arrow:: compute:: cast ( & a, target_type) ?
182204 }
183205 ArrowType :: Decimal128 ( p, s) => {
184206 let array = match array. data_type ( ) {
@@ -204,11 +226,11 @@ impl<T> ArrayReader for PrimitiveArrayReader<T>
204226 ) ) ;
205227 }
206228 }
207- . with_precision_and_scale ( p, s) ?;
229+ . with_precision_and_scale ( * p, * s) ?;
208230
209231 Arc :: new ( array) as ArrayRef
210232 }
211- _ => arrow:: compute:: cast ( & array, & target_type) ?,
233+ _ => arrow:: compute:: cast ( & array, target_type) ?,
212234 } ;
213235
214236 // save definition and repetition buffers
@@ -243,11 +265,11 @@ mod tests {
243265 use crate :: util:: test_common:: rand_gen:: make_pages;
244266 use crate :: util:: InMemoryPageIterator ;
245267 use arrow:: array:: { Array , PrimitiveArray } ;
246- use arrow:: datatypes:: { ArrowPrimitiveType } ;
268+ use arrow:: datatypes:: ArrowPrimitiveType ;
247269
270+ use arrow:: datatypes:: DataType :: Decimal128 ;
248271 use rand:: distributions:: uniform:: SampleUniform ;
249272 use std:: collections:: VecDeque ;
250- use arrow:: datatypes:: DataType :: Decimal128 ;
251273
252274 #[ allow( clippy:: too_many_arguments) ]
253275 fn make_column_chunks < T : DataType > (
@@ -313,7 +335,7 @@ mod tests {
313335 column_desc,
314336 None ,
315337 )
316- . unwrap ( ) ;
338+ . unwrap ( ) ;
317339
318340 // expect no values to be read
319341 let array = array_reader. next_batch ( 50 ) . unwrap ( ) ;
@@ -360,7 +382,7 @@ mod tests {
360382 column_desc,
361383 None ,
362384 )
363- . unwrap ( ) ;
385+ . unwrap ( ) ;
364386
365387 // Read first 50 values, which are all from the first column chunk
366388 let array = array_reader. next_batch ( 50 ) . unwrap ( ) ;
@@ -560,7 +582,7 @@ mod tests {
560582 column_desc,
561583 None ,
562584 )
563- . unwrap ( ) ;
585+ . unwrap ( ) ;
564586
565587 let mut accu_len: usize = 0 ;
566588
@@ -602,7 +624,6 @@ mod tests {
602624 }
603625 }
604626
605-
606627 #[ test]
607628 fn test_primitive_array_reader_decimal_types ( ) {
608629 // parquet `INT32` to decimal
@@ -641,18 +662,30 @@ mod tests {
641662 column_desc,
642663 None ,
643664 )
644- . unwrap ( ) ;
665+ . unwrap ( ) ;
645666
646667 // read data from the reader
647668 // the data type is decimal(8,2)
648669 let array = array_reader. next_batch ( 50 ) . unwrap ( ) ;
649670 assert_eq ! ( array. data_type( ) , & Decimal128 ( 8 , 2 ) ) ;
650671 let array = array. as_any ( ) . downcast_ref :: < Decimal128Array > ( ) . unwrap ( ) ;
651- let data_decimal_array = data[ 0 ..50 ] . iter ( ) . copied ( ) . map ( |v| Some ( v as i128 ) ) . collect :: < Decimal128Array > ( ) . with_precision_and_scale ( 8 , 2 ) . unwrap ( ) ;
672+ let data_decimal_array = data[ 0 ..50 ]
673+ . iter ( )
674+ . copied ( )
675+ . map ( |v| Some ( v as i128 ) )
676+ . collect :: < Decimal128Array > ( )
677+ . with_precision_and_scale ( 8 , 2 )
678+ . unwrap ( ) ;
652679 assert_eq ! ( array, & data_decimal_array) ;
653680
654681 // not equal with different data type(precision and scale)
655- let data_decimal_array = data[ 0 ..50 ] . iter ( ) . copied ( ) . map ( |v| Some ( v as i128 ) ) . collect :: < Decimal128Array > ( ) . with_precision_and_scale ( 9 , 0 ) . unwrap ( ) ;
682+ let data_decimal_array = data[ 0 ..50 ]
683+ . iter ( )
684+ . copied ( )
685+ . map ( |v| Some ( v as i128 ) )
686+ . collect :: < Decimal128Array > ( )
687+ . with_precision_and_scale ( 9 , 0 )
688+ . unwrap ( ) ;
656689 assert_ne ! ( array, & data_decimal_array)
657690 }
658691
@@ -692,18 +725,30 @@ mod tests {
692725 column_desc,
693726 None ,
694727 )
695- . unwrap ( ) ;
728+ . unwrap ( ) ;
696729
697730 // read data from the reader
698731 // the data type is decimal(18,4)
699732 let array = array_reader. next_batch ( 50 ) . unwrap ( ) ;
700733 assert_eq ! ( array. data_type( ) , & Decimal128 ( 18 , 4 ) ) ;
701734 let array = array. as_any ( ) . downcast_ref :: < Decimal128Array > ( ) . unwrap ( ) ;
702- let data_decimal_array = data[ 0 ..50 ] . iter ( ) . copied ( ) . map ( |v| Some ( v as i128 ) ) . collect :: < Decimal128Array > ( ) . with_precision_and_scale ( 18 , 4 ) . unwrap ( ) ;
735+ let data_decimal_array = data[ 0 ..50 ]
736+ . iter ( )
737+ . copied ( )
738+ . map ( |v| Some ( v as i128 ) )
739+ . collect :: < Decimal128Array > ( )
740+ . with_precision_and_scale ( 18 , 4 )
741+ . unwrap ( ) ;
703742 assert_eq ! ( array, & data_decimal_array) ;
704743
705744 // not equal with different data type(precision and scale)
706- let data_decimal_array = data[ 0 ..50 ] . iter ( ) . copied ( ) . map ( |v| Some ( v as i128 ) ) . collect :: < Decimal128Array > ( ) . with_precision_and_scale ( 34 , 0 ) . unwrap ( ) ;
745+ let data_decimal_array = data[ 0 ..50 ]
746+ . iter ( )
747+ . copied ( )
748+ . map ( |v| Some ( v as i128 ) )
749+ . collect :: < Decimal128Array > ( )
750+ . with_precision_and_scale ( 34 , 0 )
751+ . unwrap ( ) ;
707752 assert_ne ! ( array, & data_decimal_array)
708753 }
709754 }
0 commit comments