Skip to content

Commit

Permalink
[SPARK-32622][SQL][TEST] Add case-sensitivity test for ORC predicate …
Browse files Browse the repository at this point in the history
…pushdown

### What changes were proposed in this pull request?

During working on SPARK-25557, we found that ORC predicate pushdown doesn't have case-sensitivity test. This PR proposes to add case-sensitivity test for ORC predicate pushdown.

### Why are the changes needed?

Increasing test coverage for ORC predicate pushdown.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass Jenkins tests.

Closes apache#29427 from viirya/SPARK-25557-followup3.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit b33066f)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
viirya authored and dongjoon-hyun committed Aug 17, 2020
1 parent ee12374 commit 6cdc32f
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._

import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
Expand Down Expand Up @@ -469,5 +469,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
).get.toString
}
}

test("SPARK-32622: case sensitivity in predicate pushdown") {
withTempPath { dir =>
val count = 10
val tableName = "spark_32622"
val tableDir1 = dir.getAbsoluteFile + "/table1"

// Physical ORC files have both `A` and `a` fields.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
.write.mode("overwrite").orc(tableDir1)
}

// Metastore table has both `A` and `a` fields too.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)

checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))

val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual1.count() == 1)

val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
assert(actual2.count() == 0)
}

// Exception thrown for ambiguous case.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val e = intercept[AnalysisException] {
sql(s"select a from $tableName where a < 0").collect()
}
assert(e.getMessage.contains(
"Reference 'a' is ambiguous"))
}
}

// Metastore table has only `A` field.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)

val e = intercept[SparkException] {
sql(s"select A from $tableName where A < 0").collect()
}
assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
"""Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
}
}

// Physical ORC files have only `A` field.
val tableDir2 = dir.getAbsoluteFile + "/table2"
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A")
.write.mode("overwrite").orc(tableDir2)
}

withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)

checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))

val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
// TODO: ORC predicate pushdown should work under case-insensitive analysis.
// assert(actual.count() == 1)
}
}

withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)

checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))

val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual.count() == 1)
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.io.sarg.{PredicateLeaf, SearchArgument}

import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, Column, DataFrame}
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
Expand Down Expand Up @@ -470,5 +470,98 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
).get.toString
}
}

test("SPARK-32622: case sensitivity in predicate pushdown") {
withTempPath { dir =>
val count = 10
val tableName = "spark_32622"
val tableDir1 = dir.getAbsoluteFile + "/table1"

// Physical ORC files have both `A` and `a` fields.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A", "id as a")
.write.mode("overwrite").orc(tableDir1)
}

// Metastore table has both `A` and `a` fields too.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG, a LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)

checkAnswer(sql(s"select a, A from $tableName"), (0 until count).map(c => Row(c, c - 1)))

val actual1 = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual1.count() == 1)

val actual2 = stripSparkFilter(sql(s"select A from $tableName where a < 0"))
assert(actual2.count() == 0)
}

// Exception thrown for ambiguous case.
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
val e = intercept[AnalysisException] {
sql(s"select a from $tableName where a < 0").collect()
}
assert(e.getMessage.contains(
"Reference 'a' is ambiguous"))
}
}

// Metastore table has only `A` field.
withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir1'
""".stripMargin)

val e = intercept[SparkException] {
sql(s"select A from $tableName where A < 0").collect()
}
assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
"""Found duplicate field(s) "A": [A, a] in case-insensitive mode"""))
}
}

// Physical ORC files have only `A` field.
val tableDir2 = dir.getAbsoluteFile + "/table2"
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
spark.range(count).repartition(count).selectExpr("id - 1 as A")
.write.mode("overwrite").orc(tableDir2)
}

withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
sql(
s"""
|CREATE TABLE $tableName (a LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)

checkAnswer(sql(s"select a from $tableName"), (0 until count).map(c => Row(c - 1)))

val actual = stripSparkFilter(sql(s"select a from $tableName where a < 0"))
// TODO: ORC predicate pushdown should work under case-insensitive analysis.
// assert(actual.count() == 1)
}
}

withTable(tableName) {
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
sql(
s"""
|CREATE TABLE $tableName (A LONG) USING ORC LOCATION '$tableDir2'
""".stripMargin)

checkAnswer(sql(s"select A from $tableName"), (0 until count).map(c => Row(c - 1)))

val actual = stripSparkFilter(sql(s"select A from $tableName where A < 0"))
assert(actual.count() == 1)
}
}
}
}
}

0 comments on commit 6cdc32f

Please sign in to comment.