Skip to content

Commit edd96c0

Browse files
tustvold and alamb authored
Add support for nested list arrays from parquet to arrow arrays (#993) (#1588)
* Add support for nested list arrays (#993)
* More tests
* Minor cleanup
* Filter nulls
* Update comments
* Fix doc
* Fix clippy
* Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* More tests
* Add sanity check to ListArrayReader
* Fix test_struct_array_reader

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent aaa1023 commit edd96c0

File tree

5 files changed

+578
-316
lines changed

5 files changed

+578
-316
lines changed

parquet/src/arrow/array_reader.rs

Lines changed: 4 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -717,13 +717,7 @@ impl ArrayReader for StructArrayReader {
717717
.children
718718
.iter_mut()
719719
.map(|reader| reader.next_batch(batch_size))
720-
.try_fold(
721-
Vec::new(),
722-
|mut result, child_array| -> Result<Vec<ArrayRef>> {
723-
result.push(child_array?);
724-
Ok(result)
725-
},
726-
)?;
720+
.collect::<Result<Vec<_>>>()?;
727721

728722
// check that array child data has same size
729723
let children_array_len =
@@ -1538,15 +1532,15 @@ mod tests {
15381532
ArrowType::Int32,
15391533
array_1.clone(),
15401534
Some(vec![0, 1, 2, 3, 1]),
1541-
Some(vec![1, 1, 1, 1, 1]),
1535+
Some(vec![0, 1, 1, 1, 1]),
15421536
);
15431537

15441538
let array_2 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![5, 4, 3, 2, 1]));
15451539
let array_reader_2 = InMemoryArrayReader::new(
15461540
ArrowType::Int32,
15471541
array_2.clone(),
15481542
Some(vec![0, 1, 3, 1, 2]),
1549-
Some(vec![1, 1, 1, 1, 1]),
1543+
Some(vec![0, 1, 1, 1, 1]),
15501544
);
15511545

15521546
let struct_type = ArrowType::Struct(vec![
@@ -1576,7 +1570,7 @@ mod tests {
15761570
struct_array_reader.get_def_levels()
15771571
);
15781572
assert_eq!(
1579-
Some(vec![1, 1, 1, 1, 1].as_slice()),
1573+
Some(vec![0, 1, 1, 1, 1].as_slice()),
15801574
struct_array_reader.get_rep_levels()
15811575
);
15821576
}

parquet/src/arrow/array_reader/builder.rs

Lines changed: 132 additions & 142 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ use crate::data_type::{
3838
Int96Type,
3939
};
4040
use crate::errors::ParquetError::ArrowError;
41-
use crate::errors::{Result};
41+
use crate::errors::{ParquetError, Result};
4242
use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr};
4343
use crate::schema::visitor::TypeVisitor;
4444

@@ -129,9 +129,10 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
129129

130130
let null_mask_only = match cur_type.get_basic_info().repetition() {
131131
Repetition::REPEATED => {
132-
new_context.def_level += 1;
133-
new_context.rep_level += 1;
134-
false
132+
return Err(ArrowError(format!(
133+
"Reading repeated primitive ({:?}) is not supported yet!",
134+
cur_type.name()
135+
)));
135136
}
136137
Repetition::OPTIONAL => {
137138
new_context.def_level += 1;
@@ -143,19 +144,12 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
143144
};
144145

145146
let reader = self.build_for_primitive_type_inner(
146-
cur_type.clone(),
147+
cur_type,
147148
&new_context,
148149
null_mask_only,
149150
)?;
150151

151-
if cur_type.get_basic_info().repetition() == Repetition::REPEATED {
152-
Err(ArrowError(format!(
153-
"Reading repeated field ({:?}) is not supported yet!",
154-
cur_type.name()
155-
)))
156-
} else {
157-
Ok(Some(reader))
158-
}
152+
Ok(Some(reader))
159153
} else {
160154
Ok(None)
161155
}
@@ -173,30 +167,19 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
173167
if cur_type.get_basic_info().has_repetition() {
174168
match cur_type.get_basic_info().repetition() {
175169
Repetition::REPEATED => {
176-
new_context.def_level += 1;
177-
new_context.rep_level += 1;
170+
return Err(ArrowError(format!(
171+
"Reading repeated struct ({:?}) is not supported yet!",
172+
cur_type.name(),
173+
)))
178174
}
179175
Repetition::OPTIONAL => {
180176
new_context.def_level += 1;
181177
}
182-
_ => (),
178+
Repetition::REQUIRED => {}
183179
}
184180
}
185181

186-
if let Some(reader) = self.build_for_struct_type_inner(&cur_type, &new_context)? {
187-
if cur_type.get_basic_info().has_repetition()
188-
&& cur_type.get_basic_info().repetition() == Repetition::REPEATED
189-
{
190-
Err(ArrowError(format!(
191-
"Reading repeated field ({:?}) is not supported yet!",
192-
cur_type.name(),
193-
)))
194-
} else {
195-
Ok(Some(reader))
196-
}
197-
} else {
198-
Ok(None)
199-
}
182+
self.build_for_struct_type_inner(&cur_type, &new_context)
200183
}
201184

