Skip to content

Commit 1b1b3d6

Browse files
Fixing one problem with nested arrays
1 parent ddb40d2 commit 1b1b3d6

File tree

3 files changed

+19
-13
lines changed

3 files changed

+19
-13
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,11 @@ private[parquet] object ParquetTypesConverter {
206206
val fields = groupType.getFields.map {
207207
field => new StructField(field.getName, toDataType(field), field.getRepetition != Repetition.REQUIRED)
208208
}
209-
if (fields.size == 1) new ArrayType(fields.apply(0).dataType)
210-
new ArrayType(StructType(fields))
209+
if (fields.size == 1) {
210+
new ArrayType(fields.apply(0).dataType)
211+
} else {
212+
new ArrayType(StructType(fields))
213+
}
211214
}
212215
case _ => { // everything else nested becomes a Struct, unless it has a single repeated field
213216
// in which case it becomes an array (this should correspond to the inverse operation of
@@ -260,7 +263,7 @@ private[parquet] object ParquetTypesConverter {
260263
elementType match {
261264
case StructType(fields) => { // first case: array of structs
262265
val parquetFieldTypes = fields.map(f => fromDataType(f.dataType, f.name, f.nullable, false))
263-
new ParquetGroupType(Repetition.REPEATED, name, ParquetOriginalType.LIST, parquetFieldTypes)
266+
new ParquetGroupType(repetition, name, ParquetOriginalType.LIST, parquetFieldTypes)
264267
//ConversionPatterns.listType(Repetition.REPEATED, name, parquetFieldTypes)
265268
}
266269
case _ => { // second case: array of primitive types

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ private[parquet] object CatalystConverter {
157157
case ArrayType(elementType: DataType) => {
158158
elementType match {
159159
case StructType(fields) =>
160-
if (fields.size > 1) new CatalystGroupConverter(fields, fieldIndex, parent) //CatalystStructArrayConverter(fields, fieldIndex, parent)
160+
if (fields.size > 1) new CatalystGroupConverter(fields, fieldIndex, parent)
161161
else new CatalystArrayConverter(fields(0).dataType, fieldIndex, parent)
162162
case _ => new CatalystArrayConverter(elementType, fieldIndex, parent)
163163
}
@@ -244,8 +244,11 @@ class CatalystGroupConverter(
244244
override val size = schema.size
245245

246246
// Should be only called in root group converter!
247-
def getCurrentRecord: Row = new GenericRow {
248-
override val values: Array[Any] = current.toArray
247+
def getCurrentRecord: Row = {
248+
assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
249+
new GenericRow {
250+
override val values: Array[Any] = current.toArray
251+
}
249252
}
250253

251254
override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,10 @@ private[sql] object ParquetTestData {
143143
|required double value;
144144
|optional boolean truth;
145145
|}
146-
|required group outerouter {
147-
|required group outer {
148-
|required group inner {
149-
|required int32 number;
146+
|optional group outerouter {
147+
|repeated group values {
148+
|repeated group values {
149+
|repeated int32 values;
150150
|}
151151
|}
152152
|}
@@ -263,9 +263,9 @@ private[sql] object ParquetTestData {
263263
val booleanNumberPairs = r1.addGroup(3)
264264
booleanNumberPairs.add("value", 2.5)
265265
booleanNumberPairs.add("truth", false)
266-
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 7)
267-
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 8)
268-
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 9)
266+
r1.addGroup(4).addGroup(0).addGroup(0).add("values", 7)
267+
r1.addGroup(4).addGroup(0).addGroup(0).add("values", 8)
268+
r1.addGroup(4).addGroup(0).addGroup(0).add("values", 9)
269269

270270
val writeSupport = new TestGroupWriteSupport(schema)
271271
val writer = new ParquetWriter[Group](path, writeSupport)

0 commit comments

Comments (0)