
[SPARK-27322][SQL] DataSourceV2 table relation #24741


Status: Closed · wants to merge 3 commits
File: SqlBase.g4

@@ -407,7 +407,7 @@ queryTerm
 queryPrimary
     : querySpecification #queryPrimaryDefault
     | fromStatement #fromStmt
-    | TABLE tableIdentifier #table
+    | TABLE multipartIdentifier #table
     | inlineTable #inlineTableDefault1
     | '(' queryNoWith ')' #subquery
     ;
@@ -579,7 +579,7 @@ identifierComment
     ;

 relationPrimary
-    : tableIdentifier sample? tableAlias #tableName
+    : multipartIdentifier sample? tableAlias #tableName
     | '(' queryNoWith ')' sample? tableAlias #aliasedQuery
     | '(' relation ')' sample? tableAlias #aliasedRelation
     | inlineTable #inlineTableDefault2
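
With both queryPrimary and relationPrimary switched from tableIdentifier to multipartIdentifier, n-part names now survive parsing as a single relation reference. A minimal sketch of the effect using the catalyst parser (matching the PlanParserSuite additions later in this diff; the "testcat" name is illustrative):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // A three-part name parses into one UnresolvedRelation carrying all parts,
    // instead of being rejected by the two-part tableIdentifier rule.
    val plan = CatalystSqlParser.parsePlan("SELECT * FROM testcat.db.tab")
    // plan contains UnresolvedRelation(Seq("testcat", "db", "tab"))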
File: CatalogV2Util.scala

@@ -22,11 +22,15 @@ import java.util.Collections

 import scala.collection.JavaConverters._

-import org.apache.spark.sql.catalog.v2.TableChange
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
 import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.sources.v2.Table
 import org.apache.spark.sql.types.{StructField, StructType}

 object CatalogV2Util {
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
   /**
    * Apply properties changes to a map and return the result.
    */
@@ -149,4 +153,11 @@ object CatalogV2Util {

     new StructType(newFields)
   }
+
+  def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] =
+    try {
+      Option(catalog.asTableCatalog.loadTable(ident))
+    } catch {
+      case _: NoSuchTableException => None
Review comment (Member):
So, we return None for NoSuchTableException only, and propagate exceptions for all other catalog errors, like CatalogNotFoundException from loadTable and AnalysisException from asTableCatalog?

Reply (@jzhuge, Member, Author, Jun 12, 2019):
Yes.

BTW, I don't think TableCatalog.loadTable throws CatalogNotFoundException, because the catalog plugin has already been found by that point.

+    }
 }
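
A small usage sketch of the new helper, matching the thread above (the catalog and identifier values are illustrative): loadTable answers only "does this table exist?", so a missing table yields None while configuration errors keep propagating.

    import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
    import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util

    def describeTable(catalog: CatalogPlugin): String = {
      // None only when the catalog throws NoSuchTableException;
      // CatalogNotFoundException and AnalysisException still reach the caller.
      CatalogV2Util.loadTable(catalog, Identifier.of(Array("db"), "tab")) match {
        case Some(table) => s"found: ${table.name}"
        case None => "no such table"
      }
    }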
File: CaseInsensitiveStringMap.java

@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;

/**
@@ -178,4 +179,21 @@ public double getDouble(String key, double defaultValue) {
   public Map<String, String> asCaseSensitiveMap() {
     return Collections.unmodifiableMap(original);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+    return delegate.equals(that.delegate);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(delegate);
+  }
 }
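
The new equals/hashCode compare the lowercased delegate map rather than the original, so two maps differing only in key case compare equal while asCaseSensitiveMap still preserves the caller's casing. This matters because DataSourceV2Relation embeds these options and logical-plan comparison relies on equals. A sketch of the resulting semantics (constructor usage assumed from the class's public API):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap  // assumed package for this era of the v2 work

    val a = new CaseInsensitiveStringMap(Map("PATH" -> "/tmp/x").asJava)
    val b = new CaseInsensitiveStringMap(Map("path" -> "/tmp/x").asJava)
    assert(a == b)                    // keys compare through the lowercased delegate
    assert(a.hashCode == b.hashCode)  // hashCode is consistent with equals
    assert(a.asCaseSensitiveMap.asScala("PATH") == "/tmp/x")  // original casing kept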
File: Analyzer.scala

@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -162,6 +163,7 @@ class Analyzer(
         new SubstituteUnresolvedOrdinals(conf)),
       Batch("Resolution", fixedPoint,
         ResolveTableValuedFunctions ::
+        ResolveTables ::
         ResolveRelations ::
         ResolveReferences ::
         ResolveCreateNamedStruct ::
@@ -226,7 +228,7 @@

     def substituteCTE(plan: LogicalPlan, cteName: String, ctePlan: LogicalPlan): LogicalPlan = {
       plan resolveOperatorsUp {
-        case UnresolvedRelation(TableIdentifier(table, None)) if resolver(cteName, table) =>
+        case UnresolvedRelation(Seq(table)) if resolver(cteName, table) =>
           ctePlan
         case u: UnresolvedRelation =>
           u
@@ -657,6 +659,20 @@
     }
   }

+  /**
+   * Resolve table relations with concrete relations from v2 catalog.
+   *
+   * [[ResolveRelations]] still resolves v1 tables.
+   */
+  object ResolveTables extends Rule[LogicalPlan] {
Review comment (@dongjoon-hyun, Member, Jun 12, 2019):
Can we use ResolveV2Relations instead, in order to avoid this confusion?

    -   * Resolve table relations with concrete relations from v2 catalog.
    -   *
    -   * [[ResolveRelations]] still resolves v1 tables.
    +   * Replaces [[UnresolvedRelation]]s with concrete relations from the v2 catalog.
        */
    -  object ResolveTables extends Rule[LogicalPlan] {
    +  object ResolveV2Relations extends Rule[LogicalPlan] {

Review comment (Member):
Please ignore the above comment.

Reply (Member, Author):
Name it ResolveTables because there may be a new rule, ResolveViews, down the road, which will be part of the ViewCatalog effort. More details to come.

+    import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
+        loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
+    }
+  }
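
In effect, the rule rewrites a multi-part name whose head matches a configured catalog into a v2 relation. A walkthrough under an assumed configuration (the catalog name and class are hypothetical):

    // spark.sql.catalog.testcat = com.example.TestCatalog  (hypothetical config)
    //
    // SELECT * FROM testcat.db.tab
    //   parser        => UnresolvedRelation(Seq("testcat", "db", "tab"))
    //   ResolveTables => CatalogObjectIdentifier splits the parts into the
    //                    testcat plugin plus Identifier(Array("db"), "tab"),
    //                    loadTable fetches the Table, and the node becomes
    //                    DataSourceV2Relation.create(table)
    //   If loadTable returns None, the UnresolvedRelation stays in the plan and
    //   checkAnalysis later reports "Table or view not found: testcat.db.tab".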

   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */
@@ -689,10 +705,15 @@
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
     // have empty defaultDatabase and all the relations in viewText have database part defined.
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
-      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
+      case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
         val defaultDatabase = AnalysisContext.get.defaultDatabase
-        val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
-        resolveRelation(foundRelation)
+        val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
+        if (foundRelation != u) {
+          resolveRelation(foundRelation)
+        } else {
+          u
+        }
+
       // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
       // `viewText` should be defined, or else we throw an error on the generation of the View
       // operator.
@@ -715,8 +736,9 @@
     }

     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
+      case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
+          if child.resolved =>
+        EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
           case v: View =>
             u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
           case other => i.copy(table = other)
@@ -731,20 +753,16 @@
     // and the default database is only used to look up a view);
     // 3. Use the currentDb of the SessionCatalog.
     private def lookupTableFromCatalog(
+        tableIdentifier: TableIdentifier,
         u: UnresolvedRelation,
         defaultDatabase: Option[String] = None): LogicalPlan = {
-      val tableIdentWithDb = u.tableIdentifier.copy(
-        database = u.tableIdentifier.database.orElse(defaultDatabase))
+      val tableIdentWithDb = tableIdentifier.copy(
+        database = tableIdentifier.database.orElse(defaultDatabase))
       try {
         catalog.lookupRelation(tableIdentWithDb)
       } catch {
-        case e: NoSuchTableException =>
-          u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
-        // If the database is defined and that database is not found, throw an AnalysisException.
-        // Note that if the database is not defined, it is possible we are looking up a temp view.
-        case e: NoSuchDatabaseException =>
-          u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
-            s"database ${e.db} doesn't exist.", e)
Review comment (Member):
The original error messages are still helpful. Let us keep them.

Reply (Member, Author):
Unfortunately that is not possible, since the AnalysisException is now thrown by checkAnalysis:

      case u: UnresolvedRelation =>
        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

Reply (Contributor):
@gatorsmile, we plan to update checkAnalysis to produce more friendly error messages, but not until #24560 is merged. Without that, we can't check whether the namespace exists to produce the right error message.

I should also note that checkAnalysis is the right place for the exception to be thrown. Individual rules should not fail analysis. In this case, a different rule is used for looking up tables in v2 catalogs, and later an UnresolvedRelation could be resolved by an independent ResolveViews rule. Keeping these rules separate makes them smaller and avoids mixing view handling and table handling, as we see in this current rule.

+        case _: NoSuchTableException | _: NoSuchDatabaseException =>
+          u
Review comment (Contributor):
We should add some comments to explain why we need to delay the exception here. To me, it's because we still have a chance to resolve the table relation with the v2 rules.

       }
     }

File: CheckAnalysis.scala

@@ -90,7 +90,7 @@ trait CheckAnalysis extends PredicateHelper {
       case p if p.analyzed => // Skip already analyzed sub-plans

       case u: UnresolvedRelation =>
-        u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
+        u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

       case operator: LogicalPlan =>
         // Check argument data types of higher-order functions downwards first.
File: ResolveHints.scala

@@ -71,18 +71,20 @@ object ResolveHints {

     val newNode = CurrentOrigin.withOrigin(plan.origin) {
       plan match {
-        case ResolvedHint(u: UnresolvedRelation, hint)
-            if relations.exists(resolver(_, u.tableIdentifier.table)) =>
-          relations.remove(u.tableIdentifier.table)
+        case ResolvedHint(u @ UnresolvedRelation(ident), hint)
+            if relations.exists(resolver(_, ident.last)) =>
+          relations.remove(ident.last)
           ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

         case ResolvedHint(r: SubqueryAlias, hint)
             if relations.exists(resolver(_, r.alias)) =>
           relations.remove(r.alias)
           ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))

-        case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
-          relations.remove(u.tableIdentifier.table)
+        case u @ UnresolvedRelation(ident) if relations.exists(resolver(_, ident.last)) =>
+          relations.remove(ident.last)
           ResolvedHint(plan, createHintInfo(hintName))

         case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
           relations.remove(r.alias)
           ResolvedHint(plan, createHintInfo(hintName))
File: unresolved.scala

@@ -38,19 +38,24 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 /**
  * Holds the name of a relation that has yet to be looked up in a catalog.
  *
- * @param tableIdentifier table name
+ * @param multipartIdentifier table name
  */
-case class UnresolvedRelation(tableIdentifier: TableIdentifier)
-  extends LeafNode {
+case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

   /** Returns a `.` separated name for this relation. */
-  def tableName: String = tableIdentifier.unquotedString
+  def tableName: String = multipartIdentifier.quoted

   override def output: Seq[Attribute] = Nil

   override lazy val resolved = false
 }

+object UnresolvedRelation {
+  def apply(tableIdentifier: TableIdentifier): UnresolvedRelation =
+    UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
+}
+
 /**
  * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
  * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].
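
The companion object keeps old TableIdentifier call sites compiling by flattening the identifier into name parts. A sketch of the equivalence (values illustrative):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

    val byParts = UnresolvedRelation(Seq("db", "tab"))
    val byIdent = UnresolvedRelation(TableIdentifier("tab", Some("db")))
    assert(byParts == byIdent)  // database.toSeq :+ table == Seq("db", "tab")
    // Without a database, TableIdentifier("tab") flattens to Seq("tab"), the
    // single-part shape that the CTE substitution above matches with Seq(table).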
File: dsl/package.scala

@@ -295,10 +295,7 @@ package object dsl {
   object expressions extends ExpressionConversions // scalastyle:ignore

   object plans { // scalastyle:ignore
-    def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
-
-    def table(db: String, ref: String): LogicalPlan =
-      UnresolvedRelation(TableIdentifier(ref, Option(db)))
+    def table(parts: String*): LogicalPlan = UnresolvedRelation(parts)

     implicit class DslLogicalPlan(val logicalPlan: LogicalPlan) {
       def select(exprs: Expression*): LogicalPlan = {
File: AstBuilder.scala

@@ -898,14 +898,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    * }}}
    */
   override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
-    UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
+    UnresolvedRelation(visitMultipartIdentifier(ctx.multipartIdentifier))
   }

   /**
    * Create an aliased table reference. This is typically used in FROM clauses.
    */
   override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
-    val tableId = visitTableIdentifier(ctx.tableIdentifier)
+    val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
     val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
     table.optionalMap(ctx.sample)(withSample)
   }
File: DataSourceV2Relation.scala

@@ -104,6 +104,8 @@ object DataSourceV2Relation {
     DataSourceV2Relation(table, output, options)
   }

+  def create(table: Table): DataSourceV2Relation = create(table, CaseInsensitiveStringMap.empty)
+
   /**
    * This is used to transform data source v2 statistics to logical.Statistics.
    */
File: PlanParserSuite.scala

@@ -42,6 +42,14 @@ class PlanParserSuite extends AnalysisTest {
   private def intercept(sqlCommand: String, messages: String*): Unit =
     interceptParseException(parsePlan)(sqlCommand, messages: _*)

+  private def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
+    val ctes = namedPlans.map {
+      case (name, cte) =>
+        name -> SubqueryAlias(name, cte)
+    }
+    With(plan, ctes)
+  }
+
   test("case insensitive") {
     val plan = table("a").select(star())
     assertEqual("sELEct * FroM a", plan)
@@ -74,13 +82,6 @@
   }

   test("common table expressions") {
-    def cte(plan: LogicalPlan, namedPlans: (String, LogicalPlan)*): With = {
-      val ctes = namedPlans.map {
-        case (name, cte) =>
-          name -> SubqueryAlias(name, cte)
-      }
-      With(plan, ctes)
-    }
     assertEqual(
       "with cte1 as (select * from a) select * from cte1",
       cte(table("cte1").select(star()), "cte1" -> table("a").select(star())))
@@ -801,4 +802,20 @@
     }.getMessage
     assert(m2.contains("mismatched input 'IN' expecting"))
   }
+
+  test("relation in v2 catalog") {
+    assertEqual("TABLE testcat.db.tab", table("testcat", "db", "tab"))
+    assertEqual("SELECT * FROM testcat.db.tab", table("testcat", "db", "tab").select(star()))
+
+    assertEqual(
+      """
+        |WITH cte1 AS (SELECT * FROM testcat.db.tab)
+        |SELECT * FROM cte1
+      """.stripMargin,
+      cte(table("cte1").select(star()), "cte1" -> table("testcat", "db", "tab").select(star())))
+
+    assertEqual(
+      "SELECT /*+ BROADCAST(tab) */ * FROM testcat.db.tab",
+      table("testcat", "db", "tab").select(star()).hint("BROADCAST", $"tab"))
+  }
 }
File: SparkSession.scala

@@ -624,7 +624,11 @@ class SparkSession private(
    * @since 2.0.0
    */
   def table(tableName: String): DataFrame = {
-    table(sessionState.sqlParser.parseTableIdentifier(tableName))
+    table(sessionState.sqlParser.parseMultipartIdentifier(tableName))
   }

+  private[sql] def table(multipartIdentifier: Seq[String]): DataFrame = {
+    Dataset.ofRows(self, UnresolvedRelation(multipartIdentifier))
+  }
+
   private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
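
With multi-part parsing, spark.table can now reference a table in a v2 catalog. A usage sketch ("testcat" is a hypothetical catalog configured via spark.sql.catalog.testcat):

    // The name is parsed into parts and resolution is deferred to the analyzer:
    val df = spark.table("testcat.db.tab")
    // parseMultipartIdentifier("testcat.db.tab") => Seq("testcat", "db", "tab")
    // => Dataset.ofRows(self, UnresolvedRelation(Seq("testcat", "db", "tab")))
    // The new ResolveTables rule then loads the table from the testcat plugin.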
File: views.scala

@@ -181,6 +181,8 @@ case class CreateViewCommand(
    * Permanent views are not allowed to reference temp objects, including temp function and views
    */
   private def verifyTemporaryObjectsNotExists(sparkSession: SparkSession): Unit = {
+    import sparkSession.sessionState.analyzer.AsTableIdentifier
+
     if (!isTemporary) {
       // This func traverses the unresolved plan `child`. Below are the reasons:
       // 1) Analyzer replaces unresolved temporary views by a SubqueryAlias with the corresponding
@@ -190,10 +192,11 @@
       // package (e.g., HiveGenericUDF).
       child.collect {
         // Disallow creating permanent views based on temporary views.
-        case s: UnresolvedRelation
-          if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
+        case UnresolvedRelation(AsTableIdentifier(ident))
+            if sparkSession.sessionState.catalog.isTemporaryTable(ident) =>
+          // temporary views are only stored in the session catalog
           throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
-            s"referencing a temporary view ${s.tableIdentifier}")
+            s"referencing a temporary view $ident")
         case other if !other.resolved => other.expressions.flatMap(_.collect {
           // Disallow creating permanent views based on temporary UDFs.
           case e: UnresolvedFunction
File: rules.scala

@@ -37,16 +37,16 @@ import org.apache.spark.sql.util.SchemaUtils
  */
 class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
-    sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
+    sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
Review comment (Contributor):
This is a separate problem, but could you also file an issue to fix this rule? It relies on rule ordering for correctness, because it will convert any 2-part identifier to a SQL-on-file plan if the namespace is also a DataSource name. It should additionally check that the path either looks like a path (contains /) or exists in some file system.

Reply (Member, Author):
Sounds good.

What should the expected behavior be for this test in sql.SQLQuerySuite."run sql directly on files"?

    e = intercept[AnalysisException] {
      sql("select * from json.invalid_file")
    }
    assert(e.message.contains("Path does not exist"))

Reply (Contributor):
I think it should be that the table could not be found. If there were some way to determine that it is a path, like checking for /, then that message would be correct. So it could be updated to this:

    e = intercept[AnalysisException] {
      sql("select * from json.`/invalid_file`")
    }
    assert(e.message.contains("Path does not exist"))
   }

   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedRelation if maybeSQLFile(u) =>
       try {
         val dataSource = DataSource(
           sparkSession,
-          paths = u.tableIdentifier.table :: Nil,
-          className = u.tableIdentifier.database.get)
+          paths = u.multipartIdentifier.last :: Nil,
+          className = u.multipartIdentifier.head)

         // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch
         // will catch it and return the original plan, so that the analyzer can report table not
@@ -55,7 +55,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         if (!isFileFormat ||
             dataSource.className.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
           throw new AnalysisException("Unsupported data source type for direct query on files: " +
-            s"${u.tableIdentifier.database.get}")
+            s"${dataSource.className}")
         }
         LogicalRelation(dataSource.resolveRelation())
       } catch {