202185
/// Build array reader for map type.
@@ -208,42 +191,61 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
208191
// Add map type to context
209192
let mut new_context = context.clone();
210193
new_context.path.append(vec![map_type.name().to_string()]);
211-
if let Repetition::OPTIONAL = map_type.get_basic_info().repetition() {
212-
new_context.def_level += 1;
194+
195+
match map_type.get_basic_info().repetition() {
196+
Repetition::REQUIRED => {}
197+
Repetition::OPTIONAL => {
198+
new_context.def_level += 1;
199+
}
200+
Repetition::REPEATED => {
201+
return Err(ArrowError("Map cannot be repeated".to_string()))
202+
}
203+
}
204+
205+
if map_type.get_fields().len() != 1 {
206+
return Err(ArrowError(format!(
207+
"Map field must have exactly one key_value child, found {}",
208+
map_type.get_fields().len()
209+
)));
213210
}
214211

215212
// Add map entry (key_value) to context
216-
let map_key_value = map_type.get_fields().first().ok_or_else(|| {
217-
ArrowError("Map field must have a key_value entry".to_string())
218-
})?;
213+
let map_key_value = &map_type.get_fields()[0];
214+
if map_key_value.get_basic_info().repetition() != Repetition::REPEATED {
215+
return Err(ArrowError(
216+
"Child of map field must be repeated".to_string(),
217+
));
218+
}
219+
219220
new_context
220221
.path
221222
.append(vec![map_key_value.name().to_string()]);
223+
222224
new_context.rep_level += 1;
225+
new_context.def_level += 1;
226+
227+
if map_key_value.get_fields().len() != 2 {
228+
// According to the specification the values are optional (#1642)
229+
return Err(ArrowError(format!(
230+
"Child of map field must have two children, found {}",
231+
map_key_value.get_fields().len()
232+
)));
233+
}
223234

224235
// Get key and value, and create context for each
225-
let map_key = map_key_value
226-
.get_fields()
227-
.first()
228-
.ok_or_else(|| ArrowError("Map entry must have a key".to_string()))?;
229-
let map_value = map_key_value
230-
.get_fields()
231-
.get(1)
232-
.ok_or_else(|| ArrowError("Map entry must have a value".to_string()))?;
233-
234-
let key_reader = {
235-
let mut key_context = new_context.clone();
236-
key_context.def_level += 1;
237-
key_context.path.append(vec![map_key.name().to_string()]);
238-
self.dispatch(map_key.clone(), &key_context)?.unwrap()
239-
};
240-
let value_reader = {
241-
let mut value_context = new_context.clone();
242-
if let Repetition::OPTIONAL = map_value.get_basic_info().repetition() {
243-
value_context.def_level += 1;
244-
}
245-
self.dispatch(map_value.clone(), &value_context)?.unwrap()
246-
};
236+
let map_key = &map_key_value.get_fields()[0];
237+
let map_value = &map_key_value.get_fields()[1];
238+
239+
if map_key.get_basic_info().repetition() != Repetition::REQUIRED {
240+
return Err(ArrowError("Map keys must be required".to_string()));
241+
}
242+
243+
if map_value.get_basic_info().repetition() == Repetition::REPEATED {
244+
return Err(ArrowError("Map values cannot be repeated".to_string()));
245+
}
246+
247+
let key_reader = self.dispatch(map_key.clone(), &new_context)?.unwrap();
248+
let value_reader = self.dispatch(map_value.clone(), &new_context)?.unwrap();
247249

248250
let arrow_type = self
249251
.arrow_schema
@@ -290,101 +292,89 @@ impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext
290292
item_type: Arc<Type>,
291293
context: &'a ArrayReaderBuilderContext,
292294
) -> Result<Option<Box<dyn ArrayReader>>> {
293-
let mut list_child = &list_type
294-
.get_fields()
295-
.first()
296-
.ok_or_else(|| ArrowError("List field must have a child.".to_string()))?
297-
.clone();
298295
let mut new_context = context.clone();
299-
300296
new_context.path.append(vec![list_type.name().to_string()]);
301-
// We need to know at what definition a list or its child is null
302-
let list_null_def = new_context.def_level;
303-
let mut list_empty_def = new_context.def_level;
304-
305-
// If the list's root is nullable
306-
if let Repetition::OPTIONAL = list_type.get_basic_info().repetition() {
307-
new_context.def_level += 1;
308-
// current level is nullable, increment to get level for empty list slot
309-
list_empty_def += 1;
310-
}
311297

312-
match list_child.get_basic_info().repetition() {
313-
Repetition::REPEATED => {
314-
new_context.def_level += 1;
315-
new_context.rep_level += 1;
316-
}
298+
// If the list is nullable
299+
let nullable = match list_type.get_basic_info().repetition() {
300+
Repetition::REQUIRED => false,
317301
Repetition::OPTIONAL => {
318302
new_context.def_level += 1;
303+
true
304+
}
305+
Repetition::REPEATED => {
306+
return Err(general_err!("List type cannot be repeated"))
319307
}
320-
_ => (),
308+
};
309+
310+
if list_type.get_fields().len() != 1 {
311+
return Err(ArrowError(format!(
312+
"List field must have exactly one child, found {}",
313+
list_type.get_fields().len()
314+
)));
321315
}
316+
let mut list_child = &list_type.get_fields()[0];
322317

323-
let reader = self.dispatch(item_type.clone(), &new_context);
324-
if let Ok(Some(item_reader)) = reader {
325-
let item_reader_type = item_reader.get_data_type().clone();
326-
327-
match item_reader_type {
328-
ArrowType::List(_)
329-
| ArrowType::FixedSizeList(_, _)
330-
| ArrowType::Dictionary(_, _) => Err(ArrowError(format!(
331-
"reading List({:?}) into arrow not supported yet",
332-
item_type
333-
))),
334-
_ => {
335-
// a list is a group type with a single child. The list child's
336-
// name comes from the child's field name.
337-
// if the child's name is "list" and it has a child, then use this child
338-
if list_child.name() == "list" && !list_child.get_fields().is_empty()
339-
{
340-
list_child = list_child.get_fields().first().unwrap();
341-
}
342-
let arrow_type = self
343-
.arrow_schema
344-
.field_with_name(list_type.name())
345-
.ok()
346-
.map(|f| f.data_type().to_owned())
347-
.unwrap_or_else(|| {
348-
ArrowType::List(Box::new(Field::new(
349-
list_child.name(),
350-
item_reader_type.clone(),
351-
list_child.is_optional(),
352-
)))
353-
});
354-
355-
let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
356-
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
357-
item_reader,
358-
arrow_type,
359-
item_reader_type,
360-
new_context.def_level,
361-
new_context.rep_level,
362-
list_null_def,
363-
list_empty_def,
364-
)),
365-
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
366-
item_reader,
367-
arrow_type,
368-
item_reader_type,
369-
new_context.def_level,
370-
new_context.rep_level,
371-
list_null_def,
372-
list_empty_def,
373-
)),
374-
375-
_ => {
376-
return Err(ArrowError(format!(
377-
"creating ListArrayReader with type {:?} should be unreachable",
378-
arrow_type
379-
)))
380-
}
381-
};
318+
if list_child.get_basic_info().repetition() != Repetition::REPEATED {
319+
return Err(ArrowError("List child must be repeated".to_string()));
320+
}
382321

