Commit aea9f8a

convert to use row iter
1 parent f46e6f7 commit aea9f8a
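
For background on the change described by the commit message: instead of driving per-column readers, the code now uses the parquet crate's row-oriented API, where FileReader::get_row_iter yields Row values and the RowAccessor trait provides typed getters (get_int, get_float, get_double, get_bytes). A minimal reading sketch, assuming the parquet crate API as used at the time of this commit; the file path and column index are placeholders:

    use std::fs::File;

    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::record::RowAccessor;

    fn main() {
        // Hypothetical input file; any Parquet file with an INT32 column at index 0 works here.
        let file = File::open("example.parquet").unwrap();
        let reader = SerializedFileReader::new(file).unwrap();

        // Iterate rows across all row groups; `None` means no projection is pushed down.
        let row_iter = reader.get_row_iter(None).unwrap();
        for row in row_iter {
            // Typed access by column index via the RowAccessor trait.
            let value: i32 = row.get_int(0).unwrap();
            println!("{}", value);
        }
    }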

File tree: 1 file changed, +40 -38 lines

1 file changed

+40
-38
lines changed

rust/datafusion/src/datasource/parquet.rs

Lines changed: 40 additions & 38 deletions
@@ -28,15 +28,13 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow::record_batch::RecordBatch;
 
 use parquet::basic;
-use parquet::column::reader::*;
-use parquet::data_type::ByteArray;
 use parquet::file::reader::*;
-use parquet::schema::types::Type;
 use parquet::record::{Row, RowAccessor};
+use parquet::schema::types::Type;
 
 use crate::datasource::{RecordBatchIterator, Table};
 use crate::execution::error::{ExecutionError, Result};
-use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder};
+use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
 
 pub struct ParquetTable {
     filename: String,
@@ -121,27 +119,7 @@ impl ParquetFile {
 
     fn load_next_row_group(&mut self) {
         if self.row_group_index < self.reader.num_row_groups() {
-            //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups());
             let reader = self.reader.get_row_group(self.row_group_index).unwrap();
-
-            // self.column_readers = vec![];
-            //
-            // match &self.projection {
-            //     None => {
-            //         for i in 0..reader.num_columns() {
-            //             self.column_readers
-            //                 .push(reader.get_column_reader(i).unwrap());
-            //         }
-            //     }
-            //     Some(proj) => {
-            //         for i in proj {
-            //             //TODO validate index in bounds
-            //             self.column_readers
-            //                 .push(reader.get_column_reader(*i).unwrap());
-            //         }
-            //     }
-            // }
-
             self.current_row_group = Some(reader);
             self.row_group_index += 1;
         } else {
@@ -152,10 +130,9 @@ impl ParquetFile {
     fn load_batch(&mut self) -> Result<Option<RecordBatch>> {
         match &self.current_row_group {
            Some(reader) => {
-
                // read batch of rows into memory
 
-                // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
+                // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect();
 
                let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down
                let mut rows: Vec<Row> = Vec::with_capacity(self.batch_size);
@@ -168,7 +145,8 @@ impl ParquetFile {
                println!("Loaded {} rows into memory", rows.len());
 
                // convert to columnar batch
-                let mut batch: Vec<Arc<Array>> = Vec::with_capacity(self.projection.len());
+                let mut batch: Vec<Arc<Array>> =
+                    Vec::with_capacity(self.projection.len());
                for i in &self.projection {
                    let array: Arc<Array> = match self.schema.field(*i).data_type() {
                        DataType::Int32 => {
@@ -179,17 +157,44 @@ impl ParquetFile {
                            }
                            Arc::new(builder.finish())
                        }
+                        DataType::Float32 => {
+                            let mut builder = Float32Builder::new(rows.len());
+                            for row in &rows {
+                                //TODO null handling
+                                builder.append_value(row.get_float(*i).unwrap()).unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        DataType::Float64 => {
+                            let mut builder = Float64Builder::new(rows.len());
+                            for row in &rows {
+                                //TODO null handling
+                                builder
+                                    .append_value(row.get_double(*i).unwrap())
+                                    .unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
                        DataType::Utf8 => {
                            let mut builder = BinaryBuilder::new(rows.len());
                            for row in &rows {
                                //TODO null handling
                                let bytes = row.get_bytes(*i).unwrap();
-                                builder.append_string(&String::from_utf8(bytes.data().to_vec()).unwrap()).unwrap();
+                                builder
+                                    .append_string(
+                                        &String::from_utf8(bytes.data().to_vec())
+                                            .unwrap(),
+                                    )
+                                    .unwrap();
                            }
                            Arc::new(builder.finish())
                        }
-                        other => return Err(ExecutionError::NotImplemented(
-                            format!("unsupported column reader type ({:?})", other)))
+                        other => {
+                            return Err(ExecutionError::NotImplemented(format!(
+                                "unsupported column reader type ({:?})",
+                                other
+                            )));
+                        }
                    };
                    batch.push(array);
                }
@@ -200,9 +205,9 @@ impl ParquetFile {
                    Ok(None)
                } else {
                    Ok(Some(RecordBatch::try_new(
-                        self.schema.projection(&self.projection)?,
-                        batch,
-                    )?))
+                        self.schema.projection(&self.projection)?,
+                        batch,
+                    )?))
                }
            }
            _ => Ok(None),
@@ -299,14 +304,11 @@ mod tests {
            .downcast_ref::<Int32Array>()
            .unwrap();
        let mut values: Vec<i32> = vec![];
-        for i in 0..16 {
+        for i in 0..batch.num_rows() {
            values.push(array.value(i));
        }
 
-        assert_eq!(
-            "[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 9, 0, 1, 0, 0, 0]",
-            format!("{:?}", values)
-        );
+        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
    }
 
    #[test]
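
The row-to-columnar conversion in load_batch follows the usual Arrow builder pattern: append one value per row, call finish() to freeze the builder into an immutable array, and assemble the arrays into a RecordBatch. A standalone sketch of that pattern, assuming the arrow crate version used in this repository at the time; the schema and values are made up for illustration:

    use std::sync::Arc;

    use arrow::array::{Array, Int32Array};
    use arrow::builder::Int32Builder;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() {
        // Hypothetical single-column schema; the real code projects the Parquet file's schema instead.
        let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

        // Append row values one at a time, then freeze the builder into an array.
        let mut builder = Int32Builder::new(4);
        for v in &[4, 5, 6, 7] {
            builder.append_value(*v).unwrap();
        }
        let array: Arc<Array> = Arc::new(builder.finish());

        // Wrap the columns in a RecordBatch, mirroring RecordBatch::try_new in load_batch.
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        // Consumers downcast back to the concrete array type, as the updated test does.
        let column = batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(column.value(0), 4);
        assert_eq!(batch.num_rows(), 4);
    }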
