
Commit beeafcf

Revert "[SPARK-5213] [SQL] Pluggable SQL Parser Support"
This reverts commit 3ba5aaa.
1 parent 473552f · commit beeafcf

File tree: 9 files changed (+42 additions, -199 deletions)
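In short, this revert removes the pluggable Dialect hook introduced by SPARK-5213 (the Dialect abstract class, DialectException, the getSQLDialect()/dialectClassName plumbing, and the dialect-switching tests) and restores the previous hard-wired parser chain: DDLParser gets the first attempt at every statement, falling back to the Catalyst SqlParser, or to HiveQl when spark.sql.dialect is set to "hiveql".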

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala

Lines changed: 6 additions & 5 deletions
@@ -25,6 +25,10 @@ import scala.util.parsing.input.CharArrayReader.EofCh
 
 import org.apache.spark.sql.catalyst.plans.logical._
 
+private[sql] object KeywordNormalizer {
+  def apply(str: String): String = str.toLowerCase()
+}
+
 private[sql] abstract class AbstractSparkSQLParser
   extends StandardTokenParsers with PackratParsers {
 
@@ -38,7 +42,7 @@ private[sql] abstract class AbstractSparkSQLParser
   }
 
   protected case class Keyword(str: String) {
-    def normalize: String = lexical.normalizeKeyword(str)
+    def normalize: String = KeywordNormalizer(str)
     def parser: Parser[String] = normalize
   }
 
@@ -86,16 +90,13 @@ class SqlLexical extends StdLexical
     reserved ++= keywords
   }
 
-  /* Normal the keyword string */
-  def normalizeKeyword(str: String): String = str.toLowerCase
-
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
     ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
   )
 
   protected override def processIdent(name: String) = {
-    val token = normalizeKeyword(name)
+    val token = KeywordNormalizer(name)
    if (reserved contains token) Keyword(token) else Identifier(name)
  }
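
This hunk moves keyword lowercasing back into a standalone KeywordNormalizer object shared by Keyword.normalize and SqlLexical.processIdent, keeping reserved-word lookup case-insensitive. A minimal, self-contained sketch of that behavior (the reserved set and classify are illustrative stand-ins for the real lexer):

    object KeywordNormalizer {
      def apply(str: String): String = str.toLowerCase()
    }

    object NormalizerSketch extends App {
      // Reserved words are stored already normalized.
      val reserved = Set("select", "from", "where")

      // Mirrors SqlLexical.processIdent: normalize first, then decide
      // whether the token is a keyword or a plain identifier.
      def classify(name: String): String = {
        val token = KeywordNormalizer(name)
        if (reserved contains token) s"Keyword($token)" else s"Identifier($name)"
      }

      println(classify("SELECT"))  // Keyword(select)
      println(classify("Select"))  // Keyword(select)
      println(classify("my_col"))  // Identifier(my_col)
    }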

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala

Lines changed: 0 additions & 33 deletions
This file was deleted.
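
The deleted file carried the core of the pluggable-parser API. Its shape can be inferred from the subclasses visible elsewhere in this diff (DefaultDialect in SQLContext.scala, HiveQLDialect in HiveContext.scala), each of which overrides a single parse method; treat the following as an approximate sketch, not the verbatim deleted source:

    package org.apache.spark.sql.catalyst

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // The one hook a custom dialect had to implement: turn a SQL string
    // into a Catalyst logical plan.
    private[sql] abstract class Dialect {
      def parse(sqlText: String): LogicalPlan
    }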

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala

Lines changed: 0 additions & 2 deletions
@@ -38,8 +38,6 @@ package object errors {
   }
 }
 
-class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
-
 /**
  * Wraps any exceptions that are thrown while executing `f` in a
  * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 14 additions & 68 deletions
@@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
 
@@ -33,11 +32,9 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.Dialect
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -47,45 +44,6 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
 
-/**
- * Currently we support the default dialect named "sql", associated with the class
- * [[DefaultDialect]]
- *
- * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
- * {{{
- *-- switch to "hiveql" dialect
- *   spark-sql>SET spark.sql.dialect=hiveql;
- *   spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- switch to "sql" dialect
- *   spark-sql>SET spark.sql.dialect=sql;
- *   spark-sql>SELECT * FROM src LIMIT 1;
- *
- *-- register the new SQL dialect
- *   spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
- *   spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- register the non-exist SQL dialect
- *   spark-sql> SET spark.sql.dialect=NotExistedClass;
- *   spark-sql> SELECT * FROM src LIMIT 1;
- *
- *-- Exception will be thrown and switch to dialect
- *-- "sql" (for SQLContext) or
- *-- "hiveql" (for HiveContext)
- * }}}
- */
-private[spark] class DefaultDialect extends Dialect {
-  @transient
-  protected val sqlParser = {
-    val catalystSqlParser = new catalyst.SqlParser
-    new SparkSQLParser(catalystSqlParser.parse)
-  }
-
-  override def parse(sqlText: String): LogicalPlan = {
-    sqlParser.parse(sqlText)
-  }
-}
-
 /**
  * The entry point for working with structured data (rows and columns) in Spark. Allows the
  * creation of [[DataFrame]] objects as well as the execution of SQL queries.
@@ -174,27 +132,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
 
   @transient
-  protected[sql] val ddlParser = new DDLParser((sql: String) => { getSQLDialect().parse(sql) })
-
-  protected[sql] def getSQLDialect(): Dialect = {
-    try {
-      val clazz = Utils.classForName(dialectClassName)
-      clazz.newInstance().asInstanceOf[Dialect]
-    } catch {
-      case NonFatal(e) =>
-        // Since we didn't find the available SQL Dialect, it will fail even for SET command:
-        // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
-        val dialect = conf.dialect
-        // reset the sql dialect
-        conf.unsetConf(SQLConf.DIALECT)
-        // throw out the exception, and the default sql dialect will take effect for next query.
-        throw new DialectException(
-          s"""Instantiating dialect '$dialect' failed.
-             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
-    }
+  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
+
+  @transient
+  protected[sql] val sqlParser = {
+    val fallback = new catalyst.SqlParser
+    new SparkSQLParser(fallback.parse(_))
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
+  protected[sql] def parseSql(sql: String): LogicalPlan = {
+    ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql))
+  }
 
   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
 
@@ -208,12 +156,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val defaultSession = createSession()
 
-  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
-    classOf[DefaultDialect].getCanonicalName
-  } else {
-    conf.dialect
-  }
-
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
@@ -1003,7 +945,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group basic
    */
   def sql(sqlText: String): DataFrame = {
-    DataFrame(this, parseSql(sqlText))
+    if (conf.dialect == "sql") {
+      DataFrame(this, parseSql(sqlText))
+    } else {
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
+    }
   }
 
   /**
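
Taken together, these hunks replace reflective dialect loading with a fixed two-stage pipeline: DDLParser tries each statement first and, since it now returns an Option (see the ddl.scala hunk below), parseSql falls back to the plain SQL parser via getOrElse. A self-contained sketch of that control flow, with toy plan types standing in for LogicalPlan and the real parsers:

    object ParsePipelineSketch extends App {
      sealed trait Plan
      case class DdlPlan(stmt: String) extends Plan
      case class QueryPlan(stmt: String) extends Plan

      // Stand-in for DDLParser.parse(input, exceptionOnError = false):
      // None signals "not a DDL statement, let the caller fall back".
      def parseDdl(input: String): Option[Plan] =
        if (input.trim.toUpperCase.startsWith("CREATE")) Some(DdlPlan(input)) else None

      // Stand-in for SparkSQLParser / catalyst.SqlParser.
      def parseQuery(input: String): Plan = QueryPlan(input)

      // Mirrors the restored SQLContext.parseSql.
      def parseSql(input: String): Plan =
        parseDdl(input).getOrElse(parseQuery(input))

      println(parseSql("CREATE TEMPORARY TABLE t USING json"))  // DdlPlan(...)
      println(parseSql("SELECT 1"))                             // QueryPlan(SELECT 1)
    }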

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 3 additions & 3 deletions
@@ -38,12 +38,12 @@ private[sql] class DDLParser(
     parseQuery: String => LogicalPlan)
   extends AbstractSparkSQLParser with DataTypeParser with Logging {
 
-  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
+  def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
     try {
-      parse(input)
+      Some(parse(input))
     } catch {
       case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => parseQuery(input)
+      case _ if !exceptionOnError => None
       case x: Throwable => throw x
     }
   }
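
The signature change is the pivot of the revert: instead of delegating to the injected parseQuery on failure, DDLParser.parse now reports "not DDL" as None, while genuine DDL syntax errors (DDLException) still propagate in both modes. A small sketch of that error-handling contract, with a toy grammar in place of the real parser:

    object DdlContractSketch extends App {
      class DDLException(msg: String) extends RuntimeException(msg)

      // Toy stand-in for the DDL grammar: accepts CREATE statements and
      // rejects everything else with a generic parse failure.
      def parseDdlOnly(input: String): String =
        if (input.startsWith("CREATE")) s"DdlPlan($input)"
        else throw new RuntimeException("failure: DDL statement expected")

      def parse(input: String, exceptionOnError: Boolean): Option[String] =
        try Some(parseDdlOnly(input))
        catch {
          case e: DDLException => throw e                 // malformed DDL is always fatal
          case _: Throwable if !exceptionOnError => None  // lenient: caller falls back
          case x: Throwable => throw x                    // strict: propagate
        }

      println(parse("CREATE TABLE t", exceptionOnError = false))  // Some(DdlPlan(CREATE TABLE t))
      println(parse("SELECT 1", exceptionOnError = false))        // None
    }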

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 22 deletions
@@ -19,18 +19,13 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.GeneratedAggregate
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
-
 import org.apache.spark.sql.types._
 
-/** A SQL Dialect for testing purpose, and it can not be nested type */
-class MyDialect extends DefaultDialect
-
 class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   // Make sure the tables are loaded.
   TestData
@@ -79,23 +74,6 @@
       Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
   }
 
-  test("SQL Dialect Switching to a new SQL parser") {
-    val newContext = new SQLContext(TestSQLContext.sparkContext)
-    newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
-    assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
-    assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
-  }
-
-  test("SQL Dialect Switch to an invalid parser with alias") {
-    val newContext = new SQLContext(TestSQLContext.sparkContext)
-    newContext.sql("SET spark.sql.dialect=MyTestClass")
-    intercept[DialectException] {
-      newContext.sql("SELECT 1")
-    }
-    // test if the dialect set back to DefaultSQLDialect
-    assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect])
-  }
-
   test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
     checkAnswer(
       sql("SELECT a FROM testData2 SORT BY a"),

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 16 additions & 25 deletions
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive
 import java.io.{BufferedReader, InputStreamReader, PrintStream}
 import java.sql.Timestamp
 
-import org.apache.hadoop.hive.ql.parse.VariableSubstitution
-import org.apache.spark.sql.catalyst.Dialect
-
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 
@@ -45,15 +42,6 @@ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNative
 import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
-/**
- * This is the HiveQL Dialect, this dialect is strongly bind with HiveContext
- */
-private[hive] class HiveQLDialect extends Dialect {
-  override def parse(sqlText: String): LogicalPlan = {
-    HiveQl.parseSql(sqlText)
-  }
-}
-
 /**
  * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
  * Configuration for Hive is read from hive-site.xml on the classpath.
@@ -93,16 +81,25 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertCTAS: Boolean =
     getConf("spark.sql.hive.convertCTAS", "false").toBoolean
 
-  @transient
-  protected[sql] lazy val substitutor = new VariableSubstitution()
-
-  protected[sql] override def parseSql(sql: String): LogicalPlan = {
-    super.parseSql(substitutor.substitute(hiveconf, sql))
-  }
-
   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
 
+  @transient
+  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
+
+  override def sql(sqlText: String): DataFrame = {
+    val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
+    // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
+    if (conf.dialect == "sql") {
+      super.sql(substituted)
+    } else if (conf.dialect == "hiveql") {
+      val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false)
+      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
+    } else {
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
+    }
+  }
+
   /**
    * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
@@ -359,12 +356,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
-  override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") {
-    classOf[HiveQLDialect].getCanonicalName
-  } else {
-    super.dialectClassName
-  }
-
   @transient
   private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
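
With the Dialect lookup gone, HiveContext.sql is back to a hard-coded two-way dispatch on conf.dialect, as the restored TODO comment itself notes. A compact sketch of the decision logic (string plans stand in for DataFrame, and variable substitution is elided):

    object HiveDispatchSketch extends App {
      def sql(sqlText: String, dialect: String): String = dialect match {
        case "sql"    => s"SparkSqlPlan($sqlText)"  // SQLContext's DDL-then-SQL chain
        case "hiveql" => s"HiveQlPlan($sqlText)"    // DDLParser(HiveQl) first, then HiveQl
        case other    => sys.error(s"Unsupported SQL dialect: $other. Try 'sql' or 'hiveql'")
      }

      println(sql("SELECT 1", "sql"))
      println(sql("SELECT * FROM src LIMIT 1", "hiveql"))
    }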

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 1 addition & 4 deletions
@@ -107,10 +107,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   /** Fewer partitions to speed up testing. */
   protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
-
-    // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared.
-    // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
-    override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
 }
116113
