-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-4502][SQL]Support parquet nested struct pruning and add relevant test #14957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7eaa428
46153f1
46c2474
1c34877
23465ba
ab8f5ec
92ed369
5697911
d9aa397
d093c82
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,16 +27,15 @@ import org.apache.hadoop.mapreduce.Job | |
|
||
import org.apache.spark.{SparkConf, SparkException} | ||
import org.apache.spark.sql._ | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.catalyst.{util, InternalRow} | ||
import org.apache.spark.sql.catalyst.catalog.BucketSpec | ||
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} | ||
import org.apache.spark.sql.catalyst.util | ||
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, CreateNamedStruct, Expression, ExpressionSet, GetArrayItem, GetStructField, Literal, PredicateHelper} | ||
import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan} | ||
import org.apache.spark.sql.functions._ | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.sources._ | ||
import org.apache.spark.sql.test.SharedSQLContext | ||
import org.apache.spark.sql.types.{IntegerType, StructType} | ||
import org.apache.spark.sql.types._ | ||
import org.apache.spark.util.Utils | ||
|
||
class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper { | ||
|
@@ -442,6 +441,132 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi | |
} | ||
} | ||
|
||
test("[SPARK-4502] pruning nested schema by GetStructField projects") {
  // Full schema used by this test:
  //   root
  //    |-- col: struct (nullable = true)
  //    |    |-- s1: struct (nullable = true)
  //    |    |    |-- s1_1: long (nullable = true)
  //    |    |    |-- s1_2: long (nullable = true)
  //    |    |-- str: string (nullable = true)
  //    |-- num: long (nullable = true)
  //    |-- str: string (nullable = true)
  val s1Type = StructType(Seq(
    StructField("s1_1", LongType, nullable = true),
    StructField("s1_2", LongType, nullable = true)))
  val s1Field = StructField("s1", s1Type, nullable = true)
  val strField = StructField("str", StringType, nullable = true)

  val fullSchema = StructType(Seq(
    StructField("col", StructType(Seq(s1Field, strField)), nullable = true),
    StructField("num", LongType, nullable = true),
    strField))

  // Attribute for the top-level struct column `col`.
  val colAttr =
    AttributeReference("col", StructType(Seq(s1Field, strField)), nullable = true)()

  // Accessor chain for col.s1.s1_1, built one level at a time.
  val s1Access = GetStructField(colAttr, 0, Some("s1"))
  val leafAccess = GetStructField(s1Access, 0, Some("s1_1"))

  // Project list equivalent to "select num, col.s1.s1_1 as s1_1".
  val projects = Seq(
    AttributeReference("num", LongType, nullable = true)(),
    Alias(leafAccess, "s1_1")())

  // Expected fields: `num`, followed by `col` reduced to s1 -> s1_1 only.
  val prunedS1 = StructField(
    "s1",
    StructType(Seq(StructField("s1_1", LongType, nullable = true))),
    nullable = true)
  val expectedFields = Seq(
    StructField("num", LongType, nullable = true),
    StructField("col", StructType(Seq(prunedS1)), nullable = true))

  val prunedFields =
    FileSourceStrategy.generateStructFieldsContainsNesting(projects, fullSchema)
  assert(prunedFields == expectedFields)
}
|
||
test("[SPARK-4502] pruning nested schema by GetArrayItem projects") {
  // Full schema used by this test:
  //   root
  //    |-- col: struct (nullable = true)
  //    |    |-- info_list: array (nullable = true)
  //    |    |    |-- element: struct (containsNull = true)
  //    |    |    |    |-- s1: struct (nullable = true)
  //    |    |    |    |    |-- s1_1: long (nullable = true)
  //    |    |    |    |    |-- s1_2: long (nullable = true)
  val s1Type = StructType(Seq(
    StructField("s1_1", LongType, nullable = true),
    StructField("s1_2", LongType, nullable = true)))
  val s1Field = StructField("s1", s1Type, nullable = true)
  val infoListField =
    StructField("info_list", ArrayType(StructType(Seq(s1Field))), nullable = true)

  val fullSchema = StructType(Seq(
    StructField("col", StructType(Seq(infoListField)), nullable = true)))

  // Attribute for the top-level struct column `col`.
  val colAttr =
    AttributeReference("col", StructType(Seq(infoListField)), nullable = true)()

  // Accessor chain for col.info_list[0].s1.s1_1, built one level at a time.
  val infoListAccess = GetStructField(colAttr, 0, Some("info_list"))
  val firstElement = GetArrayItem(infoListAccess, Literal(0))
  val s1Access = GetStructField(firstElement, 0, Some("s1"))
  val leafAccess = GetStructField(s1Access, 0, Some("s1_1"))

  // Project list equivalent to "select col.info_list[0].s1.s1_1 as complex_get".
  val projects = Seq(Alias(leafAccess, "complex_get")())

  // Expected fields: `col` with info_list kept in full (both s1_1 and s1_2 remain).
  val expectedFields = Seq(StructField("col", StructType(Seq(infoListField))))

  val prunedFields =
    FileSourceStrategy.generateStructFieldsContainsNesting(projects, fullSchema)
  assert(prunedFields == expectedFields)
}
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It would be nice to split this method into several test cases, each covering a typical but minimal case. BTW, I tried the following test code: test("foo") {
val schema = new StructType()
.add("f0", IntegerType)
.add("f1", new StructType()
.add("f10", IntegerType))
val expr = GetStructField(
CreateNamedStruct(Seq(
Literal("f10"),
AttributeReference("f0", IntegerType)()
)),
0,
Some("f10")
)
StructType(
FileSourceStrategy.generateStructFieldsContainsNesting(expr :: Nil, schema)
).printTreeString()
} and it fails with the following exception:
Basically, we also need to consider […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fixed. Thanks to liancheng for the reminder. |
||
|
||
test("[SPARK-4502] pruning nested schema while named_struct in project") {
  // Schema with a flat int column `f0` and a struct column `f1` holding int `f10`.
  val f1Type = new StructType().add("f10", IntegerType)
  val schema = new StructType()
    .add("f0", IntegerType)
    .add("f1", f1Type)

  // Projection that reads field "f10" out of a struct created in the query from
  // attribute `f0`, i.e. the struct being accessed is not a column of `schema`.
  val namedStruct = CreateNamedStruct(Seq(
    Literal("f10"),
    AttributeReference("f0", IntegerType)()))
  val projection = GetStructField(namedStruct, 0, Some("f10"))

  // Only the referenced top-level column `f0` is expected in the result.
  val expectedSchema = new StructType().add("f0", IntegerType)

  val prunedFields =
    FileSourceStrategy.generateStructFieldsContainsNesting(projection :: Nil, schema)
  assert(prunedFields == expectedSchema)
}
|
||
test("spark.files.ignoreCorruptFiles should work in SQL") { | ||
val inputFile = File.createTempFile("input-", ".gz") | ||
try { | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually this function can be simplified to:
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function getFieldRecursively here needs to return a StructField that contains the full nesting along the path. For example:
The fullSchema is:
and when we want to get
col.s1.s1_1
, the function should return: So I probably can't use the simplified function getNestedField, because it returns only the last StructField: