[SPARK-4502][SQL] Support parquet nested struct pruning and add relevant test #14957
@@ -259,8 +259,23 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
   * @throws IllegalArgumentException if a field with the given name does not exist
   */
  def apply(name: String): StructField = {
    nameToField.getOrElse(name,
      throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
    if (name.contains('.')) {
IIUC, this will drop the support for accessing field names that contain "." (e.g. "a.b"), which can currently be accessed via backticks (`a.b`). Could you confirm this please?
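For reference, a minimal sketch of the behavior being discussed (hypothetical data; assumes a SparkSession named spark and import spark.implicits._):

// A flat column literally named "a.b" (not a nested field) is reachable
// today via backtick quoting, which this change could break.
val df = Seq((1, 2)).toDF("a.b", "c")
df.select("`a.b`").show()               // resolves the single column named "a.b"
df.createOrReplaceTempView("t")
spark.sql("SELECT `a.b` FROM t").show() // same lookup via SQL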
@HyukjinKwon Thanks for your review. Mixing the recursive lookup with the default apply has this problem; I fixed it in the next patch by using ',' as the separator, which is an invalid character in a Parquet schema.
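A hedged sketch of the approach described above (illustrative helper, not the exact patch): nested lookups are keyed on ',', which cannot appear in a Parquet schema, so field names containing '.' still take the default path:

import org.apache.spark.sql.types.{StructField, StructType}

def applyNested(schema: StructType, name: String): StructField = {
  if (name.contains(',')) {
    // Split off the first path segment and recurse into its struct type.
    val head = name.takeWhile(_ != ',')
    val rest = name.drop(head.length + 1)
    schema(head).dataType match {
      case t: StructType => applyNested(t, rest)
      case _ => throw new IllegalArgumentException(s"""Field "$head" is not a struct.""")
    }
  } else {
    schema(name) // the default apply; names containing '.' land here unchanged
  }
}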
@@ -97,7 +98,16 @@ object FileSourceStrategy extends Strategy with Logging {
      dataColumns
        .filter(requiredAttributes.contains)
        .filterNot(partitionColumns.contains)
      val outputSchema = readDataColumns.toStructType
      val outputSchema = if (fsRelation.sqlContext.conf.isParquetNestColumnPruning) {
It will affect all other data sources. I am pretty sure any tests related to this will not pass.
I ran all the data source tests with
./build/sbt "test-only org.apache.spark.sql.execution.datasources.*"
Three tests failed, but when I ran the failed suites separately, all tests passed.
All three tests failed because of a datetime check error: the correct answer is like '2015-05-23' but the Spark answer is '2015-05-22'. I don't think this error is caused by my patch.
Has anybody seen the same problem before? It really confuses me that each suite passes when run separately!
Oh, I meant enabling/disabling affects the other data sources. I see it's disabled by default. I ran, for example, JsonSuite after manually enabling this option (after leaving the comment above) and saw some failures related to nested structures.
Also, do you mind if I ask which tests failed? I will try to reproduce it myself.
It's my mistake here. I can only make sure this patch works for Parquet, so I should check the fileFormat here; as the config namespace (spark.sql.parquet.nestColumnPruning) suggests, it should only apply to Parquet. I added a patch to fix this.
I ran the command
./build/sbt "test-only org.apache.spark.sql.execution.datasources.*"
locally and three test suites failed:
[error] Failed tests:
[error] org.apache.spark.sql.execution.datasources.csv.CSVSuite
[error] org.apache.spark.sql.execution.datasources.json.JsonSuite
[error] org.apache.spark.sql.execution.datasources.parquet.ParquetPartitionDiscoverySuite
But when I ran them separately, all three passed. Also, I ran
./build/sbt "test-only org.apache.spark.sql.execution.csv.*"
./build/sbt "test-only org.apache.spark.sql.execution.json.*"
and all tests passed.
Could you please check whether the related tests pass locally? It seems this affects all other data sources.
Also, it seems you might need to update your PR description; the last commit you just pushed acts differently from your PR description. In addition, maybe you would need to fix the title of this PR to be complete (without …).
(BTW, I just left some comments because I am interested in some code in this path. You can wait for a committer's review.)
(Thanks for your comments :) )
This would be good to include in Spark 2.1... cc @davies and @liancheng
test this please
add to whitelist
Test build #67316 has finished for PR 14957 at commit
Sorry for the late reply and thanks for the contribution!
I've been expecting this feature for a long time and had once come up with basically the same idea but didn't get time to implement it. Thanks!
I haven't finished reviewing everything but would like to post my comments for the first round of review so that we can iterate.
// Merge schema in same StructType and merge with filterAttributes
prunedSchema.fields.map(f => StructType(Array(f))).reduceLeft(_ merge _)
  .merge(filterAttributes.toSeq.toStructType)
} else readDataColumns.toStructType
Please re-format the above change to the following format:
if (
... &&
...
) {
...
} else {
...
}
fix done
@@ -126,4 +136,52 @@ object FileSourceStrategy extends Strategy with Logging {

    case _ => Nil
  }

  private def generateStructFieldsContainsNesting(projects: Seq[Expression],
      totalSchema: StructType) : Seq[StructField] = {
Please check the Spark code style guide and re-format this one.
Would you please add comments and test cases for testing this method, which is basically the essential part of this PR?
fix code style done.
No problem, I'll add tests for the private func generateStructFieldsContainsNesting in the next patch; this patch fixes all the code style and naming problems.
private def generateStructFieldsContainsNesting(projects: Seq[Expression],
    totalSchema: StructType) : Seq[StructField] = {
  def generateStructField(curField: List[String],
      node: Expression) : Seq[StructField] = {
And this one.
fix done
  }

  def getFieldRecursively(totalSchema: StructType,
      name: List[String]): StructField = {
And this.
fix done
@@ -212,6 +212,11 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_NEST_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestColumnPruning")
    .doc("When set this to true, we will tell parquet only read the nest column`s leaf fields ")
Please reword this doc string to:
When true, Parquet column pruning also works for nested fields.
reword done
@@ -212,6 +212,11 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)

  val PARQUET_NEST_COLUMN_PRUNING = SQLConfigBuilder("spark.sql.parquet.nestColumnPruning")
Please rename to PARQUET_NESTED_COLUMN_PRUNING and spark.sql.parquet.nestedColumnPruning respectively.
rename done
@@ -661,6 +666,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

  def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

  def isParquetNestColumnPruning: Boolean = getConf(PARQUET_NEST_COLUMN_PRUNING)
parquetNestedColumnPruningEnabled
rename done
// |-- num: long (nullable = true)
// |-- str: string (nullable = true)
val df = readResourceParquetFile("test-data/nested-struct.snappy.parquet")
df.createOrReplaceTempView("tmp_table")
You may use SQLTestUtils.withTempView to wrap this test so that you don't need to drop the temporary view manually.
fix done
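For reference, a rough sketch of the suggested pattern (assumes the suite mixes in SQLTestUtils; assertions abbreviated):

// The temporary view is dropped automatically when the block exits,
// even if an assertion inside fails.
withTempView("tmp_table") {
  val df = readResourceParquetFile("test-data/nested-struct.snappy.parquet")
  df.createOrReplaceTempView("tmp_table")
  // ... run the nested-pruning queries and checkAnswer assertions here ...
}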
val outputSchema = readDataColumns.toStructType
val outputSchema = if (fsRelation.sqlContext.conf.isParquetNestColumnPruning
    && fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]) {
  val totalSchema = readDataColumns.toStructType
Maybe fullSchema?
fix done
    } else {
      totalSchema(name.head)
    }
  }
Actually this function can be simplified to:
def getNestedField(schema: StructType, path: Seq[String]): StructField = {
  require(path.nonEmpty, "<error message>")
  path.tail.foldLeft(schema(path.head)) { (field, name) =>
    field.dataType match {
      case t: StructType => t(name)
      case _ => ??? // Throw exception here
    }
  }
}
The func getFieldRecursively here needs to return a StructField that contains the whole nested relation of the path. For example:
The fullSchema is:
root
|-- col: struct (nullable = true)
| |-- s1: struct (nullable = true)
| | |-- s1_1: long (nullable = true)
| | |-- s1_2: long (nullable = true)
| |-- str: string (nullable = true)
|-- num: long (nullable = true)
|-- str: string (nullable = true)
and when we want to get col.s1.s1_1, the func should return:
StructField(col,StructType(StructField(s1,StructType(StructField(s1_1,LongType,true)),true)),true)
So maybe I can't use the simplified func getNestedField because it returns only the last StructField:
StructField(s1_1,LongType,true)
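For comparison, a hedged sketch of a recursive variant that does keep the nesting (hypothetical helper name; error message mirrors the one in the patch):

import org.apache.spark.sql.types.{StructField, StructType}

// Re-wraps each level on the way out, so the result for List("col", "s1", "s1_1")
// is StructField(col, StructType(StructField(s1, StructType(StructField(s1_1, ...)))))
// rather than just the leaf field.
def getFieldKeepingNesting(schema: StructType, path: List[String]): StructField = {
  val field = schema(path.head)
  path.tail match {
    case Nil => field
    case tail => field.dataType match {
      case t: StructType =>
        field.copy(dataType = StructType(Array(getFieldKeepingNesting(t, tail))))
      case _ =>
        throw new IllegalArgumentException(s"""Field "${path.head}" is not struct field.""")
    }
  }
}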
Together with this one, we should have an optimizer rule that could 1) extract GetStructField (and others) and push it down closer to the data source, or 2) flatten all the nested fields in the data source, then replace the GetStructFields with the flattened ones and prune the unused ones.
Test build #67410 has finished for PR 14957 at commit
Test build #67515 has finished for PR 14957 at commit
Test build #67550 has finished for PR 14957 at commit
Test build #67564 has finished for PR 14957 at commit
case attr: AttributeReference =>
  Seq(getFieldRecursively(totalSchema, attr.name :: curField))
case sf: GetStructField =>
  generateStructField(sf.name.get :: curField, sf.child)
This name is optional and might not be set. We should retrieve the actual field name using the ordinal of sf.
fix done.
A little question here: all projects parsed from SQL must have the name, while projects from the DataFrame API may not, right?
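A small sketch of the ordinal fallback being suggested (assumes sf.child's dataType is known to be a StructType at this point):

// Prefer the optional name; otherwise look the field up by ordinal in the
// child's schema, which is always available for a resolved GetStructField.
val fieldName = sf.name.getOrElse(
  sf.child.dataType.asInstanceOf[StructType](sf.ordinal).name)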
val outputSchema = readDataColumns.toStructType
val outputSchema = if (
  fsRelation.sqlContext.conf.parquetNestedColumnPruningEnabled &&
  fsRelation.fileFormat.isInstanceOf[ParquetFileFormat]
Use two space indentation here.
fix done
val result = FileSourceStrategy.invokePrivate[Seq[StructField]](testFunc(projects,
  fullSchema))
assert(result == expextResult)
}
It would be nice to split this method into several test cases that test some typical but minimal cases.
BTW, I tried the following test code:
test("foo") {
val schema = new StructType()
.add("f0", IntegerType)
.add("f1", new StructType()
.add("f10", IntegerType))
val expr = GetStructField(
CreateNamedStruct(Seq(
Literal("f10"),
AttributeReference("f0", IntegerType)()
)),
0,
Some("f10")
)
StructType(
FileSourceStrategy.generateStructFieldsContainsNesting(expr :: Nil, schema)
).printTreeString()
}
and it fails with the following exception:
[info] - foo *** FAILED *** (37 milliseconds)
[info] java.lang.IllegalArgumentException: Field "f0" is not struct field.
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.getFieldRecursively$1(FileSourceStrategy.scala:188)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.org$apache$spark$sql$execution$datasources$FileSourceStrategy$$generateStructField$1(FileSourceStrategy.scala:166)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$$anonfun$org$apache$spark$sql$execution$datasources$FileSourceStrategy$$generateStructField$1$1.apply(FileSourceStrategy.scala:171)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$$anonfun$org$apache$spark$sql$execution$datasources$FileSourceStrategy$$generateStructField$1$1.apply(FileSourceStrategy.scala:171)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.flatMap(List.scala:344)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.org$apache$spark$sql$execution$datasources$FileSourceStrategy$$generateStructField$1(FileSourceStrategy.scala:171)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$$anonfun$generateStructFieldsContainsNesting$1.apply(FileSourceStrategy.scala:195)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$$anonfun$generateStructFieldsContainsNesting$1.apply(FileSourceStrategy.scala:195)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
[info] at scala.collection.immutable.List.flatMap(List.scala:344)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.generateStructFieldsContainsNesting(FileSourceStrategy.scala:195)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategySuite$$anonfun$16.apply$mcV$sp(FileSourceStrategySuite.scala:462)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategySuite$$anonfun$16.apply(FileSourceStrategySuite.scala:446)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategySuite$$anonfun$16.apply(FileSourceStrategySuite.scala:446)
[info] at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
[info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
[info] at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
[info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info] at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(FileSourceStrategySuite.scala:42)
[info] at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
[info] at org.apache.spark.sql.execution.datasources.FileSourceStrategySuite.runTest(FileSourceStrategySuite.scala:42)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info] at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
[info] at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
[info] at scala.collection.immutable.List.foreach(List.scala:381)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
[info] at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
[info] at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
[info] at org.scalatest.Suite$class.run(Suite.scala:1424)
[info] at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info] at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
[info] at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
[info] at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
[info] at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info] at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info] at java.lang.Thread.run(Thread.java:745)
Basically, we also need to consider named_struct and struct expressions to get corner cases correct.
fix done. Thanks for liancheng's reminder.
Here I considered CreateStruct(Unsafe) and CreateNamedStruct(Unsafe); the other expressions in complexTypeCreator (CreateArray, CreateMap) are just ignored.
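A hedged sketch of the kind of case this adds (the exact pattern in the patch may differ): when the struct is built by the query itself, the traversal recurses into the accessed value expression instead of the file schema:

// CreateNamedStruct children alternate (name literal, value expression),
// so valExprs(ordinal) is the expression this GetStructField actually reads.
case GetStructField(cns: CreateNamedStruct, ordinal, _) =>
  generateStructField(curField, cns.valExprs(ordinal))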
@@ -126,4 +140,59 @@ object FileSourceStrategy extends Strategy with Logging {

    case _ => Nil
  }

  private def generateStructFieldsContainsNesting(
You may make this method private[sql] so that you don't need to rely on the PrivateMethod ScalaTest trick to test it.
fix done
Test build #67781 has finished for PR 14957 at commit
- Add support for CreateStruct(Unsafe) and CreateNamedStruct(Unsafe)
- Split origin test case to minimal test cases
- Add test case for named_struct
@xuanyuanking Have you determined if the functionality provided here is superseded by #16578? I am trying to figure out which PR to help out on since I need this feature as well.
@saulshanabrook looks like #16578 is a superset, trying to invest in that pull request.
@xuanyuanking, let's close this and help review #16578 if you agree on the comments above.
OK, I'll close this and just use it in our internal env. Thanks for everyone's suggestions and review work. Next we may try more complex scenarios of this.
What changes were proposed in this pull request?
Like the description in SPARK-4502, we have the same problem in Baidu: our users' Parquet files have complex nested structs (400+ fields, 4 layers of nesting), so this problem brings unnecessary data reads and time spent. This PR fixes the problem; the main ideas are as follows:
1. Add a config spark.sql.parquet.nestColumnPruning; when it is off, the logic is the same as before.
2. In FileSourceStrategy, traverse projects[NamedExpression] and generate the access path of each nested struct field. For example, in the query select people.addr.city from table, the project getStructField('city', getStructField('addr', AttributeReference('people'))) will have the access path ['people', 'addr', 'city'].
3. Generate a pruned schema for each access path and merge them. For example, the JSON format of struct type … and … will merge to …, as illustrated below.
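As an illustration (using the nested schema from the review discussion above, shown as tree strings rather than the original JSON):

Pruned schema for col.s1.s1_1:
root
 |-- col: struct
 |    |-- s1: struct
 |    |    |-- s1_1: long

Pruned schema for col.str:
root
 |-- col: struct
 |    |-- str: string

Merged:
root
 |-- col: struct
 |    |-- s1: struct
 |    |    |-- s1_1: long
 |    |-- str: string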
How was this patch tested?
Added a new test in ParquetQuerySuite.