
Commit 10710a2

Parquet datasource
1 parent 92433c8 commit 10710a2

File tree

2 files changed: +311 -0 lines changed


rust/datafusion/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 pub mod csv;
 pub mod datasource;
 pub mod memory;
+pub mod parquet;
 
 pub use self::csv::{CsvBatchIterator, CsvFile};
 pub use self::datasource::{RecordBatchIterator, ScanResult, Table};
rust/datafusion/src/datasource/parquet.rs

Lines changed: 310 additions & 0 deletions
@@ -0,0 +1,310 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet Data source

use std::cell::RefCell;
use std::fs::File;
use std::rc::Rc;
use std::string::String;
use std::sync::Arc;

use arrow::array::Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

use parquet::basic;
use parquet::column::reader::*;
use parquet::data_type::ByteArray;
use parquet::file::reader::*;
use parquet::schema::types::Type;

use crate::datasource::{RecordBatchIterator, Table};
use crate::execution::error::{ExecutionError, Result};
use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};

pub struct ParquetTable {
    filename: String,
    schema: Arc<Schema>,
}

impl ParquetTable {
    pub fn new(filename: &str) -> Self {
        let file = File::open(filename).unwrap();
        let parquet_file = ParquetFile::open(file, None).unwrap();
        let schema = parquet_file.schema.clone();
        Self {
            filename: filename.to_string(),
            schema,
        }
    }
}

impl Table for ParquetTable {
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        _batch_size: usize,
    ) -> Result<Rc<RefCell<RecordBatchIterator>>> {
        let file = File::open(self.filename.clone()).unwrap();
        let parquet_file = ParquetFile::open(file, projection.clone()).unwrap();
        Ok(Rc::new(RefCell::new(parquet_file)))
    }
}

pub struct ParquetFile {
    reader: SerializedFileReader<File>,
    row_group_index: usize,
    schema: Arc<Schema>,
    projection: Option<Vec<usize>>,
    batch_size: usize,
    current_row_group: Option<Box<RowGroupReader>>,
    column_readers: Vec<ColumnReader>,
}

impl ParquetFile {
    pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
        let reader = SerializedFileReader::new(file).unwrap();

        let metadata = reader.metadata();
        let file_type = to_arrow(metadata.file_metadata().schema())?;

        match file_type.data_type() {
            DataType::Struct(fields) => {
                let schema = Schema::new(fields.clone());
                //println!("Parquet schema: {:?}", schema);
                Ok(ParquetFile {
                    reader: reader,
                    row_group_index: 0,
                    schema: Arc::new(schema),
                    projection,
                    batch_size: 64 * 1024,
                    current_row_group: None,
                    column_readers: vec![],
                })
            }
            _ => Err(ExecutionError::General(
                "Failed to read Parquet schema".to_string(),
            )),
        }
    }

    fn load_next_row_group(&mut self) {
        if self.row_group_index < self.reader.num_row_groups() {
            //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
            let reader = self.reader.get_row_group(self.row_group_index).unwrap();

            self.column_readers = vec![];

            match &self.projection {
                None => {
                    for i in 0..reader.num_columns() {
                        self.column_readers
                            .push(reader.get_column_reader(i).unwrap());
                    }
                }
                Some(proj) => {
                    for i in proj {
                        //TODO validate index in bounds
                        self.column_readers
                            .push(reader.get_column_reader(*i).unwrap());
                    }
                }
            }

            self.current_row_group = Some(reader);
            self.row_group_index += 1;
        } else {
            panic!()
        }
    }

    fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
        match &self.current_row_group {
            Some(reader) => {
                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
                let mut row_count = 0;
                for i in 0..self.column_readers.len() {
                    let array: Arc<Array> = match self.column_readers[i] {
                        ColumnReader::Int32ColumnReader(ref mut r) => {
                            let mut builder = Int32Builder::new(self.batch_size);
                            let mut read_buffer: Vec<i32> =
                                Vec::with_capacity(self.batch_size);
                            match r.read_batch(
                                self.batch_size,
                                None,
                                None,
                                &mut read_buffer,
                            ) {
                                //TODO this isn't handling null values
                                Ok((count, _)) => {
                                    builder.append_slice(&read_buffer).unwrap();
                                    row_count = count;
                                    Arc::new(builder.finish())
                                }
                                _ => {
                                    return Err(ExecutionError::NotImplemented(format!(
                                        "Error reading parquet batch (column {})",
                                        i
                                    )));
                                }
                            }
                        }
                        ColumnReader::DoubleColumnReader(ref mut r) => {
                            let mut builder = Float64Builder::new(self.batch_size);
                            let mut read_buffer: Vec<f64> =
                                Vec::with_capacity(self.batch_size);
                            match r.read_batch(
                                self.batch_size,
                                None,
                                None,
                                &mut read_buffer,
                            ) {
                                //TODO this isn't handling null values
                                Ok((count, _)) => {
                                    builder.append_slice(&read_buffer).unwrap();
                                    row_count = count;
                                    Arc::new(builder.finish())
                                }
                                _ => {
                                    return Err(ExecutionError::NotImplemented(format!(
                                        "Error reading parquet batch (column {})",
                                        i
                                    )));
                                }
                            }
                        }
                        ColumnReader::ByteArrayColumnReader(ref mut r) => {
                            let mut b: Vec<ByteArray> =
                                Vec::with_capacity(self.batch_size);
                            for _ in 0..self.batch_size {
                                b.push(ByteArray::default());
                            }
                            match r.read_batch(self.batch_size, None, None, &mut b) {
                                //TODO this isn't handling null values
                                Ok((count, _)) => {
                                    row_count = count;
                                    //TODO this is horribly inefficient
                                    let mut builder = BinaryBuilder::new(row_count);
                                    for j in 0..row_count {
                                        let foo = b[j].slice(0, b[j].len());
                                        let bytes: &[u8] = foo.data();
                                        let str =
                                            String::from_utf8(bytes.to_vec()).unwrap();
                                        builder.append_string(&str).unwrap();
                                    }
                                    Arc::new(builder.finish())
                                }
                                _ => {
                                    return Err(ExecutionError::NotImplemented(format!(
                                        "Error reading parquet batch (column {})",
                                        i
                                    )));
                                }
                            }
                        }
                        _ => {
                            return Err(ExecutionError::NotImplemented(
                                "unsupported column reader type".to_string(),
                            ));
                        }
                    };

                    batch.push(array);
                }

                // println!("Loaded batch of {} rows", row_count);

                if row_count == 0 {
                    Ok(None)
                } else {
                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
                }
            }
            _ => Ok(None),
        }
    }
}

fn to_arrow(t: &Type) -> Result<Field> {
    match t {
        Type::PrimitiveType {
            basic_info,
            physical_type,
            ..
        } => {
            let arrow_type = match physical_type {
                basic::Type::BOOLEAN => DataType::Boolean,
                basic::Type::INT32 => DataType::Int32,
                basic::Type::INT64 => DataType::Int64,
                basic::Type::INT96 => DataType::Int64, //TODO ???
                basic::Type::FLOAT => DataType::Float32,
                basic::Type::DOUBLE => DataType::Float64,
                basic::Type::BYTE_ARRAY => DataType::Utf8, /*match basic_info.logical_type() {
                    basic::LogicalType::UTF8 => DataType::Utf8,
                    _ => unimplemented!("No support for Parquet BYTE_ARRAY yet"),
                }*/
                basic::Type::FIXED_LEN_BYTE_ARRAY => {
                    unimplemented!("No support for Parquet FIXED_LEN_BYTE_ARRAY yet")
                }
            };

            Ok(Field::new(basic_info.name(), arrow_type, false))
        }
        Type::GroupType { basic_info, fields } => Ok(Field::new(
            basic_info.name(),
            DataType::Struct(
                fields
                    .iter()
                    .map(|f| to_arrow(f))
                    .collect::<Result<Vec<Field>>>()?,
            ),
            false,
        )),
    }
}

impl RecordBatchIterator for ParquetFile {
    fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    fn next(&mut self) -> Result<Option<RecordBatch>> {
        // advance the row group reader if necessary
        if self.current_row_group.is_none() {
            self.load_next_row_group();
            self.load_batch()
        } else {
            match self.load_batch() {
                Ok(Some(b)) => Ok(Some(b)),
                Ok(None) => {
                    if self.row_group_index < self.reader.num_row_groups() {
                        self.load_next_row_group();
                        self.load_batch()
                    } else {
                        Ok(None)
                    }
                }
                Err(e) => Err(e),
            }
        }
    }
}
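
For orientation, a minimal usage sketch of the new datasource (not part of this commit). It assumes it runs inside a function returning the crate's Result<()> and that a Parquet file exists at the hypothetical path "test.parquet"; note that ParquetTable::new and scan unwrap I/O errors internally.

// Hypothetical usage sketch, not part of commit 10710a2.
// Assumes a file at "test.parquet" and an enclosing fn returning Result<()>.
let table = ParquetTable::new("test.parquet");
println!("schema: {:?}", table.schema());

// No projection: read every column. The batch size argument is
// currently ignored by ParquetTable::scan (see `_batch_size` above).
let scan = table.scan(&None, 1024)?;
let mut iter = scan.borrow_mut();
while let Some(batch) = iter.next()? {
    println!("read batch: {} rows x {} columns",
        batch.num_rows(), batch.num_columns());
}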
