
Commit b33066f

viirya authored and dongjoon-hyun committed
[SPARK-32622][SQL][TEST] Add case-sensitivity test for ORC predicate pushdown
### What changes were proposed in this pull request?

While working on SPARK-25557, we found that ORC predicate pushdown doesn't have a case-sensitivity test. This PR proposes to add case-sensitivity tests for ORC predicate pushdown.

### Why are the changes needed?

Increases test coverage for ORC predicate pushdown.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass Jenkins tests.

Closes #29427 from viirya/SPARK-25557-followup3.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 86852c5 commit b33066f
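
To make the tested scenario easier to picture, here is a minimal, self-contained sketch (not part of this commit) of what "case sensitivity in ORC predicate pushdown" means in practice: write ORC files whose physical schema contains both `A` and `a`, then filter on `A` under case-sensitive analysis so the pushed-down predicate must bind to the correct physical column. The object name, app name, and output path are illustrative assumptions; the APIs used (`spark.sql.caseSensitive`, `DataFrameWriter.orc`, `Dataset.filter`, `explain`) are standard Spark.

```scala
import org.apache.spark.sql.SparkSession

object OrcCaseSensitivityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("orc-case-sensitivity-demo") // illustrative name
      .getOrCreate()

    // Illustrative output path; any writable local directory works.
    val path = "/tmp/orc_case_demo"

    // With case-sensitive analysis, `A` and `a` are distinct columns,
    // so the physical ORC schema ends up with both fields.
    spark.conf.set("spark.sql.caseSensitive", "true")
    spark.range(10).selectExpr("id - 1 as A", "id as a")
      .write.mode("overwrite").orc(path)

    // A filter on `A` must be pushed down against the physical field `A`
    // (values -1..8), not `a` (values 0..9).
    val filtered = spark.read.orc(path).filter("A < 0")
    filtered.explain()        // PushedFilters should reference `A`
    println(filtered.count()) // prints 1: only the row with A = -1 matches

    spark.stop()
  }
}
```

The test suite below exercises the same idea through `stripSparkFilter`, which removes the Spark-side Filter from the executed plan so only rows surviving the ORC-level pushdown are counted.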

File tree

2 files changed: +190 additions, -4 deletions


sql/core/v1.2/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 95 additions & 2 deletions
@@ -25,8 +25,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -513,5 +513,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       ).get.toString
     }
   }
+
+  test("SPARK-32622: case sensitivity in predicate pushdown") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_32622"
+      val tableDir1 = dir.getAbsoluteFile + "/table1"
+
+      // Physical ORC files have both `A` and `a` fields.
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
+          .write.mode("overwrite").orc(tableDir1)
+      }
+
+      // Metastore table has both `A` and `a` fields too.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))
+
+          val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual1.count() == 1)
+
+          val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
+          assert(actual2.count() == 0)
+        }
+
+        // Exception thrown for ambiguous case.
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[AnalysisException] {
+            sql(s"select a from $tableName where a < 0").collect()
+          }
+          assert(e.getMessage.contains(
+            "Reference 'a' is ambiguous"))
+        }
+      }
+
+      // Metastore table has only `A` field.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          val e = intercept[SparkException] {
+            sql(s"select A from $tableName where A < 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
+        }
+      }
+
+      // Physical ORC files have only `A` field.
+      val tableDir2 = dir.getAbsoluteFile + "/table2"
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A")
+          .write.mode("overwrite").orc(tableDir2)
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
+          // TODO: ORC predicate pushdown should work under case-insensitive analysis.
+          // assert(actual.count() == 1)
+        }
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual.count() == 1)
+        }
+      }
+    }
+  }
 }

sql/core/v2.3/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala

Lines changed: 95 additions & 2 deletions
@@ -25,8 +25,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -514,5 +514,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
       ).get.toString
     }
   }
+
+  test("SPARK-32622: case sensitivity in predicate pushdown") {
+    withTempPath { dir =>
+      val count = 10
+      val tableName = "spark_32622"
+      val tableDir1 = dir.getAbsoluteFile + "/table1"
+
+      // Physical ORC files have both `A` and `a` fields.
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
+          .write.mode("overwrite").orc(tableDir1)
+      }
+
+      // Metastore table has both `A` and `a` fields too.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))
+
+          val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual1.count() == 1)
+
+          val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
+          assert(actual2.count() == 0)
+        }
+
+        // Exception thrown for ambiguous case.
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          val e = intercept[AnalysisException] {
+            sql(s"select a from $tableName where a < 0").collect()
+          }
+          assert(e.getMessage.contains(
+            "Reference 'a' is ambiguous"))
+        }
+      }
+
+      // Metastore table has only `A` field.
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
+             """.stripMargin)
+
+          val e = intercept[SparkException] {
+            sql(s"select A from $tableName where A < 0").collect()
+          }
+          assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+            """Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
+        }
+      }
+
+      // Physical ORC files have only `A` field.
+      val tableDir2 = dir.getAbsoluteFile + "/table2"
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        spark.range(count).repartition(count).selectExpr("id - 1 as A")
+          .write.mode("overwrite").orc(tableDir2)
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
+          // TODO: ORC predicate pushdown should work under case-insensitive analysis.
+          // assert(actual.count() == 1)
+        }
+      }
+
+      withTable(tableName) {
+        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+          sql(
+            s"""
+               |CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
+             """.stripMargin)
+
+          checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))
+
+          val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
+          assert(actual.count() == 1)
+        }
+      }
+    }
+  }
 }