383-
Ok(Some(list_array_reader))
322+
// The repeated field
323+
new_context.rep_level += 1;
324+
new_context.def_level += 1;
325+
326+
match self.dispatch(item_type, &new_context) {
327+
Ok(Some(item_reader)) => {
328+
let item_type = item_reader.get_data_type().clone();
329+
330+
// a list is a group type with a single child. The list child's
331+
// name comes from the child's field name.
332+
// if the child's name is "list" and it has a child, then use this child
333+
if list_child.name() == "list" && !list_child.get_fields().is_empty() {
334+
list_child = list_child.get_fields().first().unwrap();
384335
}
336+
337+
let arrow_type = self
338+
.arrow_schema
339+
.field_with_name(list_type.name())
340+
.ok()
341+
.map(|f| f.data_type().to_owned())
342+
.unwrap_or_else(|| {
343+
ArrowType::List(Box::new(Field::new(
344+
list_child.name(),
345+
item_type.clone(),
346+
list_child.is_optional(),
347+
)))
348+
});
349+
350+
let list_array_reader: Box<dyn ArrayReader> = match arrow_type {
351+
ArrowType::List(_) => Box::new(ListArrayReader::<i32>::new(
352+
item_reader,
353+
arrow_type,
354+
item_type,
355+
new_context.def_level,
356+
new_context.rep_level,
357+
nullable,
358+
)),
359+
ArrowType::LargeList(_) => Box::new(ListArrayReader::<i64>::new(
360+
item_reader,
361+
arrow_type,
362+
item_type,
363+
new_context.def_level,
364+
new_context.rep_level,
365+
nullable,
366+
)),
367+
_ => {
368+
return Err(ArrowError(format!(
369+
"creating ListArrayReader with type {:?} should be unreachable",
370+
arrow_type
371+
)))
372+
}
373+
};
374+
375+
Ok(Some(list_array_reader))
385376
}
386-
} else {
387-
reader
377+
result => result,
388378
}
389379
}
390380
}
@@ -637,10 +627,10 @@ impl<'a> ArrayReaderBuilder {
637627
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
638628

639629
for child in cur_type.get_fields() {
640-
let mut struct_context = context.clone();
641630
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
642631
// TODO: this results in calling get_arrow_field twice, it could be reused
643632
// from child_reader above, by making child_reader carry its `Field`
633+
let mut struct_context = context.clone();
644634
struct_context.path.append(vec![child.name().to_string()]);
645635
let field = match self.get_arrow_field(child, &struct_context) {
646636
Some(f) => f.clone(),

0 commit comments

Comments
 (0)