[SPARK-27322][SQL] DataSourceV2 table relation #24741
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

@@ -162,6 +163,7 @@ class Analyzer(
       new SubstituteUnresolvedOrdinals(conf)),
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
+      ResolveTables ::
       ResolveRelations ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::

@@ -226,7 +228,7 @@ class Analyzer(
     def substituteCTE(plan: LogicalPlan, cteName: String, ctePlan: LogicalPlan): LogicalPlan = {
       plan resolveOperatorsUp {
-        case UnresolvedRelation(TableIdentifier(table, None)) if resolver(cteName, table) =>
+        case UnresolvedRelation(Seq(table)) if resolver(cteName, table) =>
          ctePlan
        case u: UnresolvedRelation =>
          u

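For illustration, the effect of the new pattern: UnresolvedRelation now carries a multipart name (Seq[String]), and Seq(table) only matches a bare, one-part name, so qualified names still fall through to the catalog rules. A tiny standalone sketch, with made-up names and the resolver approximated by a case-insensitive comparison:

```scala
object CteMatchSketch extends App {
  // Mirrors the pattern in substituteCTE: Seq(table) only matches a one-part name.
  def describe(nameParts: Seq[String], cteName: String): String = nameParts match {
    case Seq(table) if table.equalsIgnoreCase(cteName) => s"substitute CTE plan for '$table'"
    case _ => "leave as UnresolvedRelation for the catalog rules"
  }

  println(describe(Seq("t"), "t"))        // substitute CTE plan for 't'
  println(describe(Seq("db", "t"), "t"))  // leave as UnresolvedRelation for the catalog rules
}
```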
@@ -657,6 +659,20 @@ class Analyzer(
     }
   }

+  /**
+   * Resolve table relations with concrete relations from v2 catalog.
+   *
+   * [[ResolveRelations]] still resolves v1 tables.
+   */
+  object ResolveTables extends Rule[LogicalPlan] {
+    import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util._
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case u @ UnresolvedRelation(CatalogObjectIdentifier(Some(catalogPlugin), ident)) =>
+        loadTable(catalogPlugin, ident).map(DataSourceV2Relation.create).getOrElse(u)
+    }
+  }
+
   /**
    * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
    */

Review comment (suggesting a different name and scaladoc for the new rule):

-   * Resolve table relations with concrete relations from v2 catalog.
-   *
-   * [[ResolveRelations]] still resolves v1 tables.
+   * Replaces [[UnresolvedRelation]]s with concrete relations from the v2 catalog.
    */
-  object ResolveTables extends Rule[LogicalPlan] {
+  object ResolveV2Relations extends Rule[LogicalPlan] {

Follow-up review comment: Please ignore the above comment.

Follow-up review comment: Name it […]

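Setting the naming question aside, here is a rough usage sketch of the intended split between the new rule and ResolveRelations. The catalog name `testcat`, its implementation class, and the table names are hypothetical; the `spark.sql.catalog.<name>` config key is assumed from the catalog plugin API this PR builds on, and `spark` is a SparkSession as in spark-shell:

```scala
// Register a hypothetical v2 catalog plugin under the name "testcat" (assumed config key).
spark.conf.set("spark.sql.catalog.testcat", "org.example.InMemoryTableCatalog")

// The first name part matches a catalog plugin, so ResolveTables loads the table
// from testcat and plans it as a DataSourceV2Relation.
spark.sql("SELECT * FROM testcat.db.events").explain(true)

// No catalog plugin is named "db", so ResolveTables leaves the relation alone and the
// existing ResolveRelations rule resolves it against the session catalog (v1 path).
spark.sql("SELECT * FROM db.events").explain(true)
```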
@@ -689,10 +705,15 @@ class Analyzer(
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which
     // have empty defaultDatabase and all the relations in viewText have database part defined.
     def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
-      case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
+      case u @ UnresolvedRelation(AsTableIdentifier(ident)) if !isRunningDirectlyOnFiles(ident) =>
         val defaultDatabase = AnalysisContext.get.defaultDatabase
-        val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
-        resolveRelation(foundRelation)
+        val foundRelation = lookupTableFromCatalog(ident, u, defaultDatabase)
+        if (foundRelation != u) {
+          resolveRelation(foundRelation)
+        } else {
+          u
+        }

       // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
       // `viewText` should be defined, or else we throw an error on the generation of the View
       // operator.

@@ -715,8 +736,9 @@ class Analyzer(
     }

     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
+      case i @ InsertIntoTable(u @ UnresolvedRelation(AsTableIdentifier(ident)), _, child, _, _)
+          if child.resolved =>
+        EliminateSubqueryAliases(lookupTableFromCatalog(ident, u)) match {
           case v: View =>
             u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
           case other => i.copy(table = other)

@@ -731,20 +753,16 @@ class Analyzer(
     // and the default database is only used to look up a view);
     // 3. Use the currentDb of the SessionCatalog.
     private def lookupTableFromCatalog(
+        tableIdentifier: TableIdentifier,
         u: UnresolvedRelation,
         defaultDatabase: Option[String] = None): LogicalPlan = {
-      val tableIdentWithDb = u.tableIdentifier.copy(
-        database = u.tableIdentifier.database.orElse(defaultDatabase))
+      val tableIdentWithDb = tableIdentifier.copy(
+        database = tableIdentifier.database.orElse(defaultDatabase))
       try {
         catalog.lookupRelation(tableIdentWithDb)
       } catch {
-        case e: NoSuchTableException =>
-          u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
-        // If the database is defined and that database is not found, throw an AnalysisException.
-        // Note that if the database is not defined, it is possible we are looking up a temp view.
-        case e: NoSuchDatabaseException =>
-          u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
-            s"database ${e.db} doesn't exist.", e)
+        case _: NoSuchTableException | _: NoSuchDatabaseException =>
+          u
       }
     }

Review comment: The original error messages are still helpful. Let us keep them.

Reply: Unfortunately that is not possible, since the AnalysisException is now thrown by checkAnalysis: […]

Review comment: @gatorsmile, we plan to update […]. I should also note that […]

Review comment: We should add some comments to explain why we need to delay the exception here. To me it's because we still have a chance to resolve the table relation with v2 rules.

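Picking up the point about delaying the exception: a minimal sketch of why returning the unresolved plan is safe. This is not the actual CheckAnalysis code; it only illustrates that a relation which neither the v1 nor the v2 rules managed to resolve is still reported, just at the end of analysis rather than inside lookupTableFromCatalog:

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Walk the analyzed plan; any UnresolvedRelation left over after all resolution
// rules (v1 and v2) have run becomes a "table or view not found" failure here.
def failOnUnresolvedRelations(plan: LogicalPlan): Unit = plan.foreach {
  case u: UnresolvedRelation =>
    u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.mkString(".")}")
  case _ => // other operators are fine
}
```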
@@ -37,16 +37,16 @@ import org.apache.spark.sql.util.SchemaUtils
  */
 class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
-    sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
+    sparkSession.sessionState.conf.runSQLonFile && u.multipartIdentifier.size == 2
   }

   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case u: UnresolvedRelation if maybeSQLFile(u) =>
       try {
         val dataSource = DataSource(
           sparkSession,
-          paths = u.tableIdentifier.table :: Nil,
-          className = u.tableIdentifier.database.get)
+          paths = u.multipartIdentifier.last :: Nil,
+          className = u.multipartIdentifier.head)

         // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch
         // will catch it and return the original plan, so that the analyzer can report table not

Review comment on the new maybeSQLFile check: This is a separate problem, but could you also file an issue to fix this rule? This relies on rule ordering for correctness because it will convert any 2-part identifier to a SQL-on-file plan if the namespace is also a DataSource name. This should additionally check that the path either looks like a path (contains […]

Reply: Sounds good. What should be the expected behavior for this test in sql.SQLQuerySuite."run sql directly on files"?

Reply: I think it should be that the table could not be found. If there were some way to determine that it is a path, like checking for […]:

    val e = intercept[AnalysisException] {
      sql("select * from json.`/invalid_file`")
    }
    assert(e.message.contains("Path does not exist"))

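For reference, one way the stricter check discussed above could look. This is only a sketch meant to sit inside ResolveSQLOnFile; the looksLikePath helper and its heuristics (a '/' or a URI scheme in the second name part) are assumptions, not something this PR adds:

```scala
// Hypothetical helper: decide whether the second name part of `format`.`name`
// actually looks like a path rather than a database name.
private def looksLikePath(name: String): Boolean =
  name.contains("/") || name.contains(":")

// Stricter variant of the check introduced by this PR: same runSQLonFile and
// two-part-identifier conditions, plus the path heuristic above.
private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
  sparkSession.sessionState.conf.runSQLonFile &&
    u.multipartIdentifier.size == 2 &&
    looksLikePath(u.multipartIdentifier.last)
}
```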
@@ -55,7 +55,7 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         if (!isFileFormat ||
             dataSource.className.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
           throw new AnalysisException("Unsupported data source type for direct query on files: " +
-            s"${u.tableIdentifier.database.get}")
+            s"${dataSource.className}")
         }
         LogicalRelation(dataSource.resolveRelation())
       } catch {

Review comment: So, we return None for NoSuchTableException only, and propagate exceptions for all other catalog errors, like CatalogNotFoundException from loadTable and AnalysisException from asTableCatalog?

Reply: Yes. BTW, I don't think TableCatalog.loadTable throws CatalogNotFoundException, because the catalog plugin has already been found.
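Summarized as code, the behavior described in that exchange would look roughly like the following. This is a sketch rather than a verbatim copy of CatalogV2Util (imports omitted; the types and the asTableCatalog conversion are the v2 catalog API referenced in the thread):

```scala
// Only a missing table maps to None; any other catalog failure keeps propagating,
// e.g. an AnalysisException from asTableCatalog when the plugin is not a TableCatalog.
def loadTable(catalog: CatalogPlugin, ident: Identifier): Option[Table] =
  try {
    Some(catalog.asTableCatalog.loadTable(ident))
  } catch {
    case _: NoSuchTableException => None
  }
```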