
Commit ab8f5ec

fix code style and variable name
1 parent 23465ba commit ab8f5ec

File tree

3 files changed: 41 additions & 35 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 21 additions & 14 deletions

@@ -99,15 +99,19 @@ object FileSourceStrategy extends Strategy with Logging {
       dataColumns
         .filter(requiredAttributes.contains)
         .filterNot(partitionColumns.contains)
-    val outputSchema = if (fsRelation.sqlContext.conf.isParquetNestColumnPruning
-      && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]) {
-      val totalSchema = readDataColumns.toStructType
+    val outputSchema = if (
+      fsRelation.sqlContext.conf.parquetNestedColumnPruningEnabled &&
+      fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]
+    ) {
+      val fullSchema = readDataColumns.toStructType
       val prunedSchema = StructType(
-        generateStructFieldsContainsNesting(projects, totalSchema))
+        generateStructFieldsContainsNesting(projects, fullSchema))
       // Merge schema in same StructType and merge with filterAttributes
       prunedSchema.fields.map(f => StructType(Array(f))).reduceLeft(_ merge _)
         .merge(filterAttributes.toSeq.toStructType)
-    } else readDataColumns.toStructType
+    } else {
+      readDataColumns.toStructType
+    }
     logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")

     val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)

@@ -137,10 +141,12 @@ object FileSourceStrategy extends Strategy with Logging {
     case _ => Nil
   }

-  private def generateStructFieldsContainsNesting(projects: Seq[Expression],
-      totalSchema: StructType) : Seq[StructField] = {
-    def generateStructField(curField: List[String],
-        node: Expression) : Seq[StructField] = {
+  private def generateStructFieldsContainsNesting(
+      projects: Seq[Expression],
+      fullSchema: StructType) : Seq[StructField] = {
+    def generateStructField(
+        curField: List[String],
+        node: Expression) : Seq[StructField] = {
      node match {
        case ai: GetArrayItem =>
          // Here we drop the previous for simplify array and map support.

@@ -151,7 +157,7 @@ object FileSourceStrategy extends Strategy with Logging {
        case mv: GetMapValue =>
          generateStructField(List.empty[String], mv.child)
        case attr: AttributeReference =>
-          Seq(getFieldRecursively(totalSchema, attr.name :: curField))
+          Seq(getFieldRecursively(fullSchema, attr.name :: curField))
        case sf: GetStructField =>
          generateStructField(sf.name.get :: curField, sf.child)
        case _ =>

@@ -163,11 +169,12 @@ object FileSourceStrategy extends Strategy with Logging {
      }
    }

-    def getFieldRecursively(totalSchema: StructType,
-        name: List[String]): StructField = {
+    def getFieldRecursively(
+        schema: StructType,
+        name: List[String]): StructField = {
      if (name.length > 1) {
        val curField = name.head
-        val curFieldType = totalSchema(curField)
+        val curFieldType = schema(curField)
        curFieldType.dataType match {
          case st: StructType =>
            val newField = getFieldRecursively(StructType(st.fields), name.drop(1))

@@ -177,7 +184,7 @@ object FileSourceStrategy extends Strategy with Logging {
            throw new IllegalArgumentException(s"""Field "$curField" is not struct field.""")
        }
      } else {
-        totalSchema(name.head)
+        schema(name.head)
      }
    }
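To make the renamed helper concrete, here is a minimal, self-contained sketch of the path-walking idea behind getFieldRecursively. Everything in it is illustrative: fieldForPath and the sample schema are invented names, and the step that re-wraps the recursive result into its parent StructField falls outside the visible hunk, so that part is a plausible reconstruction rather than the patch itself.

import org.apache.spark.sql.types._

object GetFieldRecursivelySketch {
  // Illustrative stand-in for getFieldRecursively: walk `path` down
  // `schema` and return a StructField keeping only that one branch.
  // The re-wrapping into the parent StructField is not visible in the
  // hunk above, so it is reconstructed here as a best guess.
  def fieldForPath(schema: StructType, path: List[String]): StructField = {
    if (path.length > 1) {
      val curField = path.head
      schema(curField).dataType match {
        case st: StructType =>
          val inner = fieldForPath(StructType(st.fields), path.tail)
          StructField(curField, StructType(Array(inner)), schema(curField).nullable)
        case _ =>
          throw new IllegalArgumentException(s"""Field "$curField" is not struct field.""")
      }
    } else {
      schema(path.head)
    }
  }

  def main(args: Array[String]): Unit = {
    // Shape borrowed from the test file's schema comment; the leaf
    // types of s1_1 and s1_2 are assumptions.
    val schema = StructType(Seq(
      StructField("col", StructType(Seq(
        StructField("s1", StructType(Seq(
          StructField("s1_1", LongType),
          StructField("s1_2", StringType)))),
        StructField("str", StringType)))),
      StructField("num", LongType)))

    // Keeps only col.s1.s1_1; col.s1.s1_2 and col.str are pruned away.
    println(fieldForPath(schema, List("col", "s1", "s1_1")))
  }
}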

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions

@@ -212,8 +212,8 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

-  val PARQUET_NEST_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestColumnPruning")
-    .doc("When set this to true, we will tell parquet only read the nest column`s leaf fields ")
+  val PARQUET_NESTED_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestedColumnPruning")
+    .doc("When true, Parquet column pruning also works for nested fields.")
    .booleanConf
    .createWithDefault(false)

@@ -666,7 +666,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

  def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

-  def isParquetNestColumnPruning: Boolean = getConf(PARQUET_NEST_COLUMN_PRUNING)
+  def parquetNestedColumnPruningEnabled: Boolean = getConf(PARQUET_NESTED_COLUMN_PRUNING)

  def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
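For context, this is roughly how the renamed flag would be switched on from user code. A hedged sketch only: the session setup, app name, and file path are invented for the example, while the config key and its false default come straight from the hunk above.

import org.apache.spark.sql.SparkSession

object NestedPruningDemo {
  def main(args: Array[String]): Unit = {
    // Key and "false" default taken from PARQUET_NESTED_COLUMN_PRUNING
    // above; master, appName, and the path are placeholders.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-pruning-demo")
      .config("spark.sql.parquet.nestedColumnPruning", "true")
      .getOrCreate()

    // With the flag on and a Parquet source, FileSourceStrategy builds a
    // pruned output schema, so only the col.s1.s1_1 leaf should be read.
    spark.read.parquet("/path/to/nested-struct.parquet")
      .selectExpr("col.s1.s1_1")
      .explain()

    spark.stop()
  }
}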

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

Lines changed: 17 additions & 18 deletions

@@ -581,25 +581,24 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
    // |    |-- str: string (nullable = true)
    // |-- num: long (nullable = true)
    // |-- str: string (nullable = true)
-    val df = readResourceParquetFile("test-data/nested-struct.snappy.parquet")
-    df.createOrReplaceTempView("tmp_table")
-    // normal test
-    val query1 = "select num,col.s1.s1_1 from tmp_table"
-    val result1 = sql(query1)
-    withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
-      checkAnswer(sql(query1), result1)
-    }
-    // test for same struct meta merge
-    // col.s1.s1_1 and col.str should merge
-    // like col.[s1.s1_1, str] before pass to parquet
-    val query2 = "select col.s1.s1_1,col.str from tmp_table"
-    val result2 = sql(query2)
-    withSQLConf(SQLConf.PARQUET_NEST_COLUMN_PRUNING.key -> "true") {
-      checkAnswer(sql(query2), result2)
+    withTempView("tmp_table") {
+      val df = readResourceParquetFile("test-data/nested-struct.snappy.parquet")
+      df.createOrReplaceTempView("tmp_table")
+      // normal test
+      val query1 = "select num,col.s1.s1_1 from tmp_table"
+      val result1 = sql(query1)
+      withSQLConf(SQLConf.PARQUET_NESTED_COLUMN_PRUNING.key -> "true") {
+        checkAnswer(sql(query1), result1)
+      }
+      // test for same struct meta merge
+      // col.s1.s1_1 and col.str should merge
+      // like col.[s1.s1_1, str] before pass to parquet
+      val query2 = "select col.s1.s1_1,col.str from tmp_table"
+      val result2 = sql(query2)
+      withSQLConf(SQLConf.PARQUET_NESTED_COLUMN_PRUNING.key -> "true") {
+        checkAnswer(sql(query2), result2)
+      }
    }
-
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("tmp_table"), ignoreIfNotExists = true, purge = false)
  }

  test("expand UDT in StructType") {
