[SPARK-27322][SQL] DataSourceV2 table relation #24741
Conversation
Test build #105947 has finished for PR 24741 at commit
There are 2 major design points in this PR:
I discovered these sore spots so far in the transition from TableIdentifier to CatalogTableIdentifier:
Alternatives to resolving multipart table identifier in AstBuilder
Test build #105973 has finished for PR 24741 at commit
Please hold off on review. Testing changes suggested by Ryan to move resolution to the Analyzer.
@jzhuge and I have been working on a version that does the table resolution in the analyzer instead of in AstBuilder, which should be cleaner and keeps the parser code separate from the implementation.
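A minimal sketch of the analyzer-side approach (lookupV2Relation here is a hypothetical stand-in for the catalog lookup this PR adds): the parser only records the raw multipart name in an UnresolvedRelation, and an analyzer rule later swaps it for a concrete relation.

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ResolveTablesSketch extends Rule[LogicalPlan] {
  // Hypothetical stand-in for the v2 catalog lookup added by this PR.
  private def lookupV2Relation(nameParts: Seq[String]): Option[LogicalPlan] = None

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    // If the lookup misses, keep the node unresolved so checkAnalysis
    // can report the failure later, instead of this rule throwing.
    case u: UnresolvedRelation => lookupV2Relation(u.multipartIdentifier).getOrElse(u)
  }
}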
Test build #106049 has finished for PR 24741 at commit
Test build #106050 has finished for PR 24741 at commit
Test build #106060 has finished for PR 24741 at commit
@cloud-fan @dongjoon-hyun @HyukjinKwon This PR is ready for review. @rdblue and I have switched relation resolution from AstBuilder (as in the original commit) to the Analyzer.
Rebased and squashed.
Test build #106159 has finished for PR 24741 at commit
@@ -153,7 +153,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) =>
       CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)

-    case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+    case CreateTable(tableDesc, mode, Some(query))
+        if DDLUtils.isHiveTable(tableDesc) && query.resolved =>
This sounds like a separate issue. Could we submit a separate PR?
This PR prevents lookupTableFromCatalog from throwing NoSuchTableException right away. Instead, it relies on checkAnalysis to throw an exception for UnresolvedRelation.
The test hive.SQLQuerySuite."double nested data" would fail on the following SQL without this change:
CREATE TABLE test_ctas_1234 AS SELECT * from notexists
HiveAnalysis gets to run before checkAnalysis, thus exposing this bug where query.output is used before query is resolved.
So I wouldn't say it is a totally separate issue.
In addition, outside of this PR, it'd be hard to write a unit test.
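A tiny illustration of the hazard (the snippet is mine, not from the PR): touching query.output before the child is resolved throws, which is why the match is now guarded on query.resolved.

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

val unresolved = UnresolvedRelation(Seq("notexists"))
assert(!unresolved.resolved)
// unresolved.output would throw an UnresolvedException here, so rules
// like HiveAnalysis must not read output until the plan is resolved.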
I agree with John. This is needed as a consequence of fixing the ResolveRelations rule to no longer throw AnalysisException if it can't resolve the name and doesn't think that ResolveSQLOnFile would either.
@@ -185,6 +186,8 @@ abstract class BaseSessionStateBuilder(
       V2WriteSupportCheck +:
       V2StreamingScanSupportCheck +:
       customCheckRules

+  override protected def lookupCatalog(name: String): CatalogPlugin = session.catalog(name)
Why not catalog(name)? Any difference between catalog(name) and session.catalog(name)?
SparkSession.catalog returns a CatalogPlugin for DSv2, while BaseSessionStateBuilder.catalog is a SessionCatalog.
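A sketch of the distinction (class shapes paraphrased from this era of the code; details may differ): inside BaseSessionStateBuilder a bare catalog names the v1 SessionCatalog field, so the v2 plugin lookup has to go through the session.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.v2.CatalogPlugin
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

abstract class SessionStateBuilderSketch(session: SparkSession) {
  // v1: the single metastore-backed catalog used by the session
  protected def catalog: SessionCatalog
  // v2: resolve a named CatalogPlugin registered with the session
  protected def lookupCatalog(name: String): CatalogPlugin =
    session.catalog(name)
}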
I might not get the difference. How to avoid misusing them?
- One is our global catalog? Another is the local catalogs?
- What is the semantics of lookupCatalog for SessionCatalog?
- What is the semantics of lookupCatalog for CatalogPlugin for DSv2?
#24768 passes a single LookupCatalog (the analyzer), as you suggested in the other comment. I like using the same lookup everywhere instead of having multiple classes implement LookupCatalog.
// Note that if the database is not defined, it is possible we are looking up a temp view.
case e: NoSuchDatabaseException =>
  u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
    s"database ${e.db} doesn't exist.", e)
The original error messages are still helpful. Let us keep them.
Unfortunately that is not possible, since the AnalysisException is now thrown by checkAnalysis:
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")
@gatorsmile, we plan to update checkAnalysis to produce more friendly error messages, but not until #24560 is merged. Without that, we can't check whether the namespace exists to produce the right error message.

I should also note that checkAnalysis is the right place for the exception to be thrown. Individual rules should not fail analysis. In this case, a different rule for looking up tables in v2 catalogs is used. And later, an UnresolvedRelation could be resolved by an independent ResolveViews rule. Allowing these rules to be separate makes them smaller and doesn't mix view handling and table handling, as we see in this current rule.
I did a quick pass and left a few comments. @jzhuge Thank you for your work! @jiangxb1987 Please take a look, especially at the test case coverage.
Rebased and fixed review comments.
Test build #106190 has finished for PR 24741 at commit
Test build #106392 has finished for PR 24741 at commit
Rebased and squashed.
Test build #106397 has finished for PR 24741 at commit
Test build #106399 has finished for PR 24741 at commit
try {
  Option(catalog.asTableCatalog.loadTable(ident))
} catch {
  case _: NoSuchTableException => None
}
So, we return None for NoSuchTableException only, and propagate exceptions for all catalog errors like CatalogNotFoundException from loadTable and AnalysisException from asTableCatalog?
Yes.
BTW, I don't think TableCatalog.loadTable throws CatalogNotFoundException, because the catalog plugin has already been found by that point.
 *
 * [[ResolveRelations]] still resolves v1 tables.
 */
object ResolveTables extends Rule[LogicalPlan] {
Can we use ResolveV2Relations instead in order to avoid that confusion?
- * Resolve table relations with concrete relations from v2 catalog.
- *
- * [[ResolveRelations]] still resolves v1 tables.
+ * Replaces [[UnresolvedRelation]]s with concrete relations from the v2 catalog.
*/
- object ResolveTables extends Rule[LogicalPlan] {
+ object ResolveV2Relations extends Rule[LogicalPlan] {
Please ignore the above comment.
Name it ResolveTables because there may be a new ResolveViews rule down the road, which will be part of the ViewCatalog effort. More details to come.
Test build #106439 has finished for PR 24741 at commit
@@ -1694,8 +1694,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       e = intercept[AnalysisException] {
         sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
       }
-      assert(e.message.contains("Table or view not found: " +
-        "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
+      assert(e.message.contains("Table or view not found"))
Nit. Shall we keep the original form because only backticks are gone?
assert(e.message.contains("Table or view not found: " +
"`org.apache.spark.sql.sources.HadoopFsRelationProvider`.file_path"))
Mostly looks correct. We need to fix …
Test build #106449 has finished for PR 24741 at commit
LGTM
u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
  s"database ${e.db} doesn't exist.", e)
case _: NoSuchTableException | _: NoSuchDatabaseException =>
  u
We should add some comments to explain why we need to delay the exception here. To me it's because we still have a chance to resolve the table relation with v2 rules.
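Something like this sketch is what I have in mind (the wording and helper are mine, not the PR's):

import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def lookupOrKeep(u: UnresolvedRelation, lookup: () => LogicalPlan): LogicalPlan =
  try lookup() catch {
    // Don't fail analysis here: a later v2 rule (ResolveTables) may still
    // resolve this name against a v2 catalog; if nothing does, checkAnalysis
    // reports the unresolved relation with a proper error message.
    case _: NoSuchTableException | _: NoSuchDatabaseException => u
  }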
Thanks @cloud-fan @dongjoon-hyun @gatorsmile for the excellent reviews! Thanks @rdblue for the great help!
What changes were proposed in this pull request?

Support multi-catalog in the following SELECT code paths (see the usage sketch below):
- SELECT * FROM catalog.db.tbl
- TABLE catalog.db.tbl
- JOIN or UNION tables from different catalogs
- SparkSession.table("catalog.db.tbl")
- CTE relation
- View text

How was this patch tested?

New unit tests. All existing unit tests in catalyst and sql core.

Closes apache#24741 from jzhuge/SPARK-27322-pr.
Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
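A hedged usage sketch of the newly supported paths (catalog and table names are made up for illustration, and assume the catalogs were registered via the spark.sql.catalog.<name> configs):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
spark.sql("SELECT * FROM testcat.db.tbl")   // SELECT with a catalog-qualified name
spark.sql("TABLE testcat.db.tbl")           // TABLE statement
spark.sql("SELECT a.id FROM testcat.db.t1 a JOIN othercat.db.t2 b ON a.id = b.id") // cross-catalog join
spark.table("testcat.db.tbl")               // SparkSession.table with a catalog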