Skip to content

[SPARK-14038][SQL] enable native view by default #11860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -365,21 +365,10 @@ object SQLConf {
"unmatching partitions can be eliminated earlier.")

val NATIVE_VIEW = booleanConf("spark.sql.nativeView",
defaultValue = Some(false),
doc = "When true, CREATE VIEW will be handled by Spark SQL instead of Hive native commands. " +
"Note that this function is experimental and should ony be used when you are using " +
"non-hive-compatible tables written by Spark SQL. The SQL string used to create " +
"view should be fully qualified, i.e. use `tbl1`.`col1` instead of `*` whenever " +
"possible, or you may get wrong result.",
isPublic = false)

val CANONICAL_NATIVE_VIEW = booleanConf("spark.sql.nativeView.canonical",
defaultValue = Some(true),
doc = "When this option and spark.sql.nativeView are both true, Spark SQL tries to handle " +
"CREATE VIEW statement using SQL query string generated from view definition logical " +
"plan. If the logical plan doesn't have a SQL representation, we fallback to the " +
"original native view implementation.",
isPublic = false)
doc = "When true, CREATE VIEW statement will be handled by Spark SQL instead of Hive native " +
"command, using SQL query string generated from view definition logical plan. Note " +
"that this feature may break in some corner cases, you can disable it to workaround.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the last sentence and instead suggest the user to turn it off when toSQL fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any known issue? Disabling it does not really work for data source tables. So, for this kind of case, setting spark.sql.nativeView to true and spark.sql.nativeView.canonical to false is the only workaround, right?


val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
defaultValue = Some("_corrupt_record"),
Expand Down Expand Up @@ -578,8 +567,6 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin

def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)

def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

def subexpressionEliminationEnabled: Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package org.apache.spark.sql.hive.execution

import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
Expand All @@ -31,8 +28,6 @@ import org.apache.spark.sql.hive.{ HiveContext, HiveMetastoreTypes, SQLBuilder}
* Create Hive view on non-hive-compatible tables by specifying schema ourselves instead of
* depending on Hive meta-store.
*/
// TODO: Note that this class can NOT canonicalize the view SQL string entirely, which is different
// from Hive and may not work for some cases like create view on self join.
private[hive] case class CreateViewAsSelect(
tableDesc: CatalogTable,
child: LogicalPlan,
Expand Down Expand Up @@ -73,15 +68,18 @@ private[hive] case class CreateViewAsSelect(
}

private def prepareTable(sqlContext: SQLContext): CatalogTable = {
val expandedText = if (sqlContext.conf.canonicalView) {
try rebuildViewQueryString(sqlContext) catch {
case NonFatal(e) => wrapViewTextWithSelect
}
val logicalPlan = if (tableDesc.schema.isEmpty) {
child
} else {
wrapViewTextWithSelect
val projectList = childSchema.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
sqlContext.executePlan(Project(projectList, child)).analyzed
}

val viewSchema = {
val viewText = new SQLBuilder(logicalPlan, sqlContext).toSQL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about catching exceptions thrown from toSQL and suggest the user to turn off native view and retry?


val viewSchema: Seq[CatalogColumn] = {
if (tableDesc.schema.isEmpty) {
childSchema.map { a =>
CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType))
Expand All @@ -97,41 +95,6 @@ private[hive] case class CreateViewAsSelect(
}
}

tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
tableDesc.copy(schema = viewSchema, viewText = Some(viewText))
}

/**
 * Wraps the original view SQL text in an outer SELECT that explicitly enumerates the
 * output columns, e.g. `SELECT a, b FROM (<original text>) viewName`.
 *
 * Enumerating the columns (instead of relying on a top-level `*`) keeps the stored view
 * text stable, and when the user supplied column names in the CREATE VIEW statement each
 * child column is aliased to the requested name.
 */
private def wrapViewTextWithSelect: String = {
  val quotedChildCols = childSchema.map(f => quote(f.name))
  val viewOutput =
    if (tableDesc.schema.isEmpty) {
      // No user-specified columns: still project each child column explicitly so the
      // view is robust against top level `*`s.
      quotedChildCols.mkString(", ")
    } else {
      // User specified column names: rename each child column via an AS alias.
      val aliases = tableDesc.schema.map(f => quote(f.name))
      quotedChildCols
        .zip(aliases)
        .map { case (col, alias) => s"$col AS $alias" }
        .mkString(", ")
    }
  s"SELECT $viewOutput FROM (${tableDesc.viewText.get}) ${quote(tableDesc.name.table)}"
}

/**
 * Regenerates a canonical SQL string for the view from its analyzed logical plan,
 * using [[SQLBuilder]].
 *
 * @param sqlContext context used to analyze the (possibly projected) child plan.
 * @return the SQL text produced by `SQLBuilder.toSQL`. NOTE(review): `toSQL` presumably
 *         throws when the plan has no SQL representation — callers are expected to
 *         handle that; confirm against SQLBuilder's contract.
 */
private def rebuildViewQueryString(sqlContext: SQLContext): String = {
// If the user gave explicit column names for the view, wrap the child in a Project of
// Aliases so the generated SQL carries the requested names; otherwise use the child as-is.
val logicalPlan = if (tableDesc.schema.isEmpty) {
child
} else {
val projectList = childSchema.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
// Analyze the projected plan before SQL generation so attributes are fully resolved.
sqlContext.executePlan(Project(projectList, child)).analyzed
}
new SQLBuilder(logicalPlan, sqlContext).toSQL
}

// Quotes an identifier with backticks, escaping any embedded backtick by doubling it
// (e.g. a`b -> `a``b`). Literal `replace` is equivalent to the regex-based `replaceAll`
// here since a backtick is not a regex metacharacter.
private def quote(name: String) = {
  val escaped = name.replace("`", "``")
  "`" + escaped + "`"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1471,83 +1471,80 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

Seq(true, false).foreach { enabled =>
val prefix = (if (enabled) "With" else "Without") + " canonical native view: "
test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt", "jt2") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
test(s"correctly handle CREATE OR REPLACE VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt", "jt2") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))

val df = (1 until 10).map(i => i -> i).toDF("i", "j")
df.write.format("json").saveAsTable("jt2")
sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
// make sure the view has been changed.
checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
val df = (1 until 10).map(i => i -> i).toDF("i", "j")
df.write.format("json").saveAsTable("jt2")
sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2")
// make sure the view has been changed.
checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))

sql("DROP VIEW testView")
sql("DROP VIEW testView")

val e = intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
}
assert(e.message.contains("not allowed to define a view"))
val e = intercept[AnalysisException] {
sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt")
}
assert(e.message.contains("not allowed to define a view"))
}
}
}

