Skip to content

Commit d1911dc

Browse files
Simplifying ArrayType conversion
1 parent f777b4b commit d1911dc

File tree

4 files changed

+92
-117
lines changed

4 files changed

+92
-117
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
8787
.findAllIn(field)
8888
.matchData
8989
.map(_.group(2))
90-
// TODO: we should recover the JVM type of valueType to match the
90+
// TODO: we should recover the JVM type of keyType to match the
9191
// actual type of the key?! should we restrict ourselves to NativeType?
9292
(ordinals.foldLeft(exp)((v1: Expression, v2: String) =>
9393
GetItem(v1, Literal(v2, keyType))), valueType)

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,11 @@ private[parquet] object CatalystConverter {
3232
// The type internally used for fields
3333
type FieldType = StructField
3434

35-
// Note: repeated primitive fields that form an array (together with
36-
// their surrounding group) need to have this name in the schema
37-
// TODO: "values" is a generic name but without it the Parquet column path would
38-
// be incomplete and values may be silently dropped; better would be to give
39-
// primitive-type array elements a name of some sort
35+
// This is mostly Parquet convention (see, e.g., `ConversionPatterns`)
4036
val ARRAY_ELEMENTS_SCHEMA_NAME = "values"
37+
val MAP_KEY_SCHEMA_NAME = "key"
38+
val MAP_VALUE_SCHEMA_NAME = "value"
39+
val MAP_SCHEMA_NAME = "map"
4140

4241
protected[parquet] def createConverter(
4342
field: FieldType,
@@ -46,21 +45,16 @@ private[parquet] object CatalystConverter {
4645
val fieldType: DataType = field.dataType
4746
fieldType match {
4847
case ArrayType(elementType: DataType) => {
49-
elementType match {
50-
case StructType(fields) =>
51-
if (fields.size > 1) new CatalystGroupConverter(fields, fieldIndex, parent)
52-
else new CatalystArrayConverter(fields(0).dataType, fieldIndex, parent)
53-
case _ => new CatalystArrayConverter(elementType, fieldIndex, parent)
54-
}
48+
new CatalystArrayConverter(elementType, fieldIndex, parent)
5549
}
5650
case StructType(fields: Seq[StructField]) => {
5751
new CatalystStructConverter(fields, fieldIndex, parent)
5852
}
5953
case MapType(keyType: DataType, valueType: DataType) => {
6054
new CatalystMapConverter(
6155
Seq(
62-
new FieldType("key", keyType, false),
63-
new FieldType("value", valueType, true)),
56+
new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
57+
new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
6458
fieldIndex,
6559
parent)
6660
}

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,11 @@ private[sql] object ParquetTestData {
123123
|optional group ownerPhoneNumbers {
124124
|repeated binary values;
125125
|}
126-
|repeated group contacts {
127-
|required binary name;
128-
|optional binary phoneNumber;
126+
|optional group contacts {
127+
|repeated group values {
128+
|required binary name;
129+
|optional binary phoneNumber;
130+
|}
129131
|}
130132
|}
131133
""".stripMargin
@@ -139,9 +141,11 @@ private[sql] object ParquetTestData {
139141
|optional group longs {
140142
|repeated int64 values;
141143
|}
142-
|repeated group entries {
143-
|required double value;
144-
|optional boolean truth;
144+
|required group entries {
145+
|repeated group values {
146+
|required double value;
147+
|optional boolean truth;
148+
|}
145149
|}
146150
|optional group outerouter {
147151
|repeated group values {
@@ -156,12 +160,16 @@ private[sql] object ParquetTestData {
156160
val testNestedSchema3 =
157161
"""
158162
|message TestNested3 {
159-
|required int32 x;
160-
|repeated group booleanNumberPairs {
161-
|required int32 key;
162-
|repeated group value {
163-
|required double nestedValue;
164-
|optional boolean truth;
163+
|required int32 x;
164+
|optional group booleanNumberPairs {
165+
|repeated group values {
166+
|required int32 key;
167+
|optional group value {
168+
|repeated group values {
169+
|required double nestedValue;
170+
|optional boolean truth;
171+
|}
172+
|}
165173
|}
166174
|}
167175
|}
@@ -268,12 +276,11 @@ private[sql] object ParquetTestData {
268276
.append("values", "555 123 4567")
269277
.append("values", "555 666 1337")
270278
.append("values", "XXX XXX XXXX")
271-
r1.addGroup(2)
272-
// .addGroup(0)
279+
val contacts = r1.addGroup(2)
280+
contacts.addGroup(0)
273281
.append("name", "Dmitriy Ryaboy")
274282
.append("phoneNumber", "555 987 6543")
275-
r1.addGroup(2)
276-
// .addGroup(0)
283+
contacts.addGroup(0)
277284
.append("name", "Chris Aniszczyk")
278285

279286
val r2 = new SimpleGroup(schema)
@@ -298,9 +305,9 @@ private[sql] object ParquetTestData {
298305
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME , 1.toLong << 32)
299306
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 33)
300307
longs.add(CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, 1.toLong << 34)
301-
val booleanNumberPairs = r1.addGroup(3)
302-
booleanNumberPairs.add("value", 2.5)
303-
booleanNumberPairs.add("truth", false)
308+
val booleanNumberPair = r1.addGroup(3).addGroup(0)
309+
booleanNumberPair.add("value", 2.5)
310+
booleanNumberPair.add("truth", false)
304311
val top_level = r1.addGroup(4)
305312
val second_level_a = top_level.addGroup(0)
306313
val second_level_b = top_level.addGroup(0)
@@ -330,17 +337,20 @@ private[sql] object ParquetTestData {
330337

331338
val r1 = new SimpleGroup(schema)
332339
r1.add(0, 1)
333-
val g1 = r1.addGroup(1)
340+
val booleanNumberPairs = r1.addGroup(1)
341+
val g1 = booleanNumberPairs.addGroup(0)
334342
g1.add(0, 1)
335-
val ng1 = g1.addGroup(1)
343+
val nested1 = g1.addGroup(1)
344+
val ng1 = nested1.addGroup(0)
336345
ng1.add(0, 1.5)
337346
ng1.add(1, false)
338-
val ng2 = g1.addGroup(1)
347+
val ng2 = nested1.addGroup(0)
339348
ng2.add(0, 2.5)
340349
ng2.add(1, true)
341-
val g2 = r1.addGroup(1)
350+
val g2 = booleanNumberPairs.addGroup(0)
342351
g2.add(0, 2)
343352
val ng3 = g2.addGroup(1)
353+
.addGroup(0)
344354
ng3.add(0, 3.5)
345355
ng3.add(1, false)
346356

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala

Lines changed: 51 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -64,20 +64,15 @@ private[parquet] object ParquetTypesConverter {
6464
* <ul>
6565
 * <li> Primitive types are converted to the corresponding primitive type.</li>
6666
* <li> Group types that have a single field that is itself a group, which has repetition
67-
* level `REPEATED` and two fields (named `key` and `value`), are converted to
68-
* a [[MapType]] with the corresponding key and value (value possibly complex)
69-
* as element type.</li>
70-
* <li> Other group types are converted as follows:<ul>
71-
* <li> Group types that have a single field with repetition `REPEATED` or themselves
72-
* have repetition level `REPEATED` are converted to an [[ArrayType]] with the
73-
* corresponding field type (possibly primitive) as element type.</li>
74-
* <li> Other groups with a single field are converted into a [[StructType]] with
75-
* the corresponding field type.</li>
76-
* <li> If groups have more than one field and repetition level `REPEATED` they are
77-
* converted into an [[ArrayType]] with the corresponding [[StructType]] as complex
78-
* element type.</li>
79-
* <li> Otherwise they are converted into a [[StructType]] with the corresponding
80-
* field types.</li></ul></li>
67+
* level `REPEATED` are treated as follows:<ul>
68+
* <li> If the nested group has name `values` and repetition level `REPEATED`, the
69+
* surrounding group is converted into an [[ArrayType]] with the
70+
* corresponding field type (primitive or complex) as element type.</li>
71+
* <li> If the nested group has name `map`, repetition level `REPEATED` and two fields
72+
* (named `key` and `value`), the surrounding group is converted into a [[MapType]]
73+
* with the corresponding key and value (value possibly complex) types.</li>
74+
* <li> Other group types are converted into a [[StructType]] with the corresponding
75+
* field types.</li></ul></li>
8176
* </ul>
8277
* Note that fields are determined to be `nullable` if and only if their Parquet repetition
8378
* level is not `REQUIRED`.
@@ -93,15 +88,16 @@ private[parquet] object ParquetTypesConverter {
9388
// This mostly follows the convention in ``parquet.schema.ConversionPatterns``
9489
val keyValueGroup = groupType.getFields.apply(0).asGroupType()
9590
keyValueGroup.getRepetition == Repetition.REPEATED &&
96-
keyValueGroup.getName == "map" &&
97-
keyValueGroup.getFields.apply(0).getName == "key" &&
98-
keyValueGroup.getFields.apply(1).getName == "value"
91+
keyValueGroup.getName == CatalystConverter.MAP_SCHEMA_NAME &&
92+
keyValueGroup.getFieldCount == 2 &&
93+
keyValueGroup.getFields.apply(0).getName == CatalystConverter.MAP_KEY_SCHEMA_NAME &&
94+
keyValueGroup.getFields.apply(1).getName == CatalystConverter.MAP_VALUE_SCHEMA_NAME
9995
}
10096
}
10197
def correspondsToArray(groupType: ParquetGroupType): Boolean = {
10298
groupType.getFieldCount == 1 &&
103-
(groupType.getFields.apply(0).getRepetition == Repetition.REPEATED ||
104-
groupType.getRepetition == Repetition.REPEATED)
99+
groupType.getFieldName(0) == CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME &&
100+
groupType.getFields.apply(0).getRepetition == Repetition.REPEATED
105101
}
106102

107103
if (parquetType.isPrimitive) {
@@ -112,17 +108,9 @@ private[parquet] object ParquetTypesConverter {
112108
// if the schema was constructed programmatically there may be hints how to convert
113109
// it inside the metadata via the OriginalType field
114110
case ParquetOriginalType.LIST => { // TODO: check enums!
115-
val fields = groupType.getFields.map {
116-
field => new StructField(
117-
field.getName,
118-
toDataType(field),
119-
field.getRepetition != Repetition.REQUIRED)
120-
}
121-
if (fields.size == 1) {
122-
new ArrayType(fields.apply(0).dataType)
123-
} else {
124-
new ArrayType(StructType(fields))
125-
}
111+
assert(groupType.getFieldCount == 1)
112+
val field = groupType.getFields.apply(0)
113+
new ArrayType(toDataType(field))
126114
}
127115
case ParquetOriginalType.MAP => {
128116
assert(
@@ -153,16 +141,7 @@ private[parquet] object ParquetTypesConverter {
153141
ptype.getName,
154142
toDataType(ptype),
155143
ptype.getRepetition != Repetition.REQUIRED))
156-
157-
if (groupType.getFieldCount == 1) {
158-
new StructType(fields)
159-
} else {
160-
if (parquetType.getRepetition == Repetition.REPEATED) {
161-
new ArrayType(StructType(fields))
162-
} else {
163-
new StructType(fields)
164-
}
165-
}
144+
new StructType(fields)
166145
}
167146
}
168147
}
@@ -199,17 +178,17 @@ private[parquet] object ParquetTypesConverter {
199178
* <li> Primitive types are converted into Parquet's primitive types.</li>
200179
* <li> [[org.apache.spark.sql.catalyst.types.StructType]]s are converted
201180
* into Parquet's `GroupType` with the corresponding field types.</li>
181+
 * <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are converted
182+
* into a 2-level nested group, where the outer group has the inner
183+
* group as sole field. The inner group has name `values` and
184+
* repetition level `REPEATED` and has the element type of
185+
* the array as schema. We use Parquet's `ConversionPatterns` for this
186+
* purpose.</li>
202187
* <li> [[org.apache.spark.sql.catalyst.types.MapType]]s are converted
203-
* into a nested (2-level) Parquet `GroupType` with two fields: a key type and
204-
* a value type. The nested group has repetition level `REPEATED`.</li>
205-
* <li> [[org.apache.spark.sql.catalyst.types.ArrayType]]s are handled as follows:<ul>
206-
* <li> If their element is complex, that is of type
207-
* [[org.apache.spark.sql.catalyst.types.StructType]], they are converted
208-
* into a `GroupType` with the corresponding field types of the struct and
209-
* original type of the `GroupType` is set to `LIST`.</li>
210-
* <li> Otherwise, that is they contain a primitive they are converted into a `GroupType`
211-
* that is also a list but has only a single field of the type corresponding to
212-
* the element type.</li></ul></li>
188+
* into a nested (2-level) Parquet `GroupType` with two fields: a key
189+
* type and a value type. The nested group has repetition level
190+
* `REPEATED` and name `map`. We use Parquet's `ConversionPatterns`
191+
 * for this purpose.</li>
213192
* </ul>
214193
* Parquet's repetition level is generally set according to the following rule:
215194
* <ul>
@@ -218,11 +197,8 @@ private[parquet] object ParquetTypesConverter {
218197
* <li> Otherwise, if the attribute whose type is converted is `nullable`, the Parquet
219198
* type gets repetition level `OPTIONAL` and otherwise `REQUIRED`.</li>
220199
* </ul>
221-
* The single exception to this rule is an [[org.apache.spark.sql.catalyst.types.ArrayType]]
222-
* that contains a [[org.apache.spark.sql.catalyst.types.StructType]], whose repetition level
223-
* is always set to `REPEATED`.
224200
*
225-
* @param ctype The type to convert.
201+
 * @param ctype The type to convert.
226202
* @param name The name of the [[org.apache.spark.sql.catalyst.expressions.Attribute]]
227203
* whose type is converted
228204
* @param nullable When true indicates that the attribute is nullable
@@ -245,43 +221,38 @@ private[parquet] object ParquetTypesConverter {
245221
new ParquetPrimitiveType(repetition, primitiveType.get, name)
246222
} else {
247223
ctype match {
248-
case ArrayType(elementType: DataType) => {
249-
elementType match {
250-
case StructType(fields) => { // first case: array of structs
251-
val parquetFieldTypes = fields.map(
252-
f => fromDataType(f.dataType, f.name, f.nullable, inArray = false))
253-
assert(
254-
fields.size > 1,
255-
"Found struct inside array with a single field.. error parsing Catalyst schema")
256-
new ParquetGroupType(
257-
Repetition.REPEATED,
258-
name,
259-
ParquetOriginalType.LIST,
260-
parquetFieldTypes)
261-
}
262-
case _ => { // second case: array of primitive types
263-
val parquetElementType = fromDataType(
264-
elementType,
265-
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
266-
nullable = false,
267-
inArray = true)
268-
ConversionPatterns.listType(repetition, name, parquetElementType)
269-
}
270-
}
224+
case ArrayType(elementType) => {
225+
val parquetElementType = fromDataType(
226+
elementType,
227+
CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
228+
nullable = false,
229+
inArray = true)
230+
ConversionPatterns.listType(repetition, name, parquetElementType)
271231
}
272-
// TODO: test structs inside arrays
273232
case StructType(structFields) => {
274233
val fields = structFields.map {
275234
field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
276235
}
277236
new ParquetGroupType(repetition, name, fields)
278237
}
279238
case MapType(keyType, valueType) => {
239+
val parquetKeyType =
240+
fromDataType(
241+
keyType,
242+
CatalystConverter.MAP_KEY_SCHEMA_NAME,
243+
false,
244+
inArray = false)
245+
val parquetValueType =
246+
fromDataType(
247+
valueType,
248+
CatalystConverter.MAP_VALUE_SCHEMA_NAME,
249+
true,
250+
inArray = false)
280251
ConversionPatterns.mapType(
281252
repetition,
282253
name,
283-
fromDataType(keyType, "key", false, inArray = false),
284-
fromDataType(valueType, "value", true, inArray = false))
254+
parquetKeyType,
255+
parquetValueType)
285256
}
286257
case _ => sys.error(s"Unsupported datatype $ctype")
287258
}

0 commit comments

Comments
 (0)