Skip to content

Commit d259020

Browse files
committed
[SPARK-27322][SQL] DataSourceV2: Select from multiple catalogs
1 parent 955eef9 commit d259020

File tree

15 files changed

+203
-33
lines changed

15 files changed

+203
-33
lines changed

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -397,7 +397,7 @@ queryTerm
397397

398398
queryPrimary
399399
: querySpecification #queryPrimaryDefault
400-
| TABLE tableIdentifier #table
400+
| TABLE multipartIdentifier #table
401401
| inlineTable #inlineTableDefault1
402402
| '(' queryNoWith ')' #subquery
403403
;
@@ -536,7 +536,7 @@ identifierComment
536536
;
537537

538538
relationPrimary
539-
: tableIdentifier sample? tableAlias #tableName
539+
: multipartIdentifier sample? tableAlias #tableName
540540
| '(' queryNoWith ')' sample? tableAlias #aliasedQuery
541541
| '(' relation ')' sample? tableAlias #aliasedRelation
542542
| inlineTable #inlineTableDefault2
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/TableIdentifierHelper.scala

Lines changed: 37 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalog.v2
19+
20+
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier, TableIdentifierLike}
21+
22+
/**
23+
* Resolve multipart table identifier to [[CatalogTableIdentifier]] or [[TableIdentifier]].
24+
*/
25+
trait TableIdentifierHelper extends LookupCatalog {
26+
import CatalogV2Implicits._
27+
28+
implicit class TableIdentifierHelper(parts: Seq[String]) {
29+
def asCatalogTableIdentifier: TableIdentifierLike = parts match {
30+
case CatalogObjectIdentifier(Some(catalog), ident) =>
31+
CatalogTableIdentifier(catalog.asTableCatalog, ident)
32+
33+
case AsTableIdentifier(tableIdentifier) =>
34+
tableIdentifier
35+
}
36+
}
37+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -224,9 +224,8 @@ class Analyzer(
224224

225225
def substituteCTE(plan: LogicalPlan, cteRelations: Seq[(String, LogicalPlan)]): LogicalPlan = {
226226
plan resolveOperatorsDown {
227-
case u: UnresolvedRelation =>
228-
cteRelations.find(x => resolver(x._1, u.tableIdentifier.table))
229-
.map(_._2).getOrElse(u)
227+
case u @ UnresolvedRelation(TableIdentifier(table, _)) =>
228+
cteRelations.find(x => resolver(x._1, table)).map(_._2).getOrElse(u)
230229
case other =>
231230
// This cannot be done in ResolveSubquery because ResolveSubquery does not know the CTE.
232231
other transformExpressions {
@@ -718,7 +717,7 @@ class Analyzer(
718717
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
719718
case other => i.copy(table = other)
720719
}
721-
case u: UnresolvedRelation => resolveRelation(u)
720+
case u @ UnresolvedRelation(_: TableIdentifier) => resolveRelation(u)
722721
}
723722

724723
// Look up the table with the given name from catalog. The database we used is decided by the

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import java.util.Locale
2222
import scala.collection.mutable
2323

2424
import org.apache.spark.sql.AnalysisException
25+
import org.apache.spark.sql.catalyst.TableIdentifier
2526
import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
2627
import org.apache.spark.sql.catalyst.plans.logical._
2728
import org.apache.spark.sql.catalyst.rules.Rule
@@ -71,17 +72,18 @@ object ResolveHints {
7172

7273
val newNode = CurrentOrigin.withOrigin(plan.origin) {
7374
plan match {
74-
case ResolvedHint(u: UnresolvedRelation, hint)
75-
if relations.exists(resolver(_, u.tableIdentifier.table)) =>
76-
relations.remove(u.tableIdentifier.table)
75+
case ResolvedHint(u @ UnresolvedRelation(TableIdentifier(table, _)), hint)
76+
if relations.exists(resolver(_, table)) =>
77+
relations.remove(table)
7778
ResolvedHint(u, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
7879
case ResolvedHint(r: SubqueryAlias, hint)
7980
if relations.exists(resolver(_, r.alias)) =>
8081
relations.remove(r.alias)
8182
ResolvedHint(r, createHintInfo(hintName).merge(hint, handleOverriddenHintInfo))
8283

83-
case u: UnresolvedRelation if relations.exists(resolver(_, u.tableIdentifier.table)) =>
84-
relations.remove(u.tableIdentifier.table)
84+
case UnresolvedRelation(TableIdentifier(table, _))
85+
if relations.exists(resolver(_, table)) =>
86+
relations.remove(table)
8587
ResolvedHint(plan, createHintInfo(hintName))
8688
case r: SubqueryAlias if relations.exists(resolver(_, r.alias)) =>
8789
relations.remove(r.alias)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818
package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.AnalysisException
21-
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
21+
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier, TableIdentifierLike}
2222
import org.apache.spark.sql.catalyst.errors.TreeNodeException
2323
import org.apache.spark.sql.catalyst.expressions._
2424
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -38,11 +38,13 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
3838
/**
3939
* Holds the name of a relation that has yet to be looked up in a catalog.
4040
*
41-
* @param tableIdentifier table name
41+
* @param table table name
4242
*/
43-
case class UnresolvedRelation(tableIdentifier: TableIdentifier)
43+
case class UnresolvedRelation(table: TableIdentifierLike)
4444
extends LeafNode {
4545

46+
def tableIdentifier: TableIdentifier = table.tableIdentifier
47+
4648
/** Returns a `.` separated name for this relation. */
4749
def tableName: String = tableIdentifier.unquotedString
4850

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.catalyst
1919

20+
import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
21+
2022
/**
2123
* An identifier that optionally specifies a database.
2224
*
@@ -64,14 +66,37 @@ object AliasIdentifier {
6466
def apply(identifier: String): AliasIdentifier = new AliasIdentifier(identifier)
6567
}
6668

69+
/**
70+
* An interface to ease transition from [[TableIdentifier]] to [[CatalogTableIdentifier]].
71+
*/
72+
sealed trait TableIdentifierLike {
73+
def tableIdentifier: TableIdentifier
74+
}
75+
76+
/**
77+
* A data source V2 table identifier.
78+
*
79+
* @param catalog a catalog plugin
80+
* @param ident an object identifier
81+
*/
82+
case class CatalogTableIdentifier(catalog: TableCatalog, ident: Identifier)
83+
extends TableIdentifierLike {
84+
85+
override def tableIdentifier: TableIdentifier =
86+
throw new UnsupportedOperationException(
87+
s"$this should not be used on non-DSv2 code path")
88+
}
89+
6790
/**
6891
* Identifies a table in a database.
6992
* If `database` is not defined, the current database is used.
7093
* When we register a permanent function in the FunctionRegistry, we use
7194
* unquotedString as the function name.
7295
*/
7396
case class TableIdentifier(table: String, database: Option[String])
74-
extends IdentifierWithDatabase {
97+
extends IdentifierWithDatabase with TableIdentifierLike {
98+
99+
override def tableIdentifier: TableIdentifier = this
75100

76101
override val identifier: String = table
77102

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
2929
import org.apache.spark.internal.Logging
3030
import org.apache.spark.sql.AnalysisException
3131
import org.apache.spark.sql.catalog.v2
32+
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, TableIdentifierHelper}
3233
import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
3334
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
3435
import org.apache.spark.sql.catalyst.analysis._
@@ -49,11 +50,15 @@ import org.apache.spark.util.random.RandomSampler
4950
* The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or
5051
* TableIdentifier.
5152
*/
52-
class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging {
53+
class AstBuilder(conf: SQLConf)
54+
extends SqlBaseBaseVisitor[AnyRef] with Logging with TableIdentifierHelper {
5355
import ParserUtils._
5456

5557
def this() = this(new SQLConf())
5658

59+
override protected def lookupCatalog(name: String): CatalogPlugin =
60+
throw new CatalogNotFoundException("No catalog lookup function")
61+
5762
protected def typedVisit[T](ctx: ParseTree): T = {
5863
ctx.accept(this).asInstanceOf[T]
5964
}
@@ -844,14 +849,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
844849
* }}}
845850
*/
846851
override def visitTable(ctx: TableContext): LogicalPlan = withOrigin(ctx) {
847-
UnresolvedRelation(visitTableIdentifier(ctx.tableIdentifier))
852+
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier
853+
UnresolvedRelation(tableId)
848854
}
849855

850856
/**
851857
* Create an aliased table reference. This is typically used in FROM clauses.
852858
*/
853859
override def visitTableName(ctx: TableNameContext): LogicalPlan = withOrigin(ctx) {
854-
val tableId = visitTableIdentifier(ctx.tableIdentifier)
860+
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier).asCatalogTableIdentifier
855861
val table = mayApplyAliasPlan(ctx.tableAlias, UnresolvedRelation(tableId))
856862
table.optionalMap(ctx.sample)(withSample)
857863
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/TableIdentifierParserSuite.scala

Lines changed: 24 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,13 +17,28 @@
1717
package org.apache.spark.sql.catalyst.parser
1818

1919
import org.apache.spark.SparkFunSuite
20-
import org.apache.spark.sql.catalyst.TableIdentifier
20+
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, Identifier, TableIdentifierHelper, TestTableCatalog}
21+
import org.apache.spark.sql.catalyst.{CatalogTableIdentifier, TableIdentifier}
2122
import org.apache.spark.sql.catalyst.plans.SQLHelper
2223
import org.apache.spark.sql.internal.SQLConf
24+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
2325

24-
class TableIdentifierParserSuite extends SparkFunSuite with SQLHelper {
26+
class TableIdentifierParserSuite extends SparkFunSuite with SQLHelper with TableIdentifierHelper {
2527
import CatalystSqlParser._
2628

29+
private val testCat = new TestTableCatalog {
30+
initialize("testcat", CaseInsensitiveStringMap.empty())
31+
}
32+
33+
private def findCatalog(name: String): CatalogPlugin = name match {
34+
case "testcat" =>
35+
testCat
36+
case _ =>
37+
throw new CatalogNotFoundException(s"$name not found")
38+
}
39+
40+
override protected def lookupCatalog(name: String): CatalogPlugin = findCatalog(name)
41+
2742
// Add "$elem$", "$value$" & "$key$"
2843
// It is recommended to list them in alphabetical order.
2944
val hiveNonReservedKeyword = Array(
@@ -699,4 +714,11 @@ class TableIdentifierParserSuite extends SparkFunSuite with SQLHelper {
699714
val complexName2 = TableIdentifier("x``y", Some("d``b"))
700715
assert(complexName2 === parseTableIdentifier(complexName2.quotedString))
701716
}
717+
718+
test("multipart table identifier") {
719+
assert(parseMultipartIdentifier("testcat.v2tbl").asCatalogTableIdentifier ===
720+
CatalogTableIdentifier(testCat, Identifier.of(Array.empty, "v2tbl")))
721+
assert(parseMultipartIdentifier("db.tbl").asCatalogTableIdentifier ===
722+
TableIdentifier("tbl", Some("db")))
723+
}
702724
}

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,7 @@ import org.apache.spark.internal.Logging
3333
import org.apache.spark.rdd.RDD
3434
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
3535
import org.apache.spark.sql.catalog.Catalog
36-
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
36+
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs, TableIdentifierHelper}
3737
import org.apache.spark.sql.catalyst._
3838
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
3939
import org.apache.spark.sql.catalyst.encoders._
@@ -82,7 +82,7 @@ class SparkSession private(
8282
@transient private val existingSharedState: Option[SharedState],
8383
@transient private val parentSessionState: Option[SessionState],
8484
@transient private[sql] val extensions: SparkSessionExtensions)
85-
extends Serializable with Closeable with Logging { self =>
85+
extends Serializable with Closeable with Logging with TableIdentifierHelper { self =>
8686

8787
// The call site where this SparkSession was constructed.
8888
private val creationSite: CallSite = Utils.getCallSite()
@@ -613,6 +613,8 @@ class SparkSession private(
613613
catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf))
614614
}
615615

616+
override def lookupCatalog(name: String): CatalogPlugin = catalog(name)
617+
616618
/**
617619
* Returns the specified table/view as a `DataFrame`.
618620
*
@@ -624,10 +626,10 @@ class SparkSession private(
624626
* @since 2.0.0
625627
*/
626628
def table(tableName: String): DataFrame = {
627-
table(sessionState.sqlParser.parseTableIdentifier(tableName))
629+
table(sessionState.sqlParser.parseMultipartIdentifier(tableName).asCatalogTableIdentifier)
628630
}
629631

630-
private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
632+
private[sql] def table(tableIdent: TableIdentifierLike): DataFrame = {
631633
Dataset.ofRows(self, UnresolvedRelation(tableIdent))
632634
}
633635

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -190,10 +190,10 @@ case class CreateViewCommand(
190190
// package (e.g., HiveGenericUDF).
191191
child.collect {
192192
// Disallow creating permanent views based on temporary views.
193-
case s: UnresolvedRelation
194-
if sparkSession.sessionState.catalog.isTemporaryTable(s.tableIdentifier) =>
193+
case UnresolvedRelation(tableIdentifier: TableIdentifier)
194+
if sparkSession.sessionState.catalog.isTemporaryTable(tableIdentifier) =>
195195
throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
196-
s"referencing a temporary view ${s.tableIdentifier}")
196+
s"referencing a temporary view ${tableIdentifier}")
197197
case other if !other.resolved => other.expressions.flatMap(_.collect {
198198
// Disallow creating permanent views based on temporary UDFs.
199199
case e: UnresolvedFunction

0 commit comments

Comments
 (0)