[SPARK-12855][SQL] Remove parser dialect developer API #10801

Closed · wants to merge 3 commits
4 changes: 3 additions & 1 deletion project/MimaExcludes.scala
@@ -53,7 +53,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jdbc"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonFile"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.jsonRDD"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load")
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.load"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.dialectClassName"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.SQLContext.getSQLDialect")
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.random.RandomSampler

/**
* This class translates a HQL String to a Catalyst [[LogicalPlan]] or [[Expression]].
* This class translates SQL to Catalyst [[LogicalPlan]]s or [[Expression]]s.
*/
private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserDialect {
object Token {
@@ -17,16 +17,12 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* Root class of SQL Parser Dialect, and we don't guarantee the binary
* compatibility for the future release, let's keep it as the internal
* interface for advanced user.
* Interface for a parser.
*/
@DeveloperApi
trait ParserDialect {
Contributor:
+1 on naming it ParserInterface/Parser

Contributor (author):
OK I will do that in a separate pull request.

/** Creates LogicalPlan for a given SQL string. */
def parsePlan(sqlText: String): LogicalPlan
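Only parsePlan is visible in the hunk above; judging from the DDLParser overrides later in this patch, the now-internal trait exposes three parse entry points. A minimal sketch of how the trimmed interface likely reads (doc comments and parameter names are mine, inferred rather than copied from the patch):

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/** Interface for a parser (internal; no binary-compatibility guarantees). */
trait ParserDialect {
  /** Creates a LogicalPlan for a given SQL string. */
  def parsePlan(sqlText: String): LogicalPlan

  /** Creates an Expression for a given SQL string. */
  def parseExpression(sqlText: String): Expression

  /** Creates a TableIdentifier (table name plus optional database) for a given SQL string. */
  def parseTableIdentifier(sqlText: String): TableIdentifier
}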
@@ -38,8 +38,6 @@ package object errors {
}
}

class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)

/**
* Wraps any exceptions that are thrown while executing `f` in a
* [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.
20 changes: 0 additions & 20 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -278,11 +278,6 @@ private[spark] object SQLConf {
doc = "When true, common subexpressions will be eliminated.",
isPublic = false)

val DIALECT = stringConf(
"spark.sql.dialect",
defaultValue = Some("sql"),
doc = "The default SQL dialect to use.")

val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
defaultValue = Some(true),
doc = "Whether the query analyzer should be case sensitive or not.")
@@ -524,21 +519,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with ParserConf
new java.util.HashMap[String, String]())

/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?

/**
* The SQL dialect that is used when parsing queries. This defaults to 'sql' which uses
* a simple SQL parser provided by Spark SQL. This is currently the only option for users of
* SQLContext.
*
* When using a HiveContext, this value defaults to 'hiveql', which uses the Hive 0.12.0 HiveQL
* parser. Users can change this to 'sql' if they want to run queries that aren't supported by
* HiveQL (e.g., SELECT 1).
*
* Note that the choice of dialect does not affect things like what tables are available or
* how query execution is performed.
*/
private[spark] def dialect: String = getConf(DIALECT)

private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)

35 changes: 3 additions & 32 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -33,13 +32,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.{execution => sparkexecution}
import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _}
import org.apache.spark.sql.catalyst.{InternalRow, _}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
@@ -206,30 +203,10 @@ class SQLContext private[sql](
protected[sql] lazy val optimizer: Optimizer = new SparkOptimizer(this)

@transient
protected[sql] val ddlParser = new DDLParser(sqlParser)
protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf))

@transient
protected[sql] val sqlParser = new SparkSQLParser(getSQLDialect())

protected[sql] def getSQLDialect(): ParserDialect = {
try {
val clazz = Utils.classForName(dialectClassName)
clazz.getConstructor(classOf[ParserConf])
.newInstance(conf)
.asInstanceOf[ParserDialect]
} catch {
case NonFatal(e) =>
// Since we didn't find the available SQL Dialect, it will fail even for SET command:
// SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
val dialect = conf.dialect
// reset the sql dialect
conf.unsetConf(SQLConf.DIALECT)
// throw out the exception, and the default sql dialect will take effect for next query.
throw new DialectException(
s"""Instantiating dialect '$dialect' failed.
|Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
}
}
protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)

protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

@@ -239,12 +216,6 @@
protected[sql] def executePlan(plan: LogicalPlan) =
new sparkexecution.QueryExecution(this, plan)

protected[sql] def dialectClassName = if (conf.dialect == "sql") {
classOf[SparkQl].getCanonicalName
} else {
conf.dialect
}

/**
* Add a jar to SQLContext
*/
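With the reflective dialect lookup gone, SQLContext's parsers are wired statically at construction time. A condensed sketch of the resulting chain as I read the new code above (the comments on each layer's role are my interpretation, not text from the patch):

// DDL statements are tried first; everything else falls through to the SQL parser:
//   ddlParser -> sqlParser (SparkSQLParser, special commands such as SET)
//             -> SparkQl(conf) (the Catalyst-based SQL parser)
protected[sql] val sqlParser: ParserDialect = new SparkSQLParser(new SparkQl(conf))
protected[sql] val ddlParser: DDLParser = new DDLParser(sqlParser)

// Every string passed to SQLContext.sql(...) goes through the same fixed chain:
protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)

There is no longer a supported way to swap the parser via spark.sql.dialect; a custom ParserDialect would have to be wired in directly.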
@@ -201,13 +201,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand
case Some((key, None)) =>
val runFunc = (sqlContext: SQLContext) => {
val value =
try {
if (key == SQLConf.DIALECT.key) {
sqlContext.conf.dialect
} else {
sqlContext.getConf(key)
}
} catch {
try sqlContext.getConf(key) catch {
case _: NoSuchElementException => "<undefined>"
}
Seq(Row(key, value))
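The only behavioral change here is that SET spark.sql.dialect is no longer special-cased; a keyed SET with no value simply reads the configuration and falls back to "<undefined>". A small illustrative sketch (hypothetical session; sqlContext and the exact rendered values are assumptions):

sqlContext.sql("SET spark.sql.caseSensitive").collect()
// e.g. Array(Row("spark.sql.caseSensitive", "true"))  -- the configured or default value

sqlContext.sql("SET some.key.that.was.never.set").collect()
// Array(Row("some.key.that.was.never.set", "<undefined>"))  -- from the catch block above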
@@ -37,9 +37,10 @@ class DDLParser(fallback: => ParserDialect)

override def parseExpression(sql: String): Expression = fallback.parseExpression(sql)

override def parseTableIdentifier(sql: String): TableIdentifier =

override def parseTableIdentifier(sql: String): TableIdentifier = {
fallback.parseTableIdentifier(sql)
}

def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parsePlan(input)
@@ -1064,7 +1064,7 @@ object functions extends LegacyFunctions {
* @group normal_funcs
*/
def expr(expr: String): Column = {
val parser = SQLContext.getActive().map(_.getSQLDialect()).getOrElse(new CatalystQl())
val parser = SQLContext.getActive().map(_.sqlParser).getOrElse(new CatalystQl())
Column(parser.parseExpression(expr))
}

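expr now reuses whatever parser the active SQLContext was built with, and only falls back to a bare CatalystQl when no context is active. A short usage sketch (df is a hypothetical DataFrame; imports assumed):

import org.apache.spark.sql.functions._

// With an active SQLContext, the string is parsed by that context's sqlParser
// (SparkSQLParser over SparkQl, or the Hive parser chain in a HiveContext).
df.select(expr("length(name) + 1"))

// Without an active SQLContext, a plain CatalystQl() instance parses the string,
// so expressions can still be built outside a running session.
val c = expr("a + b * 2")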
20 changes: 0 additions & 20 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -23,7 +23,6 @@ import java.sql.Timestamp
import org.apache.spark.AccumulatorSuite
import org.apache.spark.sql.catalyst.CatalystQl
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.execution.{aggregate, SparkQl}
import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}
@@ -32,8 +31,6 @@ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._

/** A SQL Dialect for testing purpose, and it can not be nested type */
class MyDialect(conf: ParserConf) extends CatalystQl(conf)

class SQLQuerySuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -148,23 +145,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
.count(), Row(24, 1) :: Row(14, 1) :: Nil)
}

test("SQL Dialect Switching to a new SQL parser") {
val newContext = new SQLContext(sparkContext)
newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
}

test("SQL Dialect Switch to an invalid parser with alias") {
val newContext = new SQLContext(sparkContext)
newContext.sql("SET spark.sql.dialect=MyTestClass")
intercept[DialectException] {
newContext.sql("SELECT 1")
}
// test if the dialect set back to DefaultSQLDialect
assert(newContext.getSQLDialect().getClass === classOf[SparkQl])
}

test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
checkAnswer(
sql("SELECT a FROM testData2 SORT BY a"),
@@ -542,16 +542,12 @@ class HiveContext private[hive](
}

protected[sql] override lazy val conf: SQLConf = new SQLConf {
override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
}

protected[sql] override def getSQLDialect(): ParserDialect = {
if (conf.dialect == "hiveql") {
new ExtendedHiveQlParser(this)
} else {
super.getSQLDialect()
}
@transient
protected[sql] override val sqlParser: ParserDialect = {
new SparkSQLParser(new ExtendedHiveQlParser(this))
}

@transient
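HiveContext no longer consults spark.sql.dialect at all: its parser is fixed to the HiveQL chain when the context is created. A rough sketch of the effect (my reading of the change; sc is an existing SparkContext and src a pre-existing Hive table):

// After this patch a HiveContext always parses through
//   SparkSQLParser (special commands) -> ExtendedHiveQlParser (HiveQL),
// and setting spark.sql.dialect has no effect on which parser is used.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("SELECT key, value FROM src LIMIT 1")   // parsed as HiveQL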
@@ -120,8 +120,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
new this.QueryExecution(plan)

protected[sql] override lazy val conf: SQLConf = new SQLConf {
// The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)

clear()
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, FunctionRegistry}
import org.apache.spark.sql.catalyst.errors.DialectException
import org.apache.spark.sql.catalyst.parser.ParserConf
import org.apache.spark.sql.execution.SparkQl
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -57,8 +56,6 @@ case class WindowData(
month: Int,
area: String,
product: Int)
/** A SQL Dialect for testing purpose, and it can not be nested type */
class MyDialect(conf: ParserConf) extends HiveQl(conf)

/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
@@ -337,42 +334,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}

test("SQL dialect at the start of HiveContext") {
val hiveContext = new HiveContext(sqlContext.sparkContext)
val dialectConf = "spark.sql.dialect"
checkAnswer(hiveContext.sql(s"set $dialectConf"), Row(dialectConf, "hiveql"))
assert(hiveContext.getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
}

test("SQL Dialect Switching") {
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
assert(getSQLDialect().getClass === classOf[MyDialect])
assert(sql("SELECT 1").collect() === Array(Row(1)))

// set the dialect back to the DefaultSQLDialect
sql("SET spark.sql.dialect=sql")
assert(getSQLDialect().getClass === classOf[SparkQl])
sql("SET spark.sql.dialect=hiveql")
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])

// set invalid dialect
sql("SET spark.sql.dialect.abc=MyTestClass")
sql("SET spark.sql.dialect=abc")
intercept[Exception] {
sql("SELECT 1")
}
// test if the dialect set back to HiveQLDialect
getSQLDialect().getClass === classOf[ExtendedHiveQlParser]

sql("SET spark.sql.dialect=MyTestClass")
intercept[DialectException] {
sql("SELECT 1")
}
// test if the dialect set back to HiveQLDialect
assert(getSQLDialect().getClass === classOf[ExtendedHiveQlParser])
}

test("CTAS with serde") {
sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
sql(