test(s"$prefix correctly handle ALTER VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt", "jt2") {
withView("testView") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")

val df = (1 until 10).map(i => i -> i).toDF("i", "j")
df.write.format("json").saveAsTable("jt2")
sql("ALTER VIEW testView AS SELECT * FROM jt2")
// make sure the view has been changed.
checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
}
test(s"correctly handle ALTER VIEW") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt", "jt2") {
withView("testView") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")

val df = (1 until 10).map(i => i -> i).toDF("i", "j")
df.write.format("json").saveAsTable("jt2")
sql("ALTER VIEW testView AS SELECT * FROM jt2")
// make sure the view has been changed.
checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i)))
}
}
}
}

test(s"$prefix create hive view for json table") {
// json table is not hive-compatible, make sure the new flag fix it.
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("jt") {
withView("testView") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
test(s"create hive view for json table") {
// json table is not hive-compatible, make sure our native view can fix it.
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true") {
withTable("jt") {
withView("testView") {
sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
sql("CREATE VIEW testView AS SELECT id FROM jt")
checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
}
}
}
}

test(s"$prefix create hive view for partitioned parquet table") {
// partitioned parquet table is not hive-compatible, make sure the new flag fix it.
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
withTable("parTable") {
withView("testView") {
val df = Seq(1 -> "a").toDF("i", "j")
df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
}
test(s"create hive view for partitioned parquet table") {
// partitioned parquet table is not hive-compatible, make sure our native view can fix it.
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true") {
withTable("parTable") {
withView("testView") {
val df = Seq(1 -> "a").toDF("i", "j")
df.write.format("parquet").partitionBy("i").saveAsTable("parTable")
sql("CREATE VIEW testView AS SELECT i, j FROM parTable")
checkAnswer(sql("SELECT * FROM testView"), Row(1, "a"))
}
}
}
}

test("CTE within view") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
SQLConf.NATIVE_VIEW.key -> "true") {
withView("cte_view") {
sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w")
checkAnswer(sql("SELECT * FROM cte_view"), Row(1))
Expand All @@ -1557,7 +1554,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("Using view after switching current database") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
SQLConf.NATIVE_VIEW.key -> "true") {
withView("v") {
sql("CREATE VIEW v AS SELECT * FROM src")
withTempDatabase { db =>
Expand All @@ -1576,7 +1573,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("Using view after adding more columns") {
withSQLConf(
SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> "true") {
SQLConf.NATIVE_VIEW.key -> "true") {
withTable("add_col") {
sqlContext.range(10).write.saveAsTable("add_col")
withView("v") {
Expand Down