Skip to content

Commit 2831588

Browse files
committed
[SPARK-25132][SQL][BACKPORT-2.3] Case-insensitive field resolution when reading from Parquet
1 parent 9cb9d72 commit 2831588

File tree

4 files changed

+161
-30
lines changed

4 files changed

+161
-30
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ class ParquetFileFormat
310310
hadoopConf.set(
311311
SQLConf.SESSION_LOCAL_TIMEZONE.key,
312312
sparkSession.sessionState.conf.sessionLocalTimeZone)
313+
hadoopConf.setBoolean(
314+
SQLConf.CASE_SENSITIVE.key,
315+
sparkSession.sessionState.conf.caseSensitiveAnalysis)
313316

314317
ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
315318

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql.execution.datasources.parquet
1919

20-
import java.util.{Map => JMap, TimeZone}
20+
import java.util.{Locale, Map => JMap, TimeZone}
2121

2222
import scala.collection.JavaConverters._
2323

@@ -30,6 +30,7 @@ import org.apache.parquet.schema.Type.Repetition
3030

3131
import org.apache.spark.internal.Logging
3232
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
33+
import org.apache.spark.sql.internal.SQLConf
3334
import org.apache.spark.sql.types._
3435

3536
/**
@@ -71,8 +72,10 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
7172
StructType.fromString(schemaString)
7273
}
7374

74-
val parquetRequestedSchema =
75-
ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
75+
val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
76+
SQLConf.CASE_SENSITIVE.defaultValue.get)
77+
val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
78+
context.getFileSchema, catalystRequestedSchema, caseSensitive)
7679

7780
new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
7881
}
@@ -117,8 +120,12 @@ private[parquet] object ParquetReadSupport {
117120
* Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't exist
118121
* in `catalystSchema`, and adding those that only exist in `catalystSchema`.
119122
*/
120-
def clipParquetSchema(parquetSchema: MessageType, catalystSchema: StructType): MessageType = {
121-
val clippedParquetFields = clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
123+
def clipParquetSchema(
124+
parquetSchema: MessageType,
125+
catalystSchema: StructType,
126+
caseSensitive: Boolean = true): MessageType = {
127+
val clippedParquetFields = clipParquetGroupFields(
128+
parquetSchema.asGroupType(), catalystSchema, caseSensitive)
122129
if (clippedParquetFields.isEmpty) {
123130
ParquetSchemaConverter.EMPTY_MESSAGE
124131
} else {
@@ -129,20 +136,21 @@ private[parquet] object ParquetReadSupport {
129136
}
130137
}
131138

132-
private def clipParquetType(parquetType: Type, catalystType: DataType): Type = {
139+
private def clipParquetType(
140+
parquetType: Type, catalystType: DataType, caseSensitive: Boolean): Type = {
133141
catalystType match {
134142
case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
135143
// Only clips array types with nested type as element type.
136-
clipParquetListType(parquetType.asGroupType(), t.elementType)
144+
clipParquetListType(parquetType.asGroupType(), t.elementType, caseSensitive)
137145

138146
case t: MapType
139147
if !isPrimitiveCatalystType(t.keyType) ||
140148
!isPrimitiveCatalystType(t.valueType) =>
141149
// Only clips map types with nested key type or value type
142-
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
150+
clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType, caseSensitive)
143151

144152
case t: StructType =>
145-
clipParquetGroup(parquetType.asGroupType(), t)
153+
clipParquetGroup(parquetType.asGroupType(), t, caseSensitive)
146154

147155
case _ =>
148156
// UDTs and primitive types are not clipped. For UDTs, a clipped version might not be able
@@ -168,14 +176,15 @@ private[parquet] object ParquetReadSupport {
168176
* of the [[ArrayType]] should also be a nested type, namely an [[ArrayType]], a [[MapType]], or a
169177
* [[StructType]].
170178
*/
171-
private def clipParquetListType(parquetList: GroupType, elementType: DataType): Type = {
179+
private def clipParquetListType(
180+
parquetList: GroupType, elementType: DataType, caseSensitive: Boolean): Type = {
172181
// Precondition of this method: it should only be called for lists with nested element types.
173182
assert(!isPrimitiveCatalystType(elementType))
174183

175184
// An unannotated repeated group should be interpreted as a required list of required elements, so
176185
// list element type is just the group itself. Clip it.
177186
if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) {
178-
clipParquetType(parquetList, elementType)
187+
clipParquetType(parquetList, elementType, caseSensitive)
179188
} else {
180189
assert(
181190
parquetList.getOriginalType == OriginalType.LIST,
@@ -207,7 +216,7 @@ private[parquet] object ParquetReadSupport {
207216
Types
208217
.buildGroup(parquetList.getRepetition)
209218
.as(OriginalType.LIST)
210-
.addField(clipParquetType(repeatedGroup, elementType))
219+
.addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
211220
.named(parquetList.getName)
212221
} else {
213222
// Otherwise, the repeated field's type is the element type with the repeated field's
@@ -218,7 +227,7 @@ private[parquet] object ParquetReadSupport {
218227
.addField(
219228
Types
220229
.repeatedGroup()
221-
.addField(clipParquetType(repeatedGroup.getType(0), elementType))
230+
.addField(clipParquetType(repeatedGroup.getType(0), elementType, caseSensitive))
222231
.named(repeatedGroup.getName))
223232
.named(parquetList.getName)
224233
}
@@ -231,7 +240,10 @@ private[parquet] object ParquetReadSupport {
231240
* a [[StructType]].
232241
*/
233242
private def clipParquetMapType(
234-
parquetMap: GroupType, keyType: DataType, valueType: DataType): GroupType = {
243+
parquetMap: GroupType,
244+
keyType: DataType,
245+
valueType: DataType,
246+
caseSensitive: Boolean): GroupType = {
235247
// Precondition of this method: it only handles maps with nested key types or value types.
236248
assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
237249

@@ -243,8 +255,8 @@ private[parquet] object ParquetReadSupport {
243255
Types
244256
.repeatedGroup()
245257
.as(repeatedGroup.getOriginalType)
246-
.addField(clipParquetType(parquetKeyType, keyType))
247-
.addField(clipParquetType(parquetValueType, valueType))
258+
.addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
259+
.addField(clipParquetType(parquetValueType, valueType, caseSensitive))
248260
.named(repeatedGroup.getName)
249261

250262
Types
@@ -262,8 +274,9 @@ private[parquet] object ParquetReadSupport {
262274
* [[MessageType]]. Because it's legal to construct an empty requested schema for column
263275
* pruning.
264276
*/
265-
private def clipParquetGroup(parquetRecord: GroupType, structType: StructType): GroupType = {
266-
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType)
277+
private def clipParquetGroup(
278+
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): GroupType = {
279+
val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive)
267280
Types
268281
.buildGroup(parquetRecord.getRepetition)
269282
.as(parquetRecord.getOriginalType)
@@ -277,14 +290,35 @@ private[parquet] object ParquetReadSupport {
277290
* @return A list of clipped [[GroupType]] fields, which can be empty.
278291
*/
279292
private def clipParquetGroupFields(
280-
parquetRecord: GroupType, structType: StructType): Seq[Type] = {
281-
val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
293+
parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
282294
val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
283-
structType.map { f =>
284-
parquetFieldMap
285-
.get(f.name)
286-
.map(clipParquetType(_, f.dataType))
287-
.getOrElse(toParquet.convertField(f))
295+
if (caseSensitive) {
296+
val caseSensitiveParquetFieldMap =
297+
parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
298+
structType.map { f =>
299+
caseSensitiveParquetFieldMap
300+
.get(f.name)
301+
.map(clipParquetType(_, f.dataType, caseSensitive))
302+
.getOrElse(toParquet.convertField(f))
303+
}
304+
} else {
305+
// Do case-insensitive resolution only if in case-insensitive mode
306+
val caseInsensitiveParquetFieldMap =
307+
parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
308+
structType.map { f =>
309+
caseInsensitiveParquetFieldMap
310+
.get(f.name.toLowerCase(Locale.ROOT))
311+
.map { parquetTypes =>
312+
if (parquetTypes.size > 1) {
313+
// Need to fail if there is ambiguity, i.e. more than one field is matched
314+
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
315+
throw new RuntimeException(s"""Found duplicate field(s) "${f.name}": """ +
316+
s"$parquetTypesString in case-insensitive mode")
317+
} else {
318+
clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
319+
}
320+
}.getOrElse(toParquet.convertField(f))
321+
}
288322
}
289323
}
290324

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,47 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
156156
}
157157
}
158158
}
159+
160+
test(s"SPARK-25132: case-insensitive field resolution when reading from Parquet") {
161+
withTempDir { dir =>
162+
val format = "parquet"
163+
val tableDir = dir.getCanonicalPath + s"/$format"
164+
val tableName = s"spark_25132_${format}"
165+
withTable(tableName) {
166+
val end = 5
167+
val data = spark.range(end).selectExpr("id as A", "id * 2 as b", "id * 3 as B")
168+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
169+
data.write.format(format).mode("overwrite").save(tableDir)
170+
}
171+
sql(s"CREATE TABLE $tableName (a LONG, b LONG) USING $format LOCATION '$tableDir'")
172+
173+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
174+
checkAnswer(sql(s"select a from $tableName"), data.select("A"))
175+
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
176+
177+
// A RuntimeException is thrown on the executor side, which is then wrapped as
178+
// a SparkException on the driver side
179+
val e1 = intercept[SparkException] {
180+
sql(s"select b from $tableName").collect()
181+
}
182+
assert(
183+
e1.getCause.isInstanceOf[RuntimeException] &&
184+
e1.getCause.getMessage.contains(
185+
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
186+
val e2 = intercept[SparkException] {
187+
sql(s"select B from $tableName").collect()
188+
}
189+
assert(
190+
e2.getCause.isInstanceOf[RuntimeException] &&
191+
e2.getCause.getMessage.contains(
192+
"""Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
193+
}
194+
195+
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
196+
checkAnswer(sql(s"select a from $tableName"), (0 until end).map(_ => Row(null)))
197+
checkAnswer(sql(s"select b from $tableName"), data.select("b"))
198+
}
199+
}
200+
}
201+
}
159202
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,19 +1014,21 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
10141014
testName: String,
10151015
parquetSchema: String,
10161016
catalystSchema: StructType,
1017-
expectedSchema: String): Unit = {
1017+
expectedSchema: String,
1018+
caseSensitive: Boolean = true): Unit = {
10181019
testSchemaClipping(testName, parquetSchema, catalystSchema,
1019-
MessageTypeParser.parseMessageType(expectedSchema))
1020+
MessageTypeParser.parseMessageType(expectedSchema), caseSensitive)
10201021
}
10211022

10221023
private def testSchemaClipping(
10231024
testName: String,
10241025
parquetSchema: String,
10251026
catalystSchema: StructType,
1026-
expectedSchema: MessageType): Unit = {
1027+
expectedSchema: MessageType,
1028+
caseSensitive: Boolean): Unit = {
10271029
test(s"Clipping - $testName") {
10281030
val actual = ParquetReadSupport.clipParquetSchema(
1029-
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
1031+
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive)
10301032

10311033
try {
10321034
expectedSchema.checkContains(actual)
@@ -1387,7 +1389,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
13871389

13881390
catalystSchema = new StructType(),
13891391

1390-
expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE)
1392+
expectedSchema = ParquetSchemaConverter.EMPTY_MESSAGE,
1393+
caseSensitive = true)
13911394

13921395
testSchemaClipping(
13931396
"disjoint field sets",
@@ -1544,4 +1547,52 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
15441547
| }
15451548
|}
15461549
""".stripMargin)
1550+
1551+
testSchemaClipping(
1552+
"case-insensitive resolution: no ambiguity",
1553+
parquetSchema =
1554+
"""message root {
1555+
| required group A {
1556+
| optional int32 B;
1557+
| }
1558+
| optional int32 c;
1559+
|}
1560+
""".stripMargin,
1561+
catalystSchema = {
1562+
val nestedType = new StructType().add("b", IntegerType, nullable = true)
1563+
new StructType()
1564+
.add("a", nestedType, nullable = true)
1565+
.add("c", IntegerType, nullable = true)
1566+
},
1567+
expectedSchema =
1568+
"""message root {
1569+
| required group A {
1570+
| optional int32 B;
1571+
| }
1572+
| optional int32 c;
1573+
|}
1574+
""".stripMargin,
1575+
caseSensitive = false)
1576+
1577+
test("Clipping - case-insensitive resolution: more than one field is matched") {
1578+
val parquetSchema =
1579+
"""message root {
1580+
| required group A {
1581+
| optional int32 B;
1582+
| }
1583+
| optional int32 c;
1584+
| optional int32 a;
1585+
|}
1586+
""".stripMargin
1587+
val catalystSchema = {
1588+
val nestedType = new StructType().add("b", IntegerType, nullable = true)
1589+
new StructType()
1590+
.add("a", nestedType, nullable = true)
1591+
.add("c", IntegerType, nullable = true)
1592+
}
1593+
assertThrows[RuntimeException] {
1594+
ParquetReadSupport.clipParquetSchema(
1595+
MessageTypeParser.parseMessageType(parquetSchema), catalystSchema, caseSensitive = false)
1596+
}
1597+
}
15471598
}

0 commit comments

Comments
 (0)