Skip to content

Commit 9d3047a

Browse files
committed
code cleanup
1 parent 639e13e commit 9d3047a

File tree

1 file changed

+38
-109
lines changed

1 file changed

+38
-109
lines changed

rust/datafusion/src/datasource/parquet.rs

Lines changed: 38 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ use parquet::file::reader::*;
3232

3333
use crate::datasource::{RecordBatchIterator, ScanResult, Table};
3434
use crate::execution::error::{ExecutionError, Result};
35-
use arrow::array::BinaryArray;
3635
use arrow::builder::{BinaryBuilder, Int64Builder};
3736
use parquet::data_type::Int96;
3837
use parquet::reader::schema::parquet_to_arrow_schema;
@@ -82,60 +81,31 @@ pub struct ParquetFile {
8281
column_readers: Vec<ColumnReader>,
8382
}
8483

85-
fn create_binary_array(b: &Vec<ByteArray>, row_count: usize) -> Result<Arc<BinaryArray>> {
86-
let mut builder = BinaryBuilder::new(b.len());
87-
for i in 0..row_count {
88-
builder.append_string(&String::from_utf8(b[i].data().to_vec()).unwrap())?;
89-
}
90-
Ok(Arc::new(builder.finish()))
91-
}
92-
9384
macro_rules! read_binary_column {
9485
($SELF:ident, $R:ident, $INDEX:expr) => {{
95-
//TODO: should be able to get num_rows in row group instead of defaulting to batch size
9686
let mut read_buffer: Vec<ByteArray> =
97-
Vec::with_capacity($SELF.batch_size);
98-
for _ in 0..$SELF.batch_size {
99-
read_buffer.push(ByteArray::default());
100-
}
101-
if $SELF.projection_schema.field($INDEX).is_nullable() {
102-
103-
let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
104-
for _ in 0..$SELF.batch_size {
105-
def_levels.push(0);
106-
}
107-
108-
let (values_read, levels_read) = $R.read_batch(
109-
$SELF.batch_size,
110-
Some(&mut def_levels),
111-
None,
112-
&mut read_buffer,
113-
)?;
114-
if values_read == levels_read {
115-
create_binary_array(&read_buffer, values_read)?
87+
vec![ByteArray::default(); $SELF.batch_size];
88+
let mut def_levels: Vec<i16> = vec![0; $SELF.batch_size];
89+
let (_, levels_read) = $R.read_batch(
90+
$SELF.batch_size,
91+
Some(&mut def_levels),
92+
None,
93+
&mut read_buffer,
94+
)?;
95+
let mut builder = BinaryBuilder::new(levels_read);
96+
let mut value_index = 0;
97+
for i in 0..levels_read {
98+
if def_levels[i] > 0 {
99+
builder.append_string(
100+
&String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap(),
101+
)?;
102+
value_index += 1;
116103
} else {
117-
let mut builder = BinaryBuilder::new(levels_read);
118-
let mut value_index = 0;
119-
for i in 0..levels_read {
120-
if def_levels[i] > 0 {
121-
builder.append_string(&String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap())?;
122-
value_index += 1;
123-
} else {
124-
builder.append_null()?;
125-
}
126-
}
127-
Arc::new(builder.finish())
104+
builder.append_null()?;
128105
}
129-
} else {
130-
let (values_read, _) = $R.read_batch(
131-
$SELF.batch_size,
132-
None,
133-
None,
134-
&mut read_buffer,
135-
)?;
136-
create_binary_array(&read_buffer, values_read)?
137106
}
138-
}}
107+
Arc::new(builder.finish())
108+
}};
139109
}
140110

141111
trait ArrowReader<T>
@@ -165,10 +135,7 @@ where
165135
let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];
166136

167137
if is_nullable {
168-
let mut def_levels: Vec<i16> = Vec::with_capacity(batch_size);
169-
for _ in 0..batch_size {
170-
def_levels.push(0);
171-
}
138+
let mut def_levels: Vec<i16> = vec![0; batch_size];
172139

173140
let (values_read, levels_read) = self.read_batch(
174141
batch_size,
@@ -308,67 +275,29 @@ impl ParquetFile {
308275
}
309276
ColumnReader::Int96ColumnReader(ref mut r) => {
310277
let mut read_buffer: Vec<Int96> =
311-
Vec::with_capacity(self.batch_size);
278+
vec![Int96::new(); self.batch_size];
312279

313-
for _ in 0..self.batch_size {
314-
read_buffer.push(Int96::new());
315-
}
316-
317-
if self.projection_schema.field(i).is_nullable() {
318-
let mut def_levels: Vec<i16> =
319-
Vec::with_capacity(self.batch_size);
320-
for _ in 0..self.batch_size {
321-
def_levels.push(0);
322-
}
323-
let (values_read, levels_read) = r.read_batch(
324-
self.batch_size,
325-
Some(&mut def_levels),
326-
None,
327-
&mut read_buffer,
328-
)?;
329-
330-
if values_read == levels_read {
331-
let mut builder = Int64Builder::new(values_read);
332-
for i in 0..values_read {
333-
builder.append_value(convert_int96_timestamp(
334-
read_buffer[i].data(),
335-
))?;
336-
}
337-
Arc::new(builder.finish())
338-
} else {
339-
let mut builder = Int64Builder::new(levels_read);
340-
let mut value_index = 0;
341-
for i in 0..levels_read {
342-
if def_levels[i] > 0 {
343-
builder.append_value(
344-
convert_int96_timestamp(
345-
read_buffer[value_index].data(),
346-
),
347-
)?;
348-
value_index += 1;
349-
} else {
350-
builder.append_null()?;
351-
}
352-
}
353-
Arc::new(builder.finish())
354-
}
355-
} else {
356-
let (values_read, _) = r.read_batch(
357-
self.batch_size,
358-
None,
359-
None,
360-
&mut read_buffer,
361-
)?;
362-
363-
let mut builder = Int64Builder::new(values_read);
364-
365-
for i in 0..values_read {
280+
let mut def_levels: Vec<i16> = vec![0; self.batch_size];
281+
let (_, levels_read) = r.read_batch(
282+
self.batch_size,
283+
Some(&mut def_levels),
284+
None,
285+
&mut read_buffer,
286+
)?;
287+
288+
let mut builder = Int64Builder::new(levels_read);
289+
let mut value_index = 0;
290+
for i in 0..levels_read {
291+
if def_levels[i] > 0 {
366292
builder.append_value(convert_int96_timestamp(
367-
read_buffer[i].data(),
293+
read_buffer[value_index].data(),
368294
))?;
295+
value_index += 1;
296+
} else {
297+
builder.append_null()?;
369298
}
370-
Arc::new(builder.finish())
371299
}
300+
Arc::new(builder.finish())
372301
}
373302
ColumnReader::FloatColumnReader(ref mut r) => {
374303
ArrowReader::<Float32Type>::read(

0 commit comments

Comments
 (